diff --git a/src/dest.rs b/src/dest.rs index f4f4f83..6a59701 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -473,20 +473,22 @@ impl< cfdp_user: &mut impl CfdpUser, packet_to_insert: Option<&impl PduProvider>, ) -> Result { + let mut sent_packets = 0; if let Some(packet) = packet_to_insert { - self.insert_packet(cfdp_user, packet)?; + sent_packets += self.insert_packet(cfdp_user, packet)?; } match self.state { State::Idle => { // TODO: In acknowledged mode, add timer handling. - Ok(0) } - State::Busy => self.fsm_busy(cfdp_user), + State::Busy => { + sent_packets += self.fsm_busy(cfdp_user)?; + } State::Suspended => { // There is now way to suspend the handler currently anyway. - Ok(0) } } + Ok(sent_packets) } /// This function models the Cancel.request CFDP primitive and is the recommended way @@ -590,7 +592,7 @@ impl< } PduType::FileData => { let fd_pdu = FileDataPdu::from_bytes(packet_to_insert.raw_pdu())?; - self.handle_file_data(cfdp_user, fd_pdu)?; + sent_packets += self.handle_file_data(cfdp_user, fd_pdu)?; } } Ok(sent_packets) @@ -669,11 +671,26 @@ impl< self.first_packet_handling(metadata_pdu.pdu_header().common_pdu_conf())?; } self.reset_nak_activity_parameters_if_active(); - if let Some(acked_params) = self.transaction_params.acked_params.as_mut() { - if acked_params.metadata_missing { - if let Some(timer) = self.transaction_params.deferred_procedure_timer.as_mut() { - acked_params.nak_activity_counter = 0; - timer.reset(); + if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { + match self.transaction_params.acked_params.as_mut() { + Some(acked_params) => { + if acked_params.metadata_missing { + if let Some(timer) = + self.transaction_params.deferred_procedure_timer.as_mut() + { + acked_params.nak_activity_counter = 0; + timer.reset(); + } + } + } + None => { + self.transaction_params.acked_params = Some(AcknowledgedModeParams { + last_start_offset: 0, + last_end_offset: 0, + metadata_missing: false, + nak_activity_counter: 0, + deferred_procedure_active: false, + }); } } } @@ -770,7 +787,7 @@ impl< if fd_pdu.file_data().len() as u64 > 0 { num_seg_reqs += 1; } - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let mut nak_pdu = NakPduCreatorWithReservedSeqReqsBuf::new( self.pdu_and_cksum_buffer.get_mut(), pdu_header, @@ -798,11 +815,10 @@ impl< buf_mut[current_offset..current_offset + increment] .copy_from_slice(&(self.transaction_params.progress as u32).to_be_bytes()); } - nak_pdu - .set_start_and_end_of_scope(0, self.transaction_params.progress) - .map_err(PduError::from)?; } - let written_size = nak_pdu.finish(); + let written_size = nak_pdu + .finish(0, self.transaction_params.progress) + .map_err(PduError::from)?; self.pdu_sender.send_file_directive_pdu( FileDirectiveType::NakPdu, &self.pdu_and_cksum_buffer.borrow()[0..written_size], @@ -817,7 +833,7 @@ impl< user: &mut impl CfdpUser, fd_pdu: FileDataPdu, ) -> Result { - let mut packets_sent = 0; + let mut sent_packets = 0; let mut handle_indication = |id: TransactionId, indication_config: &IndicationConfig| { if indication_config.file_segment_recv { user.file_segment_recvd_indication(&FileSegmentRecvdParams { @@ -831,13 +847,13 @@ impl< let step = self.step.get(); if self.state == State::Idle { if step == TransactionStep::Idle { - packets_sent += self.handle_file_data_without_previous_metadata(&fd_pdu)?; + sent_packets += self.handle_file_data_without_previous_metadata(&fd_pdu)?; self.set_step(TransactionStep::WaitingForMetadata); handle_indication( self.transaction_id().unwrap(), &self.local_cfg.indication_cfg, ); - return Ok(packets_sent); + return Ok(sent_packets); } if step != TransactionStep::ReceivingFileDataPdus && step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling @@ -849,9 +865,6 @@ impl< }); } } - if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { - self.lost_segment_handling(&fd_pdu)?; - } handle_indication( self.transaction_id().unwrap(), &self.local_cfg.indication_cfg, @@ -875,17 +888,24 @@ impl< } return Err(e.into()); } - self.transaction_params.progress += fd_pdu.file_data().len() as u64; - Ok(packets_sent) + self.transaction_params.progress = core::cmp::max( + self.transaction_params.progress, + fd_pdu.offset() + fd_pdu.file_data().len() as u64, + ); + if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { + sent_packets += self.lost_segment_handling(&fd_pdu)?; + } + Ok(sent_packets) } - fn lost_segment_handling(&mut self, fd_pdu: &FileDataPdu) -> Result<(), DestError> { + fn lost_segment_handling(&mut self, fd_pdu: &FileDataPdu) -> Result { // Lost segment detection: 4.6.4.3.1 a) and b) are covered by this code. c) is covered //by dedicated code which is run when the EOF PDU is handled. if self.transaction_params.acked_params.is_none() { - return Ok(()); + return Ok(0); } + let mut sent_packets = 0; let acked_params = self.transaction_params.acked_params.as_mut().unwrap(); if fd_pdu.offset() > acked_params.last_end_offset { let lost_segment = (acked_params.last_end_offset, fd_pdu.offset()); @@ -897,7 +917,8 @@ impl< .unwrap() .immediate_nak_mode { - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let pdu_header = + PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); if self.transaction_params.pdu_conf.file_flag == LargeFileFlag::Normal && lost_segment.0 > u32::MAX as u64 || lost_segment.1 > u32::MAX as u64 @@ -929,9 +950,10 @@ impl< FileDirectiveType::NakPdu, &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; + sent_packets += 1; } } - if fd_pdu.offset() > acked_params.last_end_offset { + if fd_pdu.offset() >= acked_params.last_end_offset { acked_params.last_start_offset = fd_pdu.offset(); acked_params.last_end_offset = fd_pdu.offset() + fd_pdu.file_data().len() as u64; } @@ -947,7 +969,7 @@ impl< self.reset_nak_activity_parameters_if_active(); } } - Ok(()) + Ok(sent_packets) } fn handle_eof_pdu( @@ -955,7 +977,7 @@ impl< cfdp_user: &mut impl CfdpUser, eof_pdu: EofPdu, ) -> Result { - let mut sent_packets = 0; + let sent_packets = 0; if self.local_cfg.indication_cfg.eof_recv { // Unwrap is okay here, application logic ensures that transaction ID is valid here. cfdp_user @@ -972,14 +994,34 @@ impl< self.handle_eof_without_previous_metadata(&eof_pdu)?; return Ok(sent_packets); } - let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError { - self.handle_eof_no_error(&eof_pdu)? + if eof_pdu.condition_code() == ConditionCode::NoError { + if !self.handle_eof_no_error(&eof_pdu)? { + return Ok(sent_packets); + } } else { self.handle_eof_cancel(&eof_pdu); - true }; - if regular_transfer_finish { - sent_packets += self.file_transfer_complete_transition()?; + let mut sent_packets = 0; + match self.transaction_params.transmission_mode() { + TransmissionMode::Acknowledged => { + self.acknowledge_eof_pdu(&eof_pdu)?; + sent_packets += 1; + if self + .transaction_params + .acked_params + .as_ref() + .unwrap() + .metadata_missing + || !self.lost_segment_tracker.is_empty() + { + self.start_deferred_lost_segment_handling(); + } else { + self.set_step(TransactionStep::TransferCompletion); + } + } + TransmissionMode::Unacknowledged => { + self.set_step(TransactionStep::TransferCompletion); + } } Ok(sent_packets) } @@ -1055,7 +1097,7 @@ impl< } fn acknowledge_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<(), DestError> { - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let ack_pdu = AckPdu::new_for_eof_pdu( pdu_header, eof_pdu.condition_code(), @@ -1087,7 +1129,8 @@ impl< } } - fn deferred_lost_segment_handling(&mut self) -> Result<(), DestError> { + fn deferred_lost_segment_handling(&mut self) -> Result { + let mut sent_packets = 0; assert!( self.transaction_params.acked_params.is_some(), "acknowledged parameters unexpectedly None" @@ -1126,7 +1169,7 @@ impl< .unwrap() .deferred_procedure_active = false; self.set_step(TransactionStep::TransferCompletion); - return Ok(()); + return Ok(sent_packets); } let first_nak_issuance = self.transaction_params.deferred_procedure_timer.is_none(); @@ -1149,7 +1192,7 @@ impl< .unwrap() .has_expired() { - return Ok(()); + return Ok(sent_packets); } if !first_nak_issuance && acked_params.nak_activity_counter + 1 @@ -1169,9 +1212,9 @@ impl< { self.abandon_transaction(); } - return Ok(()); + return Ok(sent_packets); } - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let max_segment_reqs = NakPduCreatorWithReservedSeqReqsBuf::calculate_max_segment_requests( self.transaction_params .remote_cfg @@ -1202,17 +1245,20 @@ impl< self.lost_segment_tracker .write_to_nak_segment_list(&mut nak_pdu_creator, acked_params.metadata_missing) .expect("unexpected lost segment write error"); - let written_len = nak_pdu_creator.finish(); + let written_len = nak_pdu_creator + .finish(0, self.transaction_params.progress) + .map_err(PduError::from)?; self.pdu_sender.send_file_directive_pdu( FileDirectiveType::NakPdu, &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; + sent_packets += 1; } else { - self.write_multi_packet_nak_sequence(max_segment_reqs, acked_params.metadata_missing)?; + sent_packets += self + .write_multi_packet_nak_sequence(max_segment_reqs, acked_params.metadata_missing)?; } - // TODO. - Ok(()) + Ok(sent_packets) } #[cold] @@ -1220,8 +1266,9 @@ impl< &mut self, max_segments: usize, first_segment_metadata: bool, - ) -> Result<(), DestError> { - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + ) -> Result { + let mut sent_packets = 0; + let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let mut nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new( self.pdu_and_cksum_buffer.get_mut(), pdu_header, @@ -1232,6 +1279,8 @@ impl< let mut segment_index = 0; let mut buf_index = 0; let mut remaining = max_segments; + let mut current_start_of_scope = 0; + let mut current_end_of_scope = 0; let mut seg_buf = nak_pdu_creator.segment_request_buffer_mut(); // First segment re-requests metadata PDU. @@ -1246,14 +1295,17 @@ impl< segment_index = 1; // account for the header gap entry } - for (start, end) in self.lost_segment_tracker.iter() { + for (idx, (start, end)) in self.lost_segment_tracker.iter().enumerate() { // flush full PDU *before* writing the new entry if segment_index == max_segments { - let written_len = nak_pdu_creator.finish(); + let written_len = nak_pdu_creator + .finish(current_start_of_scope, current_end_of_scope) + .map_err(PduError::from)?; self.pdu_sender.send_file_directive_pdu( FileDirectiveType::NakPdu, &self.pdu_and_cksum_buffer.borrow()[..written_len], )?; + sent_packets += 1; remaining = remaining.saturating_sub(max_segments); // new PDU with the same capacity @@ -1265,6 +1317,7 @@ impl< .unwrap(); seg_buf = nak_pdu_creator.segment_request_buffer_mut(); segment_index = 0; + current_start_of_scope = current_end_of_scope; buf_index = 0; } @@ -1280,18 +1333,26 @@ impl< seg_buf[buf_index..buf_index + 4].copy_from_slice(&(end as u32).to_be_bytes()); buf_index += 4; } + if idx == max_segments { + // Clamp the end of scope to the progress for the last NAK in the sequence. + current_end_of_scope = self.transaction_params.progress; + } else { + current_end_of_scope = end; + } segment_index += 1; } // send trailing PDU if anything was written if segment_index > 0 { - let written_len = nak_pdu_creator.finish(); + let written_len = nak_pdu_creator + .finish(current_start_of_scope, current_end_of_scope) + .map_err(PduError::from)?; self.pdu_sender.send_file_directive_pdu( FileDirectiveType::NakPdu, &self.pdu_and_cksum_buffer.borrow()[..written_len], )?; } - Ok(()) + Ok(sent_packets) } fn trigger_notice_of_completion_cancelled( @@ -1340,10 +1401,8 @@ impl< // CFDP 4.6.4.3.1: The end offset of the last received file segment and the file // size as stated in the EOF PDU is not the same, so we need to add that segment to // the lost segments for the deferred lost segment detection procedure. - // TODO: Proper lost segment handling. - // self._params.acked_params.lost_seg_tracker.add_lost_segment( - // (self._params.fp.progress, self._params.fp.file_size_eof) - // ) + self.lost_segment_tracker + .add_lost_segment((self.transaction_params.progress, eof_pdu.file_size()))?; } self.transaction_params.checksum = eof_pdu.file_checksum(); @@ -1367,28 +1426,6 @@ impl< Ok(true) } - fn file_transfer_complete_transition(&mut self) -> Result { - let mut sent_packets = 0; - if self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged { - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); - let ack_pdu = AckPdu::new( - pdu_header, - FileDirectiveType::EofPdu, - self.transaction_params.finished_params.condition_code.get(), - TransactionStatus::Active, - ) - .unwrap(); - let written_len = ack_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?; - self.pdu_sender.send_file_directive_pdu( - FileDirectiveType::AckPdu, - &self.pdu_and_cksum_buffer.borrow()[0..written_len], - )?; - sent_packets += 1; - } - self.set_step(TransactionStep::TransferCompletion); - Ok(sent_packets) - } - fn checksum_verify(&mut self, verify_len: u64, checksum: u32) -> bool { if self.transaction_params.metadata_params().checksum_type == ChecksumType::NullChecksum || self.transaction_params.metadata_only @@ -1520,7 +1557,7 @@ impl< { if let Some(ack_params) = &mut self.transaction_params.acked_params { if ack_params.deferred_procedure_active { - self.deferred_lost_segment_handling()?; + sent_packets += self.deferred_lost_segment_handling()?; } } } @@ -1765,17 +1802,17 @@ impl< fn send_finished_pdu(&mut self) -> Result { let tstate = &self.transaction_params; - let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let finished_pdu = if tstate.finished_params.condition_code.get() == ConditionCode::NoError || tstate.finished_params.condition_code.get() == ConditionCode::UnsupportedChecksumType { - FinishedPduCreator::new_default( + FinishedPduCreator::new_no_error( pdu_header, tstate.finished_params.delivery_code.get(), tstate.finished_params.file_status, ) } else { - FinishedPduCreator::new_generic( + FinishedPduCreator::new( pdu_header, tstate.finished_params.condition_code.get(), tstate.finished_params.delivery_code.get(), @@ -1809,7 +1846,10 @@ mod tests { cfdp::{ ChecksumType, TransmissionMode, lv::Lv, - pdu::{WritablePduPacket, finished::FinishedPduReader, metadata::MetadataPduCreator}, + pdu::{ + WritablePduPacket, finished::FinishedPduReader, metadata::MetadataPduCreator, + nak::NakPduReader, + }, }, util::UnsignedByteFieldU8, }; @@ -1857,14 +1897,27 @@ mod tests { } impl DestHandlerTestbench { - fn new_with_fixed_paths(fault_handler: TestFaultHandler, closure_requested: bool) -> Self { + fn new_with_fixed_paths( + fault_handler: TestFaultHandler, + + transmission_mode: TransmissionMode, + closure_requested: bool, + ) -> Self { let (src_path, dest_path) = init_full_filepaths_textfile(); assert!(!Path::exists(&dest_path)); - Self::new(fault_handler, closure_requested, true, src_path, dest_path) + Self::new( + fault_handler, + transmission_mode, + closure_requested, + true, + src_path, + dest_path, + ) } fn new( fault_handler: TestFaultHandler, + transmission_mode: TransmissionMode, closure_requested: bool, check_dest_file: bool, src_path: PathBuf, @@ -1873,7 +1926,7 @@ mod tests { let expiry_control = TimerExpiryControl::default(); let test_sender = TestCfdpSender::default(); let dest_handler = default_dest_handler(fault_handler, test_sender, &expiry_control); - let handler = Self { + let mut handler = Self { expiry_control, handler: dest_handler, src_path, @@ -1891,6 +1944,7 @@ mod tests { expected_full_data: Vec::new(), buf: [0; 512], }; + handler.pdu_conf.trans_mode = transmission_mode; handler.state_check(State::Idle, TransactionStep::Idle); handler } @@ -1912,6 +1966,13 @@ mod tests { &mut self.handler.local_cfg.indication_cfg } + fn remote_cfg_mut(&mut self) -> &mut RemoteEntityConfig { + self.handler + .remote_cfg_table + .get_mut(LOCAL_ID.value()) + .unwrap() + } + fn pdu_queue_empty(&mut self) -> bool { self.handler.pdu_sender.queue_empty() } @@ -1947,13 +2008,11 @@ mod tests { &mut self, user: &mut TestCfdpUser, file_size: u64, - transmission_mode: TransmissionMode, ) -> Result { self.expected_file_size = file_size; - self.pdu_conf.trans_mode = transmission_mode; assert_eq!(user.transaction_indication_call_count, 0); assert_eq!(user.metadata_recv_queue.len(), 0); - let pdu_header = PduHeader::new_no_file_data(self.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(self.pdu_conf, 0); let metadata_pdu = create_metadata_pdu( &pdu_header, self.src_path.as_path(), @@ -1964,7 +2023,10 @@ mod tests { let packet_info = create_packet_info(&metadata_pdu, &mut self.buf); self.handler.state_machine(user, Some(&packet_info))?; assert_eq!(user.metadata_recv_queue.len(), 1); - assert_eq!(self.handler.transmission_mode().unwrap(), transmission_mode); + assert_eq!( + self.handler.transmission_mode().unwrap(), + self.pdu_conf.trans_mode + ); assert_eq!(user.transaction_indication_call_count, 0); assert_eq!(user.metadata_recv_queue.len(), 1); let metadata_recvd = user.metadata_recv_queue.pop_front().unwrap(); @@ -2016,7 +2078,7 @@ mod tests { ) -> Result { self.expected_full_data = expected_full_data; assert_eq!(user.finished_indic_queue.len(), 0); - let pdu_header = PduHeader::new_no_file_data(self.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(self.pdu_conf, 0); let eof_pdu = create_no_error_eof(&self.expected_full_data, &pdu_header); let packet_info = create_packet_info(&eof_pdu, &mut self.buf); self.check_handler_idle_at_drop = true; @@ -2226,9 +2288,13 @@ mod tests { #[test] fn test_empty_file_transfer_not_acked_no_closure() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut test_user = tb.test_user_from_cached_paths(0); - tb.generic_transfer_init(&mut test_user, 0, TransmissionMode::Unacknowledged) + tb.generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(&mut test_user, Vec::new()) @@ -2239,10 +2305,14 @@ mod tests { #[test] fn test_empty_file_transfer_acked() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = tb - .generic_transfer_init(&mut user, 0, TransmissionMode::Acknowledged) + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(&mut user, Vec::new()) @@ -2261,16 +2331,26 @@ mod tests { let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut user = tb.test_user_from_cached_paths(file_size); let _transfer_info = tb - .generic_transfer_init(&mut user, file_size, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.generic_file_data_insert(&mut user, 0, file_data) - .expect("file data insertion failed"); - tb.generic_eof_no_error(&mut user, file_data.to_vec()) - .expect("EOF no error insertion failed"); + assert_eq!( + tb.generic_file_data_insert(&mut user, 0, file_data) + .expect("file data insertion failed"), + 0 + ); + assert_eq!( + tb.generic_eof_no_error(&mut user, file_data.to_vec()) + .expect("EOF no error insertion failed"), + 0 + ); tb.check_completion_indication_success(&mut user); } @@ -2281,16 +2361,26 @@ mod tests { let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb - .generic_transfer_init(&mut user, file_size, TransmissionMode::Acknowledged) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.generic_file_data_insert(&mut user, 0, file_data) - .expect("file data insertion failed"); - tb.generic_eof_no_error(&mut user, file_data.to_vec()) - .expect("EOF no error insertion failed"); + assert_eq!( + tb.generic_file_data_insert(&mut user, 0, file_data) + .expect("file data insertion failed"), + 0 + ); + assert_eq!( + tb.generic_eof_no_error(&mut user, file_data.to_vec()) + .expect("EOF no error insertion failed"), + 2 + ); tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); @@ -2307,9 +2397,13 @@ mod tests { let segment_len = 256; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut test_user = tb.test_user_from_cached_paths(file_size); - tb.generic_transfer_init(&mut test_user, file_size, TransmissionMode::Unacknowledged) + tb.generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len]) @@ -2334,10 +2428,14 @@ mod tests { let segment_len = 256; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb - .generic_transfer_init(&mut user, file_size, TransmissionMode::Acknowledged) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut user, 0, &random_data[0..segment_len]) @@ -2362,10 +2460,14 @@ mod tests { let segment_len = 256; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut test_user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb - .generic_transfer_init(&mut test_user, file_size, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); @@ -2423,10 +2525,14 @@ mod tests { }; let fault_handler = TestFaultHandler::default(); - let mut testbench = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut testbench = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut test_user = testbench.test_user_from_cached_paths(file_size); let transfer_info = testbench - .generic_transfer_init(&mut test_user, file_size, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); @@ -2512,9 +2618,13 @@ mod tests { #[test] fn test_file_transfer_with_closure() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + true, + ); let mut test_user = tb.test_user_from_cached_paths(0); - tb.generic_transfer_init(&mut test_user, 0, TransmissionMode::Unacknowledged) + tb.generic_transfer_init(&mut test_user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let sent_packets = tb @@ -2534,11 +2644,15 @@ mod tests { #[test] fn test_finished_pdu_insertion_rejected() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); tb.check_dest_file = false; let mut user = tb.test_user_from_cached_paths(0); - let finished_pdu = FinishedPduCreator::new_default( - PduHeader::new_no_file_data(CommonPduConfig::default(), 0), + let finished_pdu = FinishedPduCreator::new_no_error( + PduHeader::new_for_file_directive(CommonPduConfig::default(), 0), DeliveryCode::Complete, FileStatus::Retained, ); @@ -2560,13 +2674,17 @@ mod tests { #[test] fn test_metadata_insertion_twice_fails() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + true, + ); let mut user = tb.test_user_from_cached_paths(0); - tb.generic_transfer_init(&mut user, 0, TransmissionMode::Unacknowledged) + tb.generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.check_handler_idle_at_drop = false; tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - let pdu_header = PduHeader::new_no_file_data(tb.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(tb.pdu_conf, 0); let metadata_pdu = create_metadata_pdu( &pdu_header, tb.src_path.as_path(), @@ -2590,9 +2708,13 @@ mod tests { let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + true, + ); let mut user = tb.test_user_from_cached_paths(file_size); - tb.generic_transfer_init(&mut user, file_size, TransmissionMode::Unacknowledged) + tb.generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); let faulty_file_data = b"Hemlo World!"; assert_eq!( @@ -2699,6 +2821,7 @@ mod tests { let mut dest_path_buf = dest_path.keep(); let mut tb = DestHandlerTestbench::new( fault_handler, + TransmissionMode::Unacknowledged, false, false, src_path.clone(), @@ -2707,7 +2830,7 @@ mod tests { dest_path_buf.push(src_path.file_name().unwrap()); tb.dest_path = dest_path_buf; let mut user = tb.test_user_from_cached_paths(0); - tb.generic_transfer_init(&mut user, 0, TransmissionMode::Unacknowledged) + tb.generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(&mut user, Vec::new()) @@ -2718,13 +2841,17 @@ mod tests { #[test] fn test_tranfer_cancellation_empty_file_with_eof_pdu() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = tb - .generic_transfer_init(&mut user, 0, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - let pdu_header = PduHeader::new_no_file_data(tb.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(tb.pdu_conf, 0); let cancel_eof = EofPdu::new( pdu_header, ConditionCode::CancelRequestReceived, @@ -2756,10 +2883,14 @@ mod tests { let file_size = 5; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, with_closure); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + with_closure, + ); let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb - .generic_transfer_init(&mut user, file_size, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); if insert_packet { @@ -2769,7 +2900,7 @@ mod tests { let mut digest = CRC_32.digest(); digest.update(&file_data[0..5]); let checksum = digest.finalize(); - let pdu_header = PduHeader::new_no_file_data(tb.pdu_conf, 0); + let pdu_header = PduHeader::new_for_file_directive(tb.pdu_conf, 0); let cancel_eof = EofPdu::new( pdu_header, ConditionCode::CancelRequestReceived, @@ -2858,10 +2989,14 @@ mod tests { #[test] fn test_tranfer_cancellation_empty_file_with_cancel_api() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = tb - .generic_transfer_init(&mut user, 0, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.handler.cancel_request(&transfer_info.id); @@ -2880,10 +3015,14 @@ mod tests { #[test] fn test_tranfer_cancellation_empty_file_with_cancel_api_and_closure() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + true, + ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = tb - .generic_transfer_init(&mut user, 0, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.handler.cancel_request(&transfer_info.id); @@ -2925,10 +3064,14 @@ mod tests { let file_size = 5; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + true, + ); let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb - .generic_transfer_init(&mut user, file_size, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut user, 0, &file_data[0..5]) @@ -2971,7 +3114,11 @@ mod tests { #[test] fn test_tranfer_cancellation_file_disposition_not_done_for_empty_file() { let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); let remote_cfg_mut = tb .handler .remote_cfg_table @@ -2980,7 +3127,7 @@ mod tests { remote_cfg_mut.disposition_on_cancellation = true; let mut user = tb.test_user_from_cached_paths(0); let transfer_info = tb - .generic_transfer_init(&mut user, 0, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); @@ -3004,7 +3151,11 @@ mod tests { let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); - let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Unacknowledged, + false, + ); tb.check_dest_file = false; let remote_cfg_mut = tb .handler @@ -3014,7 +3165,7 @@ mod tests { remote_cfg_mut.disposition_on_cancellation = true; let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb - .generic_transfer_init(&mut user, file_size, TransmissionMode::Unacknowledged) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut user, 0, &file_data[0..5]) @@ -3036,4 +3187,130 @@ mod tests { FileStatus::DiscardDeliberately, ); } + + #[test] + fn test_immediate_nak_request() { + let file_data_str = "Hello World!"; + let file_data = file_data_str.as_bytes(); + let file_size = file_data.len() as u64; + let fault_handler = TestFaultHandler::default(); + + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); + tb.remote_cfg_mut().immediate_nak_mode = true; + let mut user = tb.test_user_from_cached_paths(file_size); + let transfer_info = tb + .generic_transfer_init(&mut user, file_size) + .expect("transfer init failed"); + tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); + + // This should immediately trigger a NAK request for file segment 0..4 + assert_eq!( + tb.generic_file_data_insert(&mut user, 4, &file_data[4..]) + .expect("file data insertion failed"), + 1 + ); + assert_eq!(tb.pdu_queue_len(), 1); + let pdu = tb.get_next_pdu().unwrap(); + assert_eq!(pdu.pdu_type, PduType::FileDirective); + assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); + let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); + assert_eq!(nak_pdu.start_of_scope(), 0); + assert_eq!(nak_pdu.end_of_scope(), file_size); + let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); + assert_eq!(seg_reqs.len(), 1); + assert_eq!(seg_reqs[0], (0, 4)); + // We simulate the reply by re-inserting the missing file segment. + tb.generic_file_data_insert(&mut user, 0, &file_data[0..4]) + .expect("file data insertion failed"); + + tb.generic_eof_no_error(&mut user, file_data.to_vec()) + .expect("EOF no error insertion failed"); + tb.check_completion_indication_success(&mut user); + assert_eq!(tb.pdu_queue_len(), 2); + tb.check_eof_ack_pdu(ConditionCode::NoError); + tb.check_finished_pdu_success(); + tb.acknowledge_finished_pdu(&mut user, &transfer_info); + } + + #[test] + fn test_deferred_nak_request() { + let file_data_str = "Hello World!"; + let file_data = file_data_str.as_bytes(); + let file_size = file_data.len() as u64; + let fault_handler = TestFaultHandler::default(); + + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); + // Disable this, we only want to check the deferred procedure. + tb.remote_cfg_mut().immediate_nak_mode = false; + let mut user = tb.test_user_from_cached_paths(file_size); + let transfer_info = tb + .generic_transfer_init(&mut user, file_size) + .expect("transfer init failed"); + tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); + + assert_eq!( + tb.generic_file_data_insert(&mut user, 4, &file_data[4..]) + .expect("file data insertion failed"), + 0 + ); + assert_eq!( + tb.generic_eof_no_error(&mut user, file_data.to_vec()) + .expect("EOF no error insertion failed"), + 2 + ); + assert_eq!(tb.pdu_queue_len(), 2); + tb.check_eof_ack_pdu(ConditionCode::NoError); + assert_eq!(tb.pdu_queue_len(), 1); + let pdu = tb.get_next_pdu().unwrap(); + assert_eq!(pdu.pdu_type, PduType::FileDirective); + assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); + let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); + assert_eq!(nak_pdu.start_of_scope(), 0); + assert_eq!(nak_pdu.end_of_scope(), file_size); + let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); + assert_eq!(seg_reqs.len(), 1); + assert_eq!(seg_reqs[0], (0, 4)); + + // We simulate the reply by re-inserting the missing file segment. + tb.generic_file_data_insert(&mut user, 0, &file_data[0..4]) + .expect("file data insertion failed"); + tb.check_completion_indication_success(&mut user); + assert_eq!(tb.pdu_queue_len(), 1); + tb.check_finished_pdu_success(); + tb.acknowledge_finished_pdu(&mut user, &transfer_info); + } + + #[test] + fn test_file_data_before_metadata() { + let file_data_str = "Hello World!"; + let file_data = file_data_str.as_bytes(); + let file_size = file_data.len() as u64; + let fault_handler = TestFaultHandler::default(); + + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); + tb.remote_cfg_mut().immediate_nak_mode = true; + let mut user = tb.test_user_from_cached_paths(file_size); + assert_eq!( + tb.generic_file_data_insert(&mut user, 0, file_data) + .expect("file data insertion failed"), + 0 + ); + tb.state_check(State::Busy, TransactionStep::WaitingForMetadata); + // TODO: Metadata should now be re-requested. + let _transfer_info = tb + .generic_transfer_init(&mut user, file_size) + .expect("transfer init failed"); + } } diff --git a/src/lib.rs b/src/lib.rs index 35b4894..92e1e5b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1112,6 +1112,7 @@ pub(crate) mod tests { pub(crate) struct TimerExpiryControl { pub(crate) check_limit: Arc, pub(crate) positive_ack: Arc, + pub(crate) nak_activity: Arc, } impl TimerExpiryControl { @@ -1149,7 +1150,10 @@ pub(crate) mod tests { .expiry_control .positive_ack .load(core::sync::atomic::Ordering::Acquire), - TimerContext::NakActivity { expiry_time: _ } => todo!(), + TimerContext::NakActivity { expiry_time: _ } => self + .expiry_control + .nak_activity + .load(core::sync::atomic::Ordering::Acquire), } } fn reset(&mut self) { @@ -1162,7 +1166,10 @@ pub(crate) mod tests { .expiry_control .check_limit .store(false, core::sync::atomic::Ordering::Release), - TimerContext::NakActivity { expiry_time: _ } => todo!(), + TimerContext::NakActivity { expiry_time: _ } => self + .expiry_control + .nak_activity + .store(false, core::sync::atomic::Ordering::Release), TimerContext::PositiveAck { expiry_time: _ } => self .expiry_control .positive_ack @@ -1205,8 +1212,8 @@ pub(crate) mod tests { TimerContext::PositiveAck { expiry_time: _ } => { TestCheckTimer::new(timer_context, &self.expiry_control) } - _ => { - panic!("invalid check timer creator, can only be used for check limit handling") + TimerContext::NakActivity { expiry_time: _ } => { + TestCheckTimer::new(timer_context, &self.expiry_control) } } } @@ -1527,7 +1534,7 @@ pub(crate) mod tests { fn generic_pdu_header() -> PduHeader { let pdu_conf = CommonPduConfig::default(); - PduHeader::new_no_file_data(pdu_conf, 0) + PduHeader::new_for_file_directive(pdu_conf, 0) } #[test] diff --git a/src/source.rs b/src/source.rs index 6602929..4a9f24e 100644 --- a/src/source.rs +++ b/src/source.rs @@ -831,7 +831,7 @@ impl< transaction_status: TransactionStatus, ) -> Result<(), SourceError> { let ack_pdu = AckPdu::new( - PduHeader::new_no_file_data(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.pdu_conf, 0), FileDirectiveType::FinishedPdu, condition_code, transaction_status, @@ -850,7 +850,7 @@ impl< ); if self.file_params.metadata_only { let metadata_pdu = MetadataPduCreator::new( - PduHeader::new_no_file_data(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.pdu_conf, 0), metadata_params, Lv::new_empty(), Lv::new_empty(), @@ -859,7 +859,7 @@ impl< return self.pdu_send_helper(&metadata_pdu); } let metadata_pdu = MetadataPduCreator::new( - PduHeader::new_no_file_data(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.pdu_conf, 0), metadata_params, Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(), Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(), @@ -918,7 +918,7 @@ impl< fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 { let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( - &PduHeader::new_no_file_data(self.pdu_conf, 0), + &PduHeader::new_for_file_directive(self.pdu_conf, 0), remote_cfg.max_packet_len, None, ); @@ -983,7 +983,7 @@ impl< checksum: u32, ) -> Result<(), SourceError> { let eof_pdu = EofPdu::new( - PduHeader::new_no_file_data(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.pdu_conf, 0), self.tstate_ref() .cond_code_eof .get() @@ -1643,7 +1643,7 @@ mod tests { // Finish handling: Simulate completion from the destination side by insert finished PDU. fn finish_handling(&mut self, user: &mut TestCfdpUser, transfer_info: &TransferInfo) { - let finished_pdu = FinishedPduCreator::new_default( + let finished_pdu = FinishedPduCreator::new_no_error( transfer_info.pdu_header, DeliveryCode::Complete, FileStatus::Retained,