diff --git a/src/source.rs b/src/source.rs index 1ab45e4..742a254 100644 --- a/src/source.rs +++ b/src/source.rs @@ -774,8 +774,8 @@ impl< Ok(()) } - fn start_positive_ack_procedure(&mut self) { - self.set_step(TransactionStep::WaitingForEofAck); + fn start_positive_ack_procedure(&self) { + self.set_step_internal(TransactionStep::WaitingForEofAck); *self.positive_ack_params.borrow_mut() = Some(PositiveAckParams { ack_timer: self .timer_creator @@ -955,7 +955,7 @@ impl< SegmentMetadataFlag::NotPresent, SegmentationControl::NoRecordBoundaryPreservation, ), - self.file_params.progress, + offset, size, ); let mut unwritten_pdu = @@ -1112,7 +1112,7 @@ impl< // We are done. Ok(ControlFlow::Continue(FsmContext::ResetWhenPossible)) } else { - self.set_step_internal(TransactionStep::WaitingForEofAck); + self.start_positive_ack_procedure(); Ok(ControlFlow::Continue(FsmContext::default())) } } @@ -1186,6 +1186,7 @@ mod tests { ChecksumType, CrcFlag, pdu::{ file_data::FileDataPdu, finished::FinishedPduCreator, metadata::MetadataPduReader, + nak::NakPduCreator, }, }, util::UnsignedByteFieldU16, @@ -1241,10 +1242,28 @@ mod tests { #[allow(dead_code)] struct TransferInfo { id: TransactionId, + file_size: u64, closure_requested: bool, pdu_header: PduHeader, } + #[derive(Debug, Clone, Copy)] + struct EofParams { + file_size: u64, + file_checksum: u32, + condition_code: ConditionCode, + } + + impl EofParams { + pub const fn new_success(file_size: u64, file_checksum: u32) -> Self { + Self { + file_size, + file_checksum, + condition_code: ConditionCode::NoError, + } + } + } + impl SourceHandlerTestbench { fn new( transmission_mode: TransmissionMode, @@ -1354,6 +1373,26 @@ mod tests { assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2); } + fn nak_for_file_segments( + &mut self, + cfdp_user: &mut TestCfdpUser, + transfer_info: &TransferInfo, + seg_reqs: &[(u32, u32)], + ) { + let nak_pdu = NakPduCreator::new( + transfer_info.pdu_header, + 0, + transfer_info.file_size as u32, + seg_reqs, + ) + .unwrap(); + let nak_pdu_vec = nak_pdu.to_vec().unwrap(); + let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap(); + self.handler + .state_machine(cfdp_user, Some(&packet_info)) + .unwrap(); + } + fn generic_file_transfer( &mut self, cfdp_user: &mut TestCfdpUser, @@ -1397,8 +1436,11 @@ mod tests { self.common_eof_pdu_check( cfdp_user, transaction_info.closure_requested, - cfdp_user.expected_file_size, - checksum, + EofParams { + file_size: cfdp_user.expected_file_size, + file_checksum: checksum, + condition_code: ConditionCode::NoError, + }, 1, ); (transaction_info, fd_pdus) @@ -1408,7 +1450,7 @@ mod tests { &mut self, cfdp_user: &mut TestCfdpUser, put_request: PutRequestOwned, - filesize: u64, + file_size: u64, ) -> TransferInfo { assert_eq!(cfdp_user.transaction_indication_call_count, 0); assert_eq!(cfdp_user.eof_sent_call_count, 0); @@ -1451,7 +1493,7 @@ mod tests { .unwrap(), self.destfile ); - assert_eq!(metadata_pdu.metadata_params().file_size, filesize); + assert_eq!(metadata_pdu.metadata_params().file_size, file_size); assert_eq!( metadata_pdu.metadata_params().checksum_type, ChecksumType::Crc32 @@ -1471,6 +1513,7 @@ mod tests { TransferInfo { pdu_header: *pdu_header, closure_requested, + file_size, id, } } @@ -1528,8 +1571,7 @@ mod tests { &mut self, cfdp_user: &mut TestCfdpUser, closure_requested: bool, - filesize: u64, - checksum: u32, + eof_params: EofParams, eof_sent_call_count: u32, ) { let next_pdu = self.get_next_sent_pdu().unwrap(); @@ -1540,9 +1582,9 @@ mod tests { ); let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format"); self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc); - assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError); - assert_eq!(eof_pdu.file_size(), filesize); - assert_eq!(eof_pdu.file_checksum(), checksum); + assert_eq!(eof_pdu.condition_code(), eof_params.condition_code); + assert_eq!(eof_pdu.file_size(), eof_params.file_size); + assert_eq!(eof_pdu.file_checksum(), eof_params.file_checksum); assert_eq!( eof_pdu .pdu_header() @@ -1573,7 +1615,7 @@ mod tests { &mut self, cfdp_user: &mut TestCfdpUser, with_closure: bool, - ) -> TransferInfo { + ) -> (&'static str, TransferInfo) { let mut file = OpenOptions::new() .write(true) .open(&self.srcfile) @@ -1588,7 +1630,7 @@ mod tests { content_str.as_bytes().to_vec(), ); assert_eq!(fd_pdus, 1); - transfer_info + (content_str, transfer_info) } // Finish handling: Simulate completion from the destination side by insert finished PDU. @@ -1626,7 +1668,7 @@ mod tests { #[test] fn test_empty_file_transfer_not_acked_no_closure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); - let filesize = 0; + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1635,14 +1677,16 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_file_transfer_init_with_metadata_check(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, ) } @@ -1650,7 +1694,7 @@ mod tests { #[test] fn test_empty_file_transfer_acked() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); - let filesize = 0; + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1659,14 +1703,16 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_file_transfer_init_with_metadata_check(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, ); @@ -1686,7 +1732,7 @@ mod tests { fn test_tiny_file_transfer_acked() { let mut cfdp_user = TestCfdpUser::default(); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); - let transfer_info = tb.common_tiny_file_transfer(&mut cfdp_user, false); + let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, false); tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut cfdp_user, &transfer_info); tb.common_finished_pdu_ack_check(); @@ -1696,7 +1742,7 @@ mod tests { fn test_tiny_file_transfer_not_acked_with_closure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); let mut cfdp_user = TestCfdpUser::default(); - let transfer_info = tb.common_tiny_file_transfer(&mut cfdp_user, true); + let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); tb.finish_handling(&mut cfdp_user, &transfer_info) } @@ -1760,7 +1806,7 @@ mod tests { #[test] fn test_empty_file_transfer_not_acked_with_closure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); - let filesize = 0; + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1769,14 +1815,16 @@ mod tests { Some(true), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_file_transfer_init_with_metadata_check(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, ); tb.finish_handling(&mut cfdp_user, &transaction_info) @@ -1832,7 +1880,7 @@ mod tests { #[test] fn test_finished_pdu_check_timeout() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); - let filesize = 0; + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1841,15 +1889,17 @@ mod tests { Some(true), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_file_transfer_init_with_metadata_check(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); let expected_id = tb.handler.transaction_id().unwrap(); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, ); assert!(tb.pdu_queue_empty()); @@ -1997,7 +2047,12 @@ mod tests { #[test] fn test_positive_ack_procedure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); - let filesize = 0; + let file_size = 0; + let eof_params = EofParams { + file_size, + file_checksum: CRC_32.digest().finalize(), + condition_code: ConditionCode::NoError, + }; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -2006,14 +2061,16 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transfer_info = - tb.common_file_transfer_init_with_metadata_check(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); tb.common_eof_pdu_check( &mut cfdp_user, transfer_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + eof_params, 1, ); @@ -2029,8 +2086,7 @@ mod tests { tb.common_eof_pdu_check( &mut cfdp_user, transfer_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + eof_params, 2, ); @@ -2038,4 +2094,181 @@ mod tests { tb.finish_handling(&mut cfdp_user, &transfer_info); tb.common_finished_pdu_ack_check(); } + + #[test] + fn test_positive_ack_procedure_ack_limit_reached() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize()); + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 1, + ); + + assert!(tb.pdu_queue_empty()); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 2, + ); + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + eof_params.condition_code = ConditionCode::PositiveAckLimitReached; + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 3, + ); + // This boilerplate handling is still expected. In a real-life use-case I would expect + // this to fail as well, leading to a transaction abandonment. This is tested separately. + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_positive_ack_procedure_ack_limit_reached_abandonment() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize()); + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 1, + ); + + assert!(tb.pdu_queue_empty()); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 2, + ); + // Enforce a postive ack timer expiry -> positive ACK limit reached -> Cancel EOF sent. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + eof_params.condition_code = ConditionCode::PositiveAckLimitReached; + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 3, + ); + // Cancellation fault should have been triggered. + let fault_handler = tb.test_fault_handler_mut(); + let fh_ref_mut = fault_handler.get_mut(); + assert!(!fh_ref_mut.cancellation_queue_empty()); + assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1); + let (id, cond_code, progress) = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap(); + assert_eq!(id, transfer_info.id); + assert_eq!(cond_code, ConditionCode::PositiveAckLimitReached); + assert_eq!(progress, file_size); + fh_ref_mut.all_queues_empty(); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF Cancel PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 4, + ); + + // Enforce a postive ack timer expiry -> positive ACK limit reached -> Transaction + // abandoned + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 0); + // Abandonment fault should have been triggered. + let fault_handler = tb.test_fault_handler_mut(); + let fh_ref_mut = fault_handler.get_mut(); + assert!(!fh_ref_mut.abandoned_queue_empty()); + assert_eq!(fh_ref_mut.abandoned_queue.len(), 1); + let (id, cond_code, progress) = fh_ref_mut.abandoned_queue.pop_back().unwrap(); + assert_eq!(id, transfer_info.id); + assert_eq!(cond_code, ConditionCode::PositiveAckLimitReached); + assert_eq!(progress, file_size); + fh_ref_mut.all_queues_empty(); + } + + #[test] + fn test_nak_for_whole_file() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let mut cfdp_user = TestCfdpUser::default(); + let (data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); + let seg_reqs = &[(0, transfer_info.file_size as u32)]; + tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, seg_reqs); + tb.check_next_file_pdu(0, data.as_bytes()); + tb.all_fault_queues_empty(); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } }