diff --git a/src/dest.rs b/src/dest.rs index c591e20..3648ddc 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -57,7 +57,7 @@ use spacepackets::{ file_data::FileDataPdu, finished::{DeliveryCode, FileStatus, FinishedPduCreator}, metadata::{MetadataGenericParams, MetadataPduReader}, - nak::NakPduCreatorWithReservedSeqReqsBuf, + nak::{NakPduCreator, NakPduCreatorWithReservedSeqReqsBuf}, }, tlv::{EntityIdTlv, GenericTlv, ReadableTlv, TlvType, msg_to_user::MsgToUserTlv}, }, @@ -295,6 +295,8 @@ pub enum DestError { Io(#[from] std::io::Error), #[error("file store error: {0}")] Filestore(#[from] FilestoreError), + #[error("unexpected large file size which is not supported for current transfer")] + UnexpectedLargeFileSize, #[error("lost segment error: {0}")] LostSegmentError(#[from] LostSegmentError), #[error("path conversion error {0}")] @@ -660,14 +662,21 @@ impl< metadata_pdu: MetadataPduReader, first_packet: bool, ) -> Result<(), DestError> { - // TODO: If this arrives while metadata is being awaited (state is waiting on metadata), - // reset NAK activity params. if self.state != State::Idle { return Err(DestError::RecvdMetadataButIsBusy); } if first_packet { self.first_packet_handling(metadata_pdu.pdu_header().common_pdu_conf())?; } + self.reset_nak_activity_parameters_if_active(); + if let Some(acked_params) = self.transaction_params.acked_params.as_mut() { + if acked_params.metadata_missing { + if let Some(timer) = self.transaction_params.deferred_procedure_timer.as_mut() { + acked_params.nak_activity_counter = 0; + timer.reset(); + } + } + } self.transaction_params.metadata_params = *metadata_pdu.metadata_params(); let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value()); @@ -714,6 +723,17 @@ impl< Ok(()) } + fn reset_nak_activity_parameters_if_active(&mut self) { + if let Some(acked_params) = self.transaction_params.acked_params.as_mut() { + if acked_params.metadata_missing { + if let Some(timer) = self.transaction_params.deferred_procedure_timer.as_mut() { + acked_params.nak_activity_counter = 0; + timer.reset(); + } + } + } + } + fn handle_file_data_without_previous_metadata( &mut self, fd_pdu: &FileDataPdu, @@ -797,7 +817,6 @@ impl< user: &mut impl CfdpUser, fd_pdu: FileDataPdu, ) -> Result { - // TODO: Lost segment handling. let mut packets_sent = 0; let mut handle_indication = |id: TransactionId, indication_config: &IndicationConfig| { if indication_config.file_segment_recv { @@ -830,6 +849,9 @@ impl< }); } } + if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { + self.lost_segment_handling(&fd_pdu)?; + } handle_indication( self.transaction_id().unwrap(), &self.local_cfg.indication_cfg, @@ -857,6 +879,77 @@ impl< Ok(packets_sent) } + fn lost_segment_handling(&mut self, fd_pdu: &FileDataPdu) -> Result<(), DestError> { + // Lost segment detection: 4.6.4.3.1 a) and b) are covered by this code. c) is covered + //by dedicated code which is run when the EOF PDU is handled. + if self.transaction_params.acked_params.is_none() { + return Ok(()); + } + + let acked_params = self.transaction_params.acked_params.as_mut().unwrap(); + if fd_pdu.offset() > acked_params.last_end_offset { + let lost_segment = (acked_params.last_end_offset, fd_pdu.offset()); + self.lost_segment_tracker.add_lost_segment(lost_segment)?; + if self + .transaction_params + .remote_cfg + .as_ref() + .unwrap() + .immediate_nak_mode + { + let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + if self.transaction_params.pdu_conf.file_flag == LargeFileFlag::Normal + && lost_segment.0 > u32::MAX as u64 + || lost_segment.1 > u32::MAX as u64 + { + return Err(DestError::UnexpectedLargeFileSize); + } + let seg32 = [(lost_segment.0 as u32, lost_segment.1 as u32)]; + let seg64 = [lost_segment]; + let nak_pdu = if self.transaction_params.pdu_conf.file_flag == LargeFileFlag::Normal + { + NakPduCreator::new_normal_file_size( + pdu_header, + 0, + self.transaction_params.progress as u32, + &seg32, + ) + .unwrap() + } else { + NakPduCreator::new_large_file_size( + pdu_header, + 0, + self.transaction_params.progress, + &seg64, + ) + .unwrap() + }; + let written_len = nak_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?; + self.pdu_sender.send_file_directive_pdu( + FileDirectiveType::NakPdu, + &self.pdu_and_cksum_buffer.borrow()[0..written_len], + )?; + } + } + if fd_pdu.offset() > acked_params.last_end_offset { + acked_params.last_start_offset = fd_pdu.offset(); + acked_params.last_end_offset = fd_pdu.offset() + fd_pdu.file_data().len() as u64; + } + if fd_pdu.offset() + fd_pdu.file_data().len() as u64 <= acked_params.last_start_offset { + // Might be a re-requested FD PDU. + let removed = self.lost_segment_tracker.remove_lost_segment(( + fd_pdu.offset(), + fd_pdu.offset() + fd_pdu.file_data().len() as u64, + ))?; + // Reception of missing segments resets the NAK activity parameters. + // See CFDP 4.6.4.7. + if removed && acked_params.deferred_procedure_active { + self.reset_nak_activity_parameters_if_active(); + } + } + Ok(()) + } + fn handle_eof_pdu( &mut self, cfdp_user: &mut impl CfdpUser, @@ -1547,26 +1640,24 @@ impl< } fn handle_positive_ack_procedures(&mut self) -> Result { - // 1) Do we have positive-ack params? + // Do we have positive-ack params? let params = match self.transaction_params.positive_ack_params.as_mut() { Some(p) => p, None => return Ok(0), }; - // 2) Has the timer expired? + // Has the timer expired? if !params.ack_timer.has_expired() { return Ok(0); } - // 3) Load the remote limit (avoid a panicking unwrap later) let expiration_limit = self .transaction_params .remote_cfg .as_ref() .unwrap() .positive_ack_timer_expiration_limit; - - // 4) If bumping the counter would exceed the limit, fault & maybe abandon + // If bumping the counter would exceed the limit, fault & maybe abandon if params.ack_counter + 1 >= expiration_limit { if self.declare_fault(ConditionCode::PositiveAckLimitReached) == FaultHandlerCode::AbandonTransaction @@ -1576,7 +1667,6 @@ impl< return Ok(0); } - // 5) Otherwise reset the timer, increment, and send the PDU params.ack_timer.reset(); params.ack_counter += 1; self.send_finished_pdu()