//! # CFDP Destination Entity Module //! //! The [DestinationHandler] is the primary component of this module which converts the PDUs sent //! from a remote source entity back to a file. A file copy operation on the receiver side //! is started with the reception of a Metadata PDU, for example one generated by the //! [spacepackets::cfdp::pdu::metadata::MetadataPduCreator]. After that, file packet PDUs, for //! example generated with the [spacepackets::cfdp::pdu::file_data] module, can be inserted into //! the destination handler and will be assembled into a file. //! //! A destination entity might still generate packets which need to be sent back to the source //! entity of the file transfer. However, this handler allows freedom of communication like the //! source entity by using a user-provided [PduSendProvider] to send all generated PDUs. //! //! The transaction will be finished when following conditions are met: //! //! 1. A valid EOF PDU, for example one generated by the [spacepackets::cfdp::pdu::eof::EofPdu] //! helper, has been inserted into the class. //! 2. The file checksum verification has been successful. If this is not the case for the //! unacknowledged mode, the handler will re-attempt the checksum calculation up to a certain //! threshold called the check limit. If the threshold is reached, the transaction will //! finish with a failure. //! //! ### Unacknowledged mode with closure //! //! 3. Finished PDU has been sent back to the remote side. //! //! ### Acknowledged mode (*not implemented yet*) //! //! 3. An EOF ACK PDU has been sent back to the remote side. //! 4. A Finished PDU has been sent back to the remote side. //! 5. A Finished PDU ACK was received. use crate::{user::TransactionFinishedParams, DummyPduProvider, GenericSendError, PduProvider}; use core::str::{from_utf8, from_utf8_unchecked, Utf8Error}; use super::{ filestore::{FilestoreError, NativeFilestore, VirtualFilestore}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCountdown, StdRemoteEntityConfigProvider, StdTimerCreator, TimerContext, TimerCreatorProvider, TransactionId, UserFaultHookProvider, }; use smallvec::SmallVec; use spacepackets::{ cfdp::{ pdu::{ eof::EofPdu, file_data::FileDataPdu, finished::{DeliveryCode, FileStatus, FinishedPduCreator}, metadata::{MetadataGenericParams, MetadataPduReader}, CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, }, tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, GenericTlv, ReadableTlv, TlvType}, ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode, }, util::{UnsignedByteField, UnsignedEnum}, }; #[derive(Debug)] struct FileProperties { src_file_name: [u8; u8::MAX as usize], src_file_name_len: usize, dest_file_name: [u8; u8::MAX as usize], dest_file_name_len: usize, dest_path_buf: [u8; u8::MAX as usize * 2], dest_file_path_len: usize, } #[derive(Debug, PartialEq, Eq, Copy, Clone)] enum CompletionDisposition { Completed = 0, Cancelled = 1, } /// This enumeration models the different transaction steps of the destination entity handler. #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum TransactionStep { Idle = 0, TransactionStart = 1, ReceivingFileDataPdus = 2, ReceivingFileDataPdusWithCheckLimitHandling = 3, SendingAckPdu = 4, TransferCompletion = 5, SendingFinishedPdu = 6, } // This contains transfer state parameters for destination transaction. #[derive(Debug)] struct TransferState { transaction_id: Option, metadata_params: MetadataGenericParams, progress: u64, // file_size_eof: u64, metadata_only: bool, condition_code: ConditionCode, delivery_code: DeliveryCode, fault_location_finished: Option, file_status: FileStatus, completion_disposition: CompletionDisposition, checksum: u32, current_check_count: u32, current_check_timer: Option, } impl Default for TransferState { fn default() -> Self { Self { transaction_id: None, metadata_params: Default::default(), progress: Default::default(), // file_size_eof: Default::default(), metadata_only: false, condition_code: ConditionCode::NoError, delivery_code: DeliveryCode::Incomplete, fault_location_finished: None, file_status: FileStatus::Unreported, completion_disposition: CompletionDisposition::Completed, checksum: 0, current_check_count: 0, current_check_timer: None, } } } // This contains parameters for destination transaction. #[derive(Debug)] struct TransactionParams { tstate: TransferState, pdu_conf: CommonPduConfig, file_properties: FileProperties, cksum_buf: [u8; 1024], msgs_to_user_size: usize, // TODO: Should we make this configurable? msgs_to_user_buf: [u8; 1024], remote_cfg: Option, } impl TransactionParams { fn transmission_mode(&self) -> TransmissionMode { self.pdu_conf.trans_mode } } impl Default for FileProperties { fn default() -> Self { Self { src_file_name: [0; u8::MAX as usize], src_file_name_len: Default::default(), dest_file_name: [0; u8::MAX as usize], dest_file_name_len: Default::default(), dest_path_buf: [0; u8::MAX as usize * 2], dest_file_path_len: Default::default(), } } } impl TransactionParams { fn file_size(&self) -> u64 { self.tstate.metadata_params.file_size } fn metadata_params(&self) -> &MetadataGenericParams { &self.tstate.metadata_params } } impl Default for TransactionParams { fn default() -> Self { Self { pdu_conf: Default::default(), cksum_buf: [0; 1024], msgs_to_user_size: 0, msgs_to_user_buf: [0; 1024], tstate: Default::default(), file_properties: Default::default(), remote_cfg: None, } } } impl TransactionParams { fn reset(&mut self) { self.tstate.condition_code = ConditionCode::NoError; self.tstate.delivery_code = DeliveryCode::Incomplete; self.tstate.file_status = FileStatus::Unreported; } } #[derive(Debug, thiserror::Error)] pub enum DestError { /// File directive expected, but none specified #[error("expected file directive")] DirectiveFieldEmpty, #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")] CantProcessPacketType { pdu_type: PduType, directive_type: Option, }, #[error("can not process file data PDUs in current state")] WrongStateForFileData, #[error("can not process EOF PDUs in current state")] WrongStateForEof, // Received new metadata PDU while being already being busy with a file transfer. #[error("busy with transfer")] RecvdMetadataButIsBusy, #[error("empty source file field")] EmptySrcFileField, #[error("empty dest file field")] EmptyDestFileField, #[error("packets to be sent are still left")] PacketToSendLeft, #[error("pdu error {0}")] Pdu(#[from] PduError), #[error("io error {0}")] #[cfg(feature = "std")] Io(#[from] std::io::Error), #[error("file store error {0}")] Filestore(#[from] FilestoreError), #[error("path conversion error {0}")] PathConversion(#[from] Utf8Error), #[error("error building dest path from source file name and dest folder")] PathConcat, #[error("no remote entity configuration found for {0:?}")] NoRemoteCfgFound(UnsignedByteField), #[error("issue sending PDU: {0}")] SendError(#[from] GenericSendError), #[error("cfdp feature not implemented")] NotImplemented, } /// This is the primary CFDP destination handler. It models the CFDP destination entity, which is /// primarily responsible for receiving files sent from another CFDP entity. It performs the /// reception side of File Copy Operations. /// /// The [DestinationHandler::state_machine] function is the primary function to drive the /// destination handler. It can be used to insert packets into the destination /// handler and driving the state machine, which might generate new packets to be sent to the /// remote entity. Please note that the destination handler can also only process Metadata, EOF and /// Prompt PDUs in addition to ACK PDUs where the acknowledged PDU is the Finished PDU. /// All generated packets are sent using the user provided [PduSendProvider]. /// /// The handler requires the [alloc] feature but will allocated all required memory on construction /// time. This means that the handler is still suitable for embedded systems where run-time /// allocation is prohibited. Furthermore, it uses the [VirtualFilestore] abstraction to allow /// usage on systems without a [std] filesystem. /// /// This handler is able to deal with file copy operations to directories, similarly to how the /// UNIX tool `cp` works. If the destination path is a directory instead of a regular full path, /// the source path base file name will be appended to the destination path to form the resulting /// new full path. /// // This handler also does not support concurrency out of the box but is flexible enough to be used /// in different concurrent contexts. For example, you can dynamically create new handlers and /// run them inside a thread pool, or move the newly created handler to a new thread.""" pub struct DestinationHandler< PduSender: PduSendProvider, UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, CheckTimerCreator: TimerCreatorProvider, CheckTimerProvider: CountdownProvider, > { local_cfg: LocalEntityConfig, step: TransactionStep, state: State, tparams: TransactionParams, packet_buf: alloc::vec::Vec, pub pdu_sender: PduSender, pub vfs: Vfs, pub remote_cfg_table: RemoteCfgTable, pub check_timer_creator: CheckTimerCreator, } #[cfg(feature = "std")] pub type StdDestinationHandler = DestinationHandler< PduSender, UserFaultHook, NativeFilestore, StdRemoteEntityConfigProvider, StdTimerCreator, StdCountdown, >; impl< PduSender: PduSendProvider, UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, TimerCreator: TimerCreatorProvider, Countdown: CountdownProvider, > DestinationHandler { /// Constructs a new destination handler. /// /// # Arguments /// /// * `local_cfg` - The local CFDP entity configuration. /// * `max_packet_len` - The maximum expected generated packet size in bytes. Each time a /// packet is sent, it will be buffered inside an internal buffer. The length of this buffer /// will be determined by this parameter. This parameter can either be a known upper bound, /// or it can specifically be determined by the largest packet size parameter of all remote /// entity configurations in the passed `remote_cfg_table`. /// * `pdu_sender` - [PduSendProvider] used to send generated PDU packets. /// * `vfs` - [VirtualFilestore] implementation used by the handler, which decouples the CFDP /// implementation from the underlying filestore/filesystem. This allows to use this handler /// for embedded systems where a standard runtime might not be available. /// * `remote_cfg_table` - The [RemoteEntityConfigProvider] used to look up remote /// entities and target specific configuration for file copy operations. /// * `check_timer_creator` - [TimerCreatorProvider] used by the CFDP handler to generate /// timers required by various tasks. This allows to use this handler for embedded systems /// where the standard time APIs might not be available. pub fn new( local_cfg: LocalEntityConfig, max_packet_len: usize, pdu_sender: PduSender, vfs: Vfs, remote_cfg_table: RemoteCfgTable, timer_creator: TimerCreator, ) -> Self { Self { local_cfg, step: TransactionStep::Idle, state: State::Idle, tparams: Default::default(), packet_buf: alloc::vec![0; max_packet_len], pdu_sender, vfs, remote_cfg_table, check_timer_creator: timer_creator, } } pub fn state_machine_no_packet( &mut self, cfdp_user: &mut impl CfdpUser, ) -> Result { self.state_machine(cfdp_user, None::<&DummyPduProvider>) } /// This is the core function to drive the destination handler. It is also used to insert /// packets into the destination handler. /// /// The state machine should either be called if a packet with the appropriate destination ID /// is received and periodically to perform all CFDP related tasks, for example /// checking for timeouts or missed file segments. /// /// The function returns the number of sent PDU packets on success. pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, packet_to_insert: Option<&impl PduProvider>, ) -> Result { if let Some(packet) = packet_to_insert { self.insert_packet(cfdp_user, packet)?; } match self.state { State::Idle => { // TODO: In acknowledged mode, add timer handling. Ok(0) } State::Busy => self.fsm_busy(cfdp_user), State::Suspended => { // There is now way to suspend the handler currently anyway. Ok(0) } } } /// This function models the Cancel.request CFDP primitive and is the recommended way /// to cancel a transaction. It will cause a Notice Of Cancellation at this entity. /// Please note that the state machine might still be active because a cancelled transfer /// might still require some packets to be sent to the remote sender entity. /// /// If no unexpected errors occur, this function returns whether the current transfer /// was cancelled. It returns [false] if the state machine is currently idle or if there /// is a transaction ID missmatch. pub fn cancel_request(&mut self, transaction_id: &TransactionId) -> bool { if self.state() == super::State::Idle { return false; } if let Some(active_id) = self.transaction_id() { if active_id == *transaction_id { self.trigger_notice_of_completion_cancelled( ConditionCode::CancelRequestReceived, EntityIdTlv::new(self.local_cfg.id), ); self.step = TransactionStep::TransferCompletion; return true; } } false } /// Returns [None] if the state machine is IDLE, and the transmission mode of the current /// request otherwise. pub fn transmission_mode(&self) -> Option { if self.state == State::Idle { return None; } Some(self.tparams.transmission_mode()) } pub fn transaction_id(&self) -> Option { self.tstate().transaction_id } fn insert_packet( &mut self, cfdp_user: &mut impl CfdpUser, packet_to_insert: &impl PduProvider, ) -> Result<(), DestError> { if packet_to_insert.packet_target()? != PacketTarget::DestEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. return Err(DestError::CantProcessPacketType { pdu_type: packet_to_insert.pdu_type(), directive_type: packet_to_insert.file_directive_type(), }); } match packet_to_insert.pdu_type() { PduType::FileDirective => { if packet_to_insert.file_directive_type().is_none() { return Err(DestError::DirectiveFieldEmpty); } self.handle_file_directive( cfdp_user, packet_to_insert.file_directive_type().unwrap(), packet_to_insert.pdu(), ) } PduType::FileData => self.handle_file_data(cfdp_user, packet_to_insert.pdu()), } } fn handle_file_directive( &mut self, cfdp_user: &mut impl CfdpUser, pdu_directive: FileDirectiveType, raw_packet: &[u8], ) -> Result<(), DestError> { match pdu_directive { FileDirectiveType::EofPdu => self.handle_eof_pdu(cfdp_user, raw_packet)?, FileDirectiveType::FinishedPdu | FileDirectiveType::NakPdu | FileDirectiveType::KeepAlivePdu => { return Err(DestError::CantProcessPacketType { pdu_type: PduType::FileDirective, directive_type: Some(pdu_directive), }); } FileDirectiveType::AckPdu => { return Err(DestError::NotImplemented); } FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?, FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?, }; Ok(()) } fn handle_metadata_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> { if self.state != State::Idle { return Err(DestError::RecvdMetadataButIsBusy); } let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?; self.tparams.reset(); self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params(); let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value()); if remote_cfg.is_none() { return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id())); } self.tparams.remote_cfg = Some(*remote_cfg.unwrap()); // TODO: Support for metadata only PDUs. let src_name = metadata_pdu.src_file_name(); let dest_name = metadata_pdu.dest_file_name(); if src_name.is_empty() && dest_name.is_empty() { self.tparams.tstate.metadata_only = true; } if !self.tparams.tstate.metadata_only && src_name.is_empty() { return Err(DestError::EmptySrcFileField); } if !self.tparams.tstate.metadata_only && dest_name.is_empty() { return Err(DestError::EmptyDestFileField); } if !self.tparams.tstate.metadata_only { self.tparams.file_properties.src_file_name[..src_name.len_value()] .copy_from_slice(src_name.value()); self.tparams.file_properties.src_file_name_len = src_name.len_value(); if dest_name.is_empty() { return Err(DestError::EmptyDestFileField); } self.tparams.file_properties.dest_file_name[..dest_name.len_value()] .copy_from_slice(dest_name.value()); self.tparams.file_properties.dest_file_name_len = dest_name.len_value(); self.tparams.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf(); self.tparams.msgs_to_user_size = 0; } if !metadata_pdu.options().is_empty() { for option_tlv in metadata_pdu.options_iter().unwrap() { if option_tlv.is_standard_tlv() && option_tlv.tlv_type().unwrap() == TlvType::MsgToUser { self.tparams .msgs_to_user_buf .copy_from_slice(option_tlv.raw_data().unwrap()); self.tparams.msgs_to_user_size += option_tlv.len_full(); } } } self.state = State::Busy; self.step = TransactionStep::TransactionStart; Ok(()) } fn handle_file_data( &mut self, user: &mut impl CfdpUser, raw_packet: &[u8], ) -> Result<(), DestError> { if self.state == State::Idle || (self.step != TransactionStep::ReceivingFileDataPdus && self.step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling) { return Err(DestError::WrongStateForFileData); } let fd_pdu = FileDataPdu::from_bytes(raw_packet)?; if self.local_cfg.indication_cfg.file_segment_recv { user.file_segment_recvd_indication(&FileSegmentRecvdParams { id: self.tstate().transaction_id.unwrap(), offset: fd_pdu.offset(), length: fd_pdu.file_data().len(), segment_metadata: fd_pdu.segment_metadata(), }); } if let Err(e) = self.vfs.write_data( // Safety: It was already verified that the path is valid during the transaction start. unsafe { from_utf8_unchecked( //from_utf8( &self.tparams.file_properties.dest_path_buf [0..self.tparams.file_properties.dest_file_path_len], ) }, fd_pdu.offset(), fd_pdu.file_data(), ) { self.declare_fault(ConditionCode::FilestoreRejection); return Err(e.into()); } self.tstate_mut().progress += fd_pdu.file_data().len() as u64; Ok(()) } fn handle_eof_pdu( &mut self, cfdp_user: &mut impl CfdpUser, raw_packet: &[u8], ) -> Result<(), DestError> { if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus { return Err(DestError::WrongStateForEof); } let eof_pdu = EofPdu::from_bytes(raw_packet)?; if self.local_cfg.indication_cfg.eof_recv { // Unwrap is okay here, application logic ensures that transaction ID is valid here. cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap()); } let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError { self.handle_no_error_eof_pdu(&eof_pdu)? } else { // This is an EOF (Cancel), perform Cancel Response Procedures according to chapter // 4.6.6 of the standard. self.trigger_notice_of_completion_cancelled( eof_pdu.condition_code(), EntityIdTlv::new(self.tparams.remote_cfg.unwrap().entity_id), ); self.tparams.tstate.progress = eof_pdu.file_size(); if eof_pdu.file_size() > 0 { self.tparams.tstate.delivery_code = DeliveryCode::Incomplete; } else { self.tparams.tstate.delivery_code = DeliveryCode::Complete; } // TODO: The cancel EOF also supplies a checksum and a progress number. We could cross // check that checksum, but how would we deal with a checksum failure? The standard // does not specify anything for this case.. It could be part of the status report // issued to the user though. true }; if regular_transfer_finish { self.file_transfer_complete_transition(); } Ok(()) } fn trigger_notice_of_completion_cancelled( &mut self, cond_code: ConditionCode, fault_location: EntityIdTlv, ) { self.tparams.tstate.completion_disposition = CompletionDisposition::Cancelled; self.tparams.tstate.condition_code = cond_code; self.tparams.tstate.fault_location_finished = Some(fault_location); // For anything larger than 0, we'd have to do a checksum check to verify whether // the delivery is really complete, and we need the EOF checksum for that.. if self.tparams.tstate.progress == 0 { self.tparams.tstate.delivery_code = DeliveryCode::Complete; } } /// Returns whether the transfer can be completed regularly. fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result { // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size if self.tparams.tstate.progress > eof_pdu.file_size() && self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError { return Ok(false); } else if (self.tparams.tstate.progress < eof_pdu.file_size()) && self.tparams.transmission_mode() == TransmissionMode::Acknowledged { // CFDP 4.6.4.3.1: The end offset of the last received file segment and the file // size as stated in the EOF PDU is not the same, so we need to add that segment to // the lost segments for the deferred lost segment detection procedure. // TODO: Proper lost segment handling. // self._params.acked_params.lost_seg_tracker.add_lost_segment( // (self._params.fp.progress, self._params.fp.file_size_eof) // ) } self.tparams.tstate.checksum = eof_pdu.file_checksum(); if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged && !self.checksum_verify(self.tparams.tstate.checksum) { if self.declare_fault(ConditionCode::FileChecksumFailure) != FaultHandlerCode::IgnoreError { return Ok(false); } self.start_check_limit_handling(); return Ok(false); } Ok(true) } fn file_transfer_complete_transition(&mut self) { if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged { self.step = TransactionStep::TransferCompletion; } else { // TODO: Prepare ACK PDU somehow. self.step = TransactionStep::SendingAckPdu; } } fn checksum_verify(&mut self, checksum: u32) -> bool { let mut file_delivery_complete = false; if self.tparams.metadata_params().checksum_type == ChecksumType::NullChecksum || self.tparams.tstate.metadata_only { file_delivery_complete = true; } else { match self.vfs.checksum_verify( checksum, // Safety: It was already verified that the path is valid during the transaction start. unsafe { from_utf8_unchecked( &self.tparams.file_properties.dest_path_buf [0..self.tparams.file_properties.dest_file_path_len], ) }, self.tparams.metadata_params().checksum_type, self.tparams.tstate.progress, &mut self.tparams.cksum_buf, ) { Ok(checksum_success) => { file_delivery_complete = checksum_success; if !checksum_success { self.tparams.tstate.delivery_code = DeliveryCode::Incomplete; self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure; } } Err(e) => match e { FilestoreError::ChecksumTypeNotImplemented(_) => { self.declare_fault(ConditionCode::UnsupportedChecksumType); // For this case, the applicable algorithm shall be the the null checksum, // which is always succesful. file_delivery_complete = true; } _ => { self.declare_fault(ConditionCode::FilestoreRejection); // Treat this equivalent to a failed checksum procedure. } }, }; } if file_delivery_complete { self.tparams.tstate.delivery_code = DeliveryCode::Complete; self.tparams.tstate.condition_code = ConditionCode::NoError; } file_delivery_complete } fn start_check_limit_handling(&mut self) { self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling; self.tparams.tstate.current_check_timer = Some(self.check_timer_creator.create_countdown( TimerContext::CheckLimit { local_id: self.local_cfg.id, remote_id: self.tparams.remote_cfg.unwrap().entity_id, entity_type: EntityType::Receiving, }, )); self.tparams.tstate.current_check_count = 0; } fn check_limit_handling(&mut self) { if self.tparams.tstate.current_check_timer.is_none() { return; } let check_timer = self.tparams.tstate.current_check_timer.as_ref().unwrap(); if check_timer.has_expired() { if self.checksum_verify(self.tparams.tstate.checksum) { self.file_transfer_complete_transition(); return; } if self.tparams.tstate.current_check_count + 1 >= self.tparams.remote_cfg.unwrap().check_limit { self.declare_fault(ConditionCode::CheckLimitReached); } else { self.tparams.tstate.current_check_count += 1; self.tparams .tstate .current_check_timer .as_mut() .unwrap() .reset(); } } } pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> { Err(DestError::NotImplemented) } fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { let mut sent_packets = 0; if self.step == TransactionStep::TransactionStart { let result = self.transaction_start(cfdp_user); if let Err(e) = result { // If we can not even start the transaction properly, reset the handler. // We could later let the user do this optionally, but for now this is the safer // approach to prevent inconsistent states of the handler. self.reset(); return Err(e); } } if self.step == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling { self.check_limit_handling(); } if self.step == TransactionStep::TransferCompletion { sent_packets += self.transfer_completion(cfdp_user)?; } if self.step == TransactionStep::SendingAckPdu { return Err(DestError::NotImplemented); } if self.step == TransactionStep::SendingFinishedPdu { self.reset(); } Ok(sent_packets) } /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable. pub fn step(&self) -> TransactionStep { self.step } /// Get the step, which denotes whether the CFDP handler is active, and which CFDP class /// is used if it is active. pub fn state(&self) -> State { self.state } fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { let dest_name = from_utf8( &self.tparams.file_properties.dest_file_name [..self.tparams.file_properties.dest_file_name_len], )?; self.tparams.file_properties.dest_path_buf[0..dest_name.len()] .copy_from_slice(dest_name.as_bytes()); self.tparams.file_properties.dest_file_path_len = dest_name.len(); let source_id = self.tparams.pdu_conf.source_id(); let id = TransactionId::new(source_id, self.tparams.pdu_conf.transaction_seq_num); let src_name = from_utf8( &self.tparams.file_properties.src_file_name [0..self.tparams.file_properties.src_file_name_len], )?; let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new(); let mut num_msgs_to_user = 0; if self.tparams.msgs_to_user_size > 0 { let mut index = 0; while index < self.tparams.msgs_to_user_size { // This should never panic as the validity of the options was checked beforehand. let msgs_to_user_tlv = MsgToUserTlv::from_bytes(&self.tparams.msgs_to_user_buf[index..]) .expect("message to user creation failed unexpectedly"); msgs_to_user.push(msgs_to_user_tlv); index += msgs_to_user_tlv.len_full(); num_msgs_to_user += 1; } } let metadata_recvd_params = MetadataReceivedParams { id, source_id, file_size: self.tparams.file_size(), src_file_name: src_name, dest_file_name: dest_name, msgs_to_user: &msgs_to_user[..num_msgs_to_user], }; self.tparams.tstate.transaction_id = Some(id); cfdp_user.metadata_recvd_indication(&metadata_recvd_params); if self.vfs.exists(dest_name)? && self.vfs.is_dir(dest_name)? { // Create new destination path by concatenating the last part of the source source // name and the destination folder. For example, for a source file of /tmp/hello.txt // and a destination name of /home/test, the resulting file name should be // /home/test/hello.txt // Safety: It was already verified that the path is valid during the transaction start. let source_file_name = self.vfs.file_name(src_name)?; if source_file_name.is_none() { return Err(DestError::PathConcat); } let source_name = source_file_name.unwrap(); self.tparams.file_properties.dest_path_buf[dest_name.len()] = b'/'; self.tparams.file_properties.dest_path_buf [dest_name.len() + 1..dest_name.len() + 1 + source_name.len()] .copy_from_slice(source_name.as_bytes()); self.tparams.file_properties.dest_file_path_len += 1 + source_name.len(); } let dest_path_str = from_utf8( &self.tparams.file_properties.dest_path_buf [0..self.tparams.file_properties.dest_file_path_len], )?; if self.vfs.exists(dest_path_str)? { self.vfs.truncate_file(dest_path_str)?; } else { self.vfs.create_file(dest_path_str)?; } self.tparams.tstate.file_status = FileStatus::Retained; self.step = TransactionStep::ReceivingFileDataPdus; Ok(()) } fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { let mut sent_packets = 0; self.notice_of_completion(cfdp_user)?; if self.tparams.transmission_mode() == TransmissionMode::Acknowledged || self.tparams.metadata_params().closure_requested { sent_packets += self.send_finished_pdu()?; } self.step = TransactionStep::SendingFinishedPdu; Ok(sent_packets) } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { if self.tstate().completion_disposition == CompletionDisposition::Completed { // TODO: Execute any filestore requests } else if self .tparams .remote_cfg .as_ref() .unwrap() .disposition_on_cancellation && self.tstate().delivery_code == DeliveryCode::Incomplete { // Safety: We already verified that the path is valid during the transaction start. let dest_path = unsafe { from_utf8_unchecked( &self.tparams.file_properties.dest_path_buf [0..self.tparams.file_properties.dest_file_path_len], ) }; if self.vfs.exists(dest_path)? && self.vfs.is_file(dest_path)? { self.vfs.remove_file(dest_path)?; } self.tstate_mut().file_status = FileStatus::DiscardDeliberately; } let tstate = self.tstate(); let transaction_finished_params = TransactionFinishedParams { id: tstate.transaction_id.unwrap(), condition_code: tstate.condition_code, delivery_code: tstate.delivery_code, file_status: tstate.file_status, }; cfdp_user.transaction_finished_indication(&transaction_finished_params); Ok(()) } fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode { // Cache those, because they might be reset when abandoning the transaction. let transaction_id = self.tstate().transaction_id.unwrap(); let progress = self.tstate().progress; let fh_code = self .local_cfg .fault_handler .get_fault_handler(condition_code); match fh_code { FaultHandlerCode::NoticeOfCancellation => { self.notice_of_cancellation(condition_code, EntityIdTlv::new(self.local_cfg().id)); } FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(), FaultHandlerCode::IgnoreError => (), FaultHandlerCode::AbandonTransaction => self.abandon_transaction(), } self.local_cfg .fault_handler .report_fault(transaction_id, condition_code, progress) } fn notice_of_cancellation( &mut self, condition_code: ConditionCode, fault_location: EntityIdTlv, ) { self.step = TransactionStep::TransferCompletion; self.tstate_mut().condition_code = condition_code; self.tstate_mut().fault_location_finished = Some(fault_location); self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled; } fn notice_of_suspension(&mut self) { // TODO: Implement suspension handling. } fn abandon_transaction(&mut self) { self.reset(); } /// This function is public to allow completely resetting the handler, but it is explicitely /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. /// Resetting the handler might interfere with these mechanisms and lead to unexpected /// behaviour. pub fn reset(&mut self) { self.step = TransactionStep::Idle; self.state = State::Idle; // self.packets_to_send_ctx.packet_available = false; self.tparams.reset(); } fn send_finished_pdu(&mut self) -> Result { let tstate = self.tstate(); let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0); let finished_pdu = if tstate.condition_code == ConditionCode::NoError || tstate.condition_code == ConditionCode::UnsupportedChecksumType { FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status) } else { FinishedPduCreator::new_generic( pdu_header, tstate.condition_code, tstate.delivery_code, tstate.file_status, &[], tstate.fault_location_finished, ) }; finished_pdu.write_to_bytes(&mut self.packet_buf)?; self.pdu_sender.send_pdu( finished_pdu.pdu_type(), finished_pdu.file_directive_type(), &self.packet_buf[0..finished_pdu.len_written()], )?; Ok(1) } pub fn local_cfg(&self) -> &LocalEntityConfig { &self.local_cfg } fn tstate(&self) -> &TransferState { &self.tparams.tstate } fn tstate_mut(&mut self) -> &mut TransferState { &mut self.tparams.tstate } } #[cfg(test)] mod tests { use core::{cell::Cell, sync::atomic::AtomicBool}; #[allow(unused_imports)] use std::println; use std::{ fs, path::{Path, PathBuf}, string::String, }; use alloc::{sync::Arc, vec::Vec}; use rand::Rng; use spacepackets::{ cfdp::{ lv::Lv, pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket}, ChecksumType, TransmissionMode, }, util::{UbfU16, UnsignedByteFieldU8}, }; use crate::{ filestore::NativeFilestore, tests::{ basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler, LOCAL_ID, REMOTE_ID, }, CountdownProvider, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, TimerCreatorProvider, CRC_32, }; use super::*; #[derive(Debug)] struct TestCheckTimer { counter: Cell, expired: Arc, } impl CountdownProvider for TestCheckTimer { fn has_expired(&self) -> bool { self.expired.load(core::sync::atomic::Ordering::Relaxed) } fn reset(&mut self) { self.counter.set(0); } } impl TestCheckTimer { pub fn new(expired_flag: Arc) -> Self { Self { counter: Cell::new(0), expired: expired_flag, } } } struct TestCheckTimerCreator { check_limit_expired_flag: Arc, } impl TestCheckTimerCreator { pub fn new(expired_flag: Arc) -> Self { Self { check_limit_expired_flag: expired_flag, } } } impl TimerCreatorProvider for TestCheckTimerCreator { type Countdown = TestCheckTimer; fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { match timer_context { TimerContext::CheckLimit { .. } => { TestCheckTimer::new(self.check_limit_expired_flag.clone()) } _ => { panic!("invalid check timer creator, can only be used for check limit handling") } } } } type TestDestHandler = DestinationHandler< TestCfdpSender, TestFaultHandler, NativeFilestore, StdRemoteEntityConfigProvider, TestCheckTimerCreator, TestCheckTimer, >; struct DestHandlerTestbench { check_timer_expired: Arc, handler: TestDestHandler, src_path: PathBuf, dest_path: PathBuf, check_dest_file: bool, check_handler_idle_at_drop: bool, closure_requested: bool, pdu_header: PduHeader, expected_full_data: Vec, expected_file_size: u64, buf: [u8; 512], } impl DestHandlerTestbench { fn new_with_fixed_paths(fault_handler: TestFaultHandler, closure_requested: bool) -> Self { let (src_path, dest_path) = init_full_filepaths_textfile(); assert!(!Path::exists(&dest_path)); Self::new(fault_handler, closure_requested, true, src_path, dest_path) } fn new( fault_handler: TestFaultHandler, closure_requested: bool, check_dest_file: bool, src_path: PathBuf, dest_path: PathBuf, ) -> Self { let check_timer_expired = Arc::new(AtomicBool::new(false)); let test_sender = TestCfdpSender::default(); let dest_handler = default_dest_handler(fault_handler, test_sender, check_timer_expired.clone()); let handler = Self { check_timer_expired, handler: dest_handler, src_path, closure_requested, dest_path, check_dest_file, check_handler_idle_at_drop: true, expected_file_size: 0, pdu_header: create_pdu_header(UbfU16::new(0)), expected_full_data: Vec::new(), buf: [0; 512], }; handler.state_check(State::Idle, TransactionStep::Idle); handler } fn dest_path(&self) -> &PathBuf { &self.dest_path } fn all_fault_queues_empty(&self) -> bool { self.handler .local_cfg .user_fault_hook() .borrow() .all_queues_empty() } #[allow(dead_code)] fn indication_cfg_mut(&mut self) -> &mut IndicationConfig { &mut self.handler.local_cfg.indication_cfg } fn get_next_pdu(&mut self) -> Option { self.handler.pdu_sender.retrieve_next_pdu() } fn indication_cfg(&mut self) -> &IndicationConfig { &self.handler.local_cfg.indication_cfg } fn set_check_timer_expired(&mut self) { self.check_timer_expired .store(true, core::sync::atomic::Ordering::Relaxed); } fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser { TestCfdpUser::new( 0, self.src_path.to_string_lossy().into(), self.dest_path.to_string_lossy().into(), expected_file_size, ) } fn generic_transfer_init( &mut self, user: &mut TestCfdpUser, file_size: u64, ) -> Result { self.expected_file_size = file_size; assert_eq!(user.transaction_indication_call_count, 0); assert_eq!(user.metadata_recv_queue.len(), 0); let metadata_pdu = create_metadata_pdu( &self.pdu_header, self.src_path.as_path(), self.dest_path.as_path(), file_size, self.closure_requested, ); let packet_info = create_packet_info(&metadata_pdu, &mut self.buf); self.handler.state_machine(user, Some(&packet_info))?; assert_eq!(user.metadata_recv_queue.len(), 1); assert_eq!( self.handler.transmission_mode().unwrap(), TransmissionMode::Unacknowledged ); assert_eq!(user.transaction_indication_call_count, 0); assert_eq!(user.metadata_recv_queue.len(), 1); let metadata_recvd = user.metadata_recv_queue.pop_front().unwrap(); assert_eq!(metadata_recvd.source_id, LOCAL_ID.into()); assert_eq!( metadata_recvd.src_file_name, String::from(self.src_path.to_str().unwrap()) ); assert_eq!( metadata_recvd.dest_file_name, String::from(self.dest_path().to_str().unwrap()) ); assert_eq!(metadata_recvd.id, self.handler.transaction_id().unwrap()); assert_eq!(metadata_recvd.file_size, file_size); assert!(metadata_recvd.msgs_to_user.is_empty()); Ok(self.handler.transaction_id().unwrap()) } fn generic_file_data_insert( &mut self, user: &mut TestCfdpUser, offset: u64, file_data_chunk: &[u8], ) -> Result { let filedata_pdu = FileDataPdu::new_no_seg_metadata(self.pdu_header, offset, file_data_chunk); filedata_pdu .write_to_bytes(&mut self.buf) .expect("writing file data PDU failed"); let packet_info = PduRawWithInfo::new(&self.buf).expect("creating packet info failed"); let result = self.handler.state_machine(user, Some(&packet_info)); if self.indication_cfg().file_segment_recv { assert!(!user.file_seg_recvd_queue.is_empty()); assert_eq!(user.file_seg_recvd_queue.back().unwrap().offset, offset); assert_eq!( user.file_seg_recvd_queue.back().unwrap().length, file_data_chunk.len() ); } result } fn generic_eof_no_error( &mut self, user: &mut TestCfdpUser, expected_full_data: Vec, ) -> Result { self.expected_full_data = expected_full_data; assert_eq!(user.finished_indic_queue.len(), 0); let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header); let packet_info = create_packet_info(&eof_pdu, &mut self.buf); self.check_handler_idle_at_drop = true; self.check_dest_file = true; let result = self.handler.state_machine(user, Some(&packet_info)); if self.indication_cfg().eof_recv { assert_eq!(user.eof_recvd_call_count, 1); } result } fn check_completion_indication_success(&mut self, user: &mut TestCfdpUser) { assert_eq!(user.finished_indic_queue.len(), 1); let finished_indication = user.finished_indic_queue.pop_front().unwrap(); assert_eq!( finished_indication.id, self.handler.transaction_id().unwrap() ); assert_eq!(finished_indication.file_status, FileStatus::Retained); assert_eq!(finished_indication.delivery_code, DeliveryCode::Complete); assert_eq!(finished_indication.condition_code, ConditionCode::NoError); } fn state_check(&self, state: State, step: TransactionStep) { assert_eq!(self.handler.state(), state); assert_eq!(self.handler.step(), step); } } // Specifying some checks in the drop method avoids some boilerplate. impl Drop for DestHandlerTestbench { fn drop(&mut self) { assert!(self.all_fault_queues_empty()); assert!(self.handler.pdu_sender.queue_empty()); if self.check_handler_idle_at_drop { self.state_check(State::Idle, TransactionStep::Idle); } if self.check_dest_file { assert!(Path::exists(&self.dest_path)); let read_content = fs::read(&self.dest_path).expect("reading back string failed"); assert_eq!(read_content.len() as u64, self.expected_file_size); assert_eq!(read_content, self.expected_full_data); assert!(fs::remove_file(self.dest_path.as_path()).is_ok()); } } } fn init_full_filepaths_textfile() -> (PathBuf, PathBuf) { ( tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(), tempfile::NamedTempFile::new() .unwrap() .into_temp_path() .to_path_buf(), ) } fn default_dest_handler( test_fault_handler: TestFaultHandler, test_packet_sender: TestCfdpSender, check_timer_expired: Arc, ) -> TestDestHandler { let local_entity_cfg = LocalEntityConfig { id: REMOTE_ID.into(), indication_cfg: IndicationConfig::default(), fault_handler: FaultHandler::new(test_fault_handler), }; DestinationHandler::new( local_entity_cfg, 2048, test_packet_sender, NativeFilestore::default(), basic_remote_cfg_table(LOCAL_ID, 1024, true), TestCheckTimerCreator::new(check_timer_expired), ) } fn create_pdu_header(seq_num: impl Into) -> PduHeader { let mut pdu_conf = CommonPduConfig::new_with_byte_fields(LOCAL_ID, REMOTE_ID, seq_num).unwrap(); pdu_conf.trans_mode = TransmissionMode::Unacknowledged; PduHeader::new_no_file_data(pdu_conf, 0) } fn create_metadata_pdu<'filename>( pdu_header: &PduHeader, src_name: &'filename Path, dest_name: &'filename Path, file_size: u64, closure_requested: bool, ) -> MetadataPduCreator<'filename, 'filename, 'static> { let checksum_type = if file_size == 0 { ChecksumType::NullChecksum } else { ChecksumType::Crc32 }; let metadata_params = MetadataGenericParams::new(closure_requested, checksum_type, file_size); MetadataPduCreator::new_no_opts( *pdu_header, metadata_params, Lv::new_from_str(src_name.as_os_str().to_str().unwrap()).unwrap(), Lv::new_from_str(dest_name.as_os_str().to_str().unwrap()).unwrap(), ) } fn create_packet_info<'a>( pdu: &'a impl WritablePduPacket, buf: &'a mut [u8], ) -> PduRawWithInfo<'a> { let written_len = pdu .write_to_bytes(buf) .expect("writing metadata PDU failed"); PduRawWithInfo::new(&buf[..written_len]).expect("generating packet info failed") } fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader) -> EofPdu { let crc32 = if !file_data.is_empty() { let mut digest = CRC_32.digest(); digest.update(file_data); digest.finalize() } else { 0 }; EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64) } #[test] fn test_basic() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); let dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); assert!(dest_handler.transmission_mode().is_none()); assert!(dest_handler .local_cfg .fault_handler .user_hook .borrow() .all_queues_empty()); assert!(dest_handler.pdu_sender.queue_empty()); assert_eq!(dest_handler.state(), State::Idle); assert_eq!(dest_handler.step(), TransactionStep::Idle); } #[test] fn test_cancelling_idle_fsm() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); let mut dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); assert!(!dest_handler.cancel_request(&TransactionId::new( UnsignedByteFieldU8::new(0).into(), UnsignedByteFieldU8::new(0).into() ))); } #[test] fn test_empty_file_transfer_not_acked_no_closure() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = tb.test_user_from_cached_paths(0); tb.generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); } #[test] fn test_small_file_transfer_not_acked() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = tb.test_user_from_cached_paths(file_size); tb.generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, file_data) .expect("file data insertion failed"); tb.generic_eof_no_error(&mut test_user, file_data.to_vec()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); } #[test] fn test_segmented_file_transfer_not_acked() { let mut rng = rand::thread_rng(); let mut random_data = [0u8; 512]; rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len = 256; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = tb.test_user_from_cached_paths(file_size); tb.generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len]) .expect("file data insertion failed"); tb.generic_file_data_insert( &mut test_user, segment_len as u64, &random_data[segment_len..], ) .expect("file data insertion failed"); tb.generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); } #[test] fn test_check_limit_handling_transfer_success() { let mut rng = rand::thread_rng(); let mut random_data = [0u8; 512]; rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len = 256; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = tb.test_user_from_cached_paths(file_size); let transaction_id = tb .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len]) .expect("file data insertion 0 failed"); tb.generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.generic_file_data_insert( &mut test_user, segment_len as u64, &random_data[segment_len..], ) .expect("file data insertion 1 failed"); tb.set_check_timer_expired(); tb.handler .state_machine_no_packet(&mut test_user) .expect("fsm failure"); let mut fault_handler = tb.handler.local_cfg.fault_handler.user_hook.borrow_mut(); assert_eq!(fault_handler.ignored_queue.len(), 1); let cancelled = fault_handler.ignored_queue.pop_front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.2, segment_len as u64); drop(fault_handler); tb.check_completion_indication_success(&mut test_user); } #[test] fn test_check_limit_handling_limit_reached() { let mut rng = rand::thread_rng(); let mut random_data = [0u8; 512]; rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len = 256; let fault_handler = TestFaultHandler::default(); let mut testbench = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = testbench.test_user_from_cached_paths(file_size); let transaction_id = testbench .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); testbench .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len]) .expect("file data insertion 0 failed"); testbench .generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); testbench.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); testbench.set_check_timer_expired(); testbench .handler .state_machine_no_packet(&mut test_user) .expect("fsm error"); testbench.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); testbench.set_check_timer_expired(); testbench .handler .state_machine_no_packet(&mut test_user) .expect("fsm error"); testbench.state_check(State::Idle, TransactionStep::Idle); let mut fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow_mut(); assert!(fault_hook.notice_of_suspension_queue.is_empty()); let ignored_queue = &mut fault_hook.ignored_queue; assert_eq!(ignored_queue.len(), 1); let cancelled = ignored_queue.pop_front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.2, segment_len as u64); let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue; assert_eq!(cancelled_queue.len(), 1); let cancelled = cancelled_queue.pop_front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); assert_eq!(cancelled.2, segment_len as u64); drop(fault_hook); assert!(testbench.handler.pdu_sender.queue_empty()); // Check that the broken file exists. testbench.check_dest_file = false; assert!(Path::exists(testbench.dest_path())); let read_content = fs::read(testbench.dest_path()).expect("reading back string failed"); assert_eq!(read_content.len(), segment_len); assert_eq!(read_content, &random_data[0..segment_len]); assert!(fs::remove_file(testbench.dest_path().as_path()).is_ok()); } fn check_finished_pdu_success(sent_pdu: &SentPdu) { assert_eq!(sent_pdu.pdu_type, PduType::FileDirective); assert_eq!( sent_pdu.file_directive_type, Some(FileDirectiveType::FinishedPdu) ); let finished_pdu = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap(); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!(finished_pdu.condition_code(), ConditionCode::NoError); assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete); assert!(finished_pdu.fault_location().is_none()); assert_eq!(finished_pdu.fs_responses_raw(), &[]); } #[test] fn test_file_transfer_with_closure() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); let mut test_user = tb.test_user_from_cached_paths(0); tb.generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let sent_packets = tb .generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); assert_eq!(sent_packets, 1); assert!(tb.all_fault_queues_empty()); // The Finished PDU was sent, so the state machine is done. tb.state_check(State::Idle, TransactionStep::Idle); assert!(!tb.handler.pdu_sender.queue_empty()); let sent_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap(); check_finished_pdu_success(&sent_pdu); tb.check_completion_indication_success(&mut test_user); } #[test] fn test_finished_pdu_insertion_rejected() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); tb.check_dest_file = false; let mut user = tb.test_user_from_cached_paths(0); let finished_pdu = FinishedPduCreator::new_default( PduHeader::new_no_file_data(CommonPduConfig::default(), 0), DeliveryCode::Complete, FileStatus::Retained, ); let finished_pdu_raw = finished_pdu.to_vec().unwrap(); let packet_info = PduRawWithInfo::new(&finished_pdu_raw).unwrap(); let error = tb.handler.state_machine(&mut user, Some(&packet_info)); assert!(error.is_err()); let error = error.unwrap_err(); if let DestError::CantProcessPacketType { pdu_type, directive_type, } = error { assert_eq!(pdu_type, PduType::FileDirective); assert_eq!(directive_type, Some(FileDirectiveType::FinishedPdu)); } } #[test] fn test_metadata_insertion_twice_fails() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); let mut user = tb.test_user_from_cached_paths(0); tb.generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.check_handler_idle_at_drop = false; tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let metadata_pdu = create_metadata_pdu( &tb.pdu_header, tb.src_path.as_path(), tb.dest_path.as_path(), 0, tb.closure_requested, ); let packet_info = create_packet_info(&metadata_pdu, &mut tb.buf); let error = tb.handler.state_machine(&mut user, Some(&packet_info)); assert!(error.is_err()); let error = error.unwrap_err(); if let DestError::RecvdMetadataButIsBusy = error { } else { panic!("unexpected error: {:?}", error); } } #[test] fn test_checksum_failure_not_acked() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); let mut user = tb.test_user_from_cached_paths(file_size); tb.generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); let faulty_file_data = b"Hemlo World!"; assert_eq!( tb.generic_file_data_insert(&mut user, 0, faulty_file_data) .expect("file data insertion failed"), 0 ); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let sent_packets = tb .generic_eof_no_error(&mut user, file_data.into()) .expect("EOF no error insertion failed"); // FSM enters check limit algorithm here, so no finished PDU is created. assert_eq!(sent_packets, 0); let transaction_id = tb.handler.transaction_id().unwrap(); let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut(); assert!(fault_hook.notice_of_suspension_queue.is_empty()); // The file checksum failure is ignored by default and check limit handling is now // performed. let ignored_queue = &mut fault_hook.ignored_queue; assert_eq!(ignored_queue.len(), 1); let cancelled = ignored_queue.pop_front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.2, file_size); drop(fault_hook); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); tb.handler .state_machine_no_packet(&mut user) .expect("fsm error"); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); tb.handler .state_machine_no_packet(&mut user) .expect("fsm error"); tb.state_check(State::Idle, TransactionStep::Idle); // Transaction is cancelled because the check limit is reached. let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut(); let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue; assert_eq!(cancelled_queue.len(), 1); let cancelled = cancelled_queue.pop_front().unwrap(); assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); assert_eq!(cancelled.2, file_size); drop(fault_hook); let sent_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap(); assert_eq!(sent_pdu.pdu_type, PduType::FileDirective); assert_eq!( sent_pdu.file_directive_type, Some(FileDirectiveType::FinishedPdu) ); let finished_pdu = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap(); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!( finished_pdu.condition_code(), ConditionCode::CheckLimitReached ); assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); assert!(finished_pdu.fault_location().is_some()); assert_eq!( *finished_pdu.fault_location().unwrap().entity_id(), REMOTE_ID.into() ); assert_eq!(finished_pdu.fs_responses_raw(), &[]); assert!(tb.handler.pdu_sender.queue_empty()); tb.expected_full_data = faulty_file_data.to_vec(); } #[test] fn test_file_copy_to_directory() { let fault_handler = TestFaultHandler::default(); let src_path = tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(); let dest_path = tempfile::TempDir::new().unwrap(); let mut dest_path_buf = dest_path.into_path(); let mut tb = DestHandlerTestbench::new( fault_handler, false, false, src_path.clone(), dest_path_buf.clone(), ); dest_path_buf.push(src_path.file_name().unwrap()); tb.dest_path = dest_path_buf; let mut test_user = tb.test_user_from_cached_paths(0); tb.generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); } #[test] fn test_tranfer_cancellation_empty_file_with_eof_pdu() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = tb.test_user_from_cached_paths(0); tb.generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let cancel_eof = EofPdu::new( tb.pdu_header, ConditionCode::CancelRequestReceived, 0, 0, Some(EntityIdTlv::new(LOCAL_ID.into())), ); let packets = tb .handler .state_machine( &mut test_user, Some(&PduRawWithInfo::new(&cancel_eof.to_vec().unwrap()).unwrap()), ) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); } fn generic_tranfer_cancellation_partial_file_with_eof_pdu(with_closure: bool) { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = 5; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, with_closure); let mut test_user = tb.test_user_from_cached_paths(file_size); tb.generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &file_data[0..5]) .expect("file data insertion failed"); // Checksum is currently ignored on remote side.. we still supply it, according to the // standard. let mut digest = CRC_32.digest(); digest.update(&file_data[0..5]); let checksum = digest.finalize(); let cancel_eof = EofPdu::new( tb.pdu_header, ConditionCode::CancelRequestReceived, checksum, 5, Some(EntityIdTlv::new(LOCAL_ID.into())), ); let packets = tb .handler .state_machine( &mut test_user, Some(&PduRawWithInfo::new(&cancel_eof.to_vec().unwrap()).unwrap()), ) .expect("state machine call with EOF insertion failed"); if with_closure { assert_eq!(packets, 1); let finished_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap(); assert_eq!(finished_pdu.pdu_type, PduType::FileDirective); assert_eq!( finished_pdu.file_directive_type.unwrap(), FileDirectiveType::FinishedPdu ); let finished_pdu = FinishedPduReader::from_bytes(&finished_pdu.raw_pdu).unwrap(); assert_eq!( finished_pdu.condition_code(), ConditionCode::CancelRequestReceived ); assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!( finished_pdu .fault_location() .expect("no fault location set"), EntityIdTlv::new(LOCAL_ID.into()) ); } else { assert_eq!(packets, 0); } tb.expected_file_size = file_size; tb.expected_full_data = file_data[0..file_size as usize].to_vec(); } #[test] fn test_tranfer_cancellation_partial_file_with_eof_pdu_no_closure() { generic_tranfer_cancellation_partial_file_with_eof_pdu(false); } #[test] fn test_tranfer_cancellation_partial_file_with_eof_pdu_with_closure() { generic_tranfer_cancellation_partial_file_with_eof_pdu(true); } #[test] fn test_tranfer_cancellation_empty_file_with_cancel_api() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let mut test_user = tb.test_user_from_cached_paths(0); let transaction_id = tb .generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.handler.cancel_request(&transaction_id); let packets = tb .handler .state_machine_no_packet(&mut test_user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); } #[test] fn test_tranfer_cancellation_empty_file_with_cancel_api_and_closure() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); let mut test_user = tb.test_user_from_cached_paths(0); let transaction_id = tb .generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.handler.cancel_request(&transaction_id); let packets = tb .handler .state_machine_no_packet(&mut test_user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 1); let next_pdu = tb.get_next_pdu().unwrap(); assert_eq!(next_pdu.pdu_type, PduType::FileDirective); assert_eq!( next_pdu.file_directive_type.unwrap(), FileDirectiveType::FinishedPdu ); let finished_pdu = FinishedPduReader::new(&next_pdu.raw_pdu).expect("finished pdu read failed"); assert_eq!( finished_pdu.condition_code(), ConditionCode::CancelRequestReceived ); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); // Empty file, so still complete. assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete); assert_eq!( finished_pdu.fault_location(), Some(EntityIdTlv::new(REMOTE_ID.into())) ); } #[test] fn test_tranfer_cancellation_partial_file_with_cancel_api_and_closure() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = 5; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); let mut test_user = tb.test_user_from_cached_paths(file_size); let transaction_id = tb .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &file_data[0..5]) .expect("file data insertion failed"); tb.handler.cancel_request(&transaction_id); let packets = tb .handler .state_machine_no_packet(&mut test_user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 1); let next_pdu = tb.get_next_pdu().unwrap(); assert_eq!(next_pdu.pdu_type, PduType::FileDirective); assert_eq!( next_pdu.file_directive_type.unwrap(), FileDirectiveType::FinishedPdu ); let finished_pdu = FinishedPduReader::new(&next_pdu.raw_pdu).expect("finished pdu read failed"); assert_eq!( finished_pdu.condition_code(), ConditionCode::CancelRequestReceived ); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); assert_eq!( finished_pdu.fault_location(), Some(EntityIdTlv::new(REMOTE_ID.into())) ); tb.expected_file_size = file_size; tb.expected_full_data = file_data[0..file_size as usize].to_vec(); } // Only incomplete received files will be removed. #[test] fn test_tranfer_cancellation_file_disposition_not_done_for_empty_file() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); let remote_cfg_mut = tb .handler .remote_cfg_table .get_mut(LOCAL_ID.value()) .unwrap(); remote_cfg_mut.disposition_on_cancellation = true; let mut test_user = tb.test_user_from_cached_paths(0); let transaction_id = tb .generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.handler.cancel_request(&transaction_id); let packets = tb .handler .state_machine_no_packet(&mut test_user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); } #[test] fn test_tranfer_cancellation_file_disposition_not_done_for_incomplete_file() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); tb.check_dest_file = false; let remote_cfg_mut = tb .handler .remote_cfg_table .get_mut(LOCAL_ID.value()) .unwrap(); remote_cfg_mut.disposition_on_cancellation = true; let mut test_user = tb.test_user_from_cached_paths(file_size); let transaction_id = tb .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &file_data[0..5]) .expect("file data insertion failed"); tb.handler.cancel_request(&transaction_id); let packets = tb .handler .state_machine_no_packet(&mut test_user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); // File was disposed. assert!(!Path::exists(tb.dest_path())); } }