diff --git a/src/source.rs b/src/source.rs index 742a254..3eb46a7 100644 --- a/src/source.rs +++ b/src/source.rs @@ -370,15 +370,19 @@ impl< cfdp_user: &mut impl CfdpUser, pdu: Option<&impl PduProvider>, ) -> Result { + let mut sent_packets = 0; if let Some(packet) = pdu { - self.insert_packet(cfdp_user, packet)?; + sent_packets += self.insert_packet(cfdp_user, packet)?; } match self.state() { super::State::Idle => { // TODO: In acknowledged mode, add timer handling. Ok(0) } - super::State::Busy => self.fsm_busy(cfdp_user, pdu), + super::State::Busy => { + sent_packets += self.fsm_busy(cfdp_user, pdu)?; + Ok(sent_packets) + } super::State::Suspended => { // There is now way to suspend the handler currently anyway. Ok(0) @@ -521,7 +525,7 @@ impl< &mut self, _cfdp_user: &mut impl CfdpUser, packet_to_insert: &impl PduProvider, - ) -> Result<(), SourceError> { + ) -> Result { if packet_to_insert.packet_target()? != PacketTarget::SourceEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. @@ -538,6 +542,8 @@ impl< directive_type: None, }); } + let mut sent_packets = 0; + // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is // always a valid value. match packet_to_insert @@ -550,7 +556,7 @@ impl< } FileDirectiveType::NakPdu => { let nak_pdu = NakPduReader::new(packet_to_insert.pdu())?; - self.handle_nak_pdu(&nak_pdu)? + sent_packets += self.handle_nak_pdu(&nak_pdu)?; } FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), FileDirectiveType::AckPdu => { @@ -566,7 +572,7 @@ impl< }); } } - Ok(()) + Ok(sent_packets) } /// This functions models the Cancel.request CFDP primitive and is the recommended way to @@ -704,12 +710,14 @@ impl< Ok(sent_packets) } - fn handle_retransmission(&mut self, nak_pdu: &NakPduReader) -> Result<(), SourceError> { + fn handle_retransmission(&mut self, nak_pdu: &NakPduReader) -> Result { + let mut sent_packets = 0; let segment_req_iter = nak_pdu.get_segment_requests_iterator().unwrap(); for segment_req in segment_req_iter { // Special case: Metadata PDU is re-requested. if segment_req.0 == 0 && segment_req.1 == 0 { self.prepare_and_send_metadata_pdu()?; + sent_packets += 1; continue; } else { if (segment_req.1 < segment_req.0) || (segment_req.0 > self.file_params.progress) { @@ -721,11 +729,12 @@ impl< let chunk_size = core::cmp::min(missing_chunk_len, self.file_params.segment_len); self.prepare_and_send_file_data_pdu(current_offset, chunk_size)?; + sent_packets += 1; missing_chunk_len -= missing_chunk_len; } } } - Ok(()) + Ok(sent_packets) } fn handle_waiting_for_finished_pdu( @@ -1038,9 +1047,8 @@ impl< Ok(()) } - fn handle_nak_pdu(&mut self, nak_pdu: &NakPduReader) -> Result<(), SourceError> { - self.handle_retransmission(nak_pdu)?; - Ok(()) + fn handle_nak_pdu(&mut self, nak_pdu: &NakPduReader) -> Result { + self.handle_retransmission(nak_pdu) } fn handle_ack_pdu(&mut self, ack_pdu: &AckPdu) -> Result<(), SourceError> { @@ -1459,7 +1467,7 @@ mod tests { .expect("put_request call failed"); assert_eq!(self.handler.state(), State::Busy); assert_eq!(self.handler.step(), TransactionStep::Idle); - let id = self.handler.transaction_id().unwrap(); + let transaction_id = self.handler.transaction_id().unwrap(); let sent_packets = self .handler .state_machine_no_packet(cfdp_user) @@ -1468,6 +1476,30 @@ mod tests { assert!(!self.pdu_queue_empty()); let next_pdu = self.get_next_sent_pdu().unwrap(); assert!(!self.pdu_queue_empty()); + let metadata_pdu_reader = self.metadata_check(&next_pdu, file_size); + let closure_requested = if let Some(closure_requested) = put_request.closure_requested { + assert_eq!( + metadata_pdu_reader.metadata_params().closure_requested, + closure_requested + ); + closure_requested + } else { + assert!(metadata_pdu_reader.metadata_params().closure_requested); + metadata_pdu_reader.metadata_params().closure_requested + }; + TransferInfo { + pdu_header: *metadata_pdu_reader.pdu_header(), + closure_requested, + file_size, + id: transaction_id, + } + } + + fn metadata_check<'a>( + &self, + next_pdu: &'a SentPdu, + file_size: u64, + ) -> MetadataPduReader<'a> { assert_eq!(next_pdu.pdu_type, PduType::FileDirective); assert_eq!( next_pdu.file_directive_type, @@ -1475,7 +1507,6 @@ mod tests { ); let metadata_pdu = MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format"); - let pdu_header = metadata_pdu.pdu_header(); self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc); assert_eq!( metadata_pdu @@ -1499,23 +1530,8 @@ mod tests { ChecksumType::Crc32 ); assert_eq!(metadata_pdu.transmission_mode(), self.transmission_mode); - let closure_requested = if let Some(closure_requested) = put_request.closure_requested { - assert_eq!( - metadata_pdu.metadata_params().closure_requested, - closure_requested - ); - closure_requested - } else { - assert!(metadata_pdu.metadata_params().closure_requested); - metadata_pdu.metadata_params().closure_requested - }; assert_eq!(metadata_pdu.options(), &[]); - TransferInfo { - pdu_header: *pdu_header, - closure_requested, - file_size, - id, - } + metadata_pdu } fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) { @@ -2271,4 +2287,79 @@ mod tests { tb.finish_handling(&mut cfdp_user, &transfer_info); tb.common_finished_pdu_ack_check(); } + + #[test] + fn test_nak_for_file_segment() { + let mut cfdp_user = TestCfdpUser::default(); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); + let mut file = OpenOptions::new() + .write(true) + .open(&tb.srcfile) + .expect("opening file failed"); + let mut rand_data = [0u8; 140]; + rand::rng().fill(&mut rand_data[..]); + file.write_all(&rand_data) + .expect("writing file content failed"); + drop(file); + let (transfer_info, fd_pdus) = + tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec()); + assert_eq!(fd_pdus, 2); + tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, &[(0, 90)]); + tb.check_next_file_pdu(0, &rand_data[0..90]); + tb.all_fault_queues_empty(); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_nak_for_metadata() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + EofParams::new_success(file_size, CRC_32.digest().finalize()), + 1, + ); + + // NAK to cause re-transmission of metadata PDU. + let nak_pdu = NakPduCreator::new( + transfer_info.pdu_header, + 0, + transfer_info.file_size as u32, + &[(0, 0)], + ) + .unwrap(); + let nak_pdu_vec = nak_pdu.to_vec().unwrap(); + let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap(); + let sent_packets = tb + .handler + .state_machine(&mut cfdp_user, Some(&packet_info)) + .unwrap(); + assert_eq!(sent_packets, 1); + let next_pdu = tb.get_next_sent_pdu().unwrap(); + // Check the metadata PDU. + tb.metadata_check(&next_pdu, file_size); + tb.all_fault_queues_empty(); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } }