diff --git a/src/source.rs b/src/source.rs index 9c9f476..353ff4b 100644 --- a/src/source.rs +++ b/src/source.rs @@ -462,47 +462,22 @@ impl< /// If not unexpected errors occur, this method returns [true] if the transfer was cancelled /// propery and [false] if there is no transaction active or the passed transaction ID and the /// active ID do not match. - pub fn cancel_request(&mut self, transaction_id: &TransactionId) -> Result { + pub fn cancel_request(&mut self, user: &mut impl CfdpUser, transaction_id: &TransactionId) -> Result { if self.state_helper.state == super::State::Idle { return Ok(false); } if let Some(active_id) = self.transaction_id() { if active_id == *transaction_id { - self.declare_fault(ConditionCode::CancelRequestReceived)?; + self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?; return Ok(true); } } Ok(false) } - pub fn transaction_id(&self) -> Option { - self.tstate.as_ref().map(|v| v.transaction_id) - } - - fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 { - let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( - &PduHeader::new_no_file_data(self.pdu_conf, 0), - remote_cfg.max_packet_len, - None, - ); - if remote_cfg.max_file_segment_len.is_some() { - derived_max_seg_len = core::cmp::min( - remote_cfg.max_file_segment_len.unwrap(), - derived_max_seg_len, - ); - } - derived_max_seg_len as u64 - } - - /// Returns the [TransmissionMode] for the active file operation. - #[inline] - pub fn transmission_mode(&self) -> Option { - self.tstate.as_ref().map(|v| v.transmission_mode) - } - fn fsm_busy( &mut self, - cfdp_user: &mut impl CfdpUser, + user: &mut impl CfdpUser, pdu: Option<&impl PduProvider>, ) -> Result { let mut sent_packets = 0; @@ -510,7 +485,7 @@ impl< self.state_helper.step = TransactionStep::TransactionStart; } if self.state_helper.step == TransactionStep::TransactionStart { - self.handle_transaction_start(cfdp_user)?; + self.handle_transaction_start(user)?; self.state_helper.step = TransactionStep::SendingMetadata; } if self.state_helper.step == TransactionStep::SendingMetadata { @@ -526,14 +501,14 @@ impl< } } if self.state_helper.step == TransactionStep::SendingEof { - self.eof_fsm(cfdp_user)?; + self.eof_fsm(user)?; sent_packets += 1; } if self.state_helper.step == TransactionStep::WaitingForFinished { - self.handle_wait_for_finished_pdu(pdu)?; + self.handle_wait_for_finished_pdu(user, pdu)?; } if self.state_helper.step == TransactionStep::NoticeOfCompletion { - self.notice_of_completion(cfdp_user); + self.notice_of_completion(user); self.reset(); } Ok(sent_packets) @@ -541,6 +516,7 @@ impl< fn handle_wait_for_finished_pdu( &mut self, + user: &mut impl CfdpUser, packet: Option<&impl PduProvider>, ) -> Result<(), SourceError> { if let Some(packet) = packet { @@ -562,7 +538,7 @@ impl< } // If we reach this state, countdown is definitely valid instance. if self.countdown.as_ref().unwrap().has_expired() { - self.declare_fault(ConditionCode::CheckLimitReached)?; + self.declare_fault(user, ConditionCode::CheckLimitReached)?; } /* def _handle_wait_for_finish(self): @@ -592,7 +568,7 @@ impl< Ok(()) } - fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> { + fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> { let tstate = self.tstate.as_ref().unwrap(); let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), @@ -600,11 +576,8 @@ impl< self.fparams.file_size, self.pdu_and_cksum_buffer.get_mut(), )?; - self.prepare_and_send_eof_pdu(checksum)?; + self.prepare_and_send_eof_pdu(user, checksum)?; let tstate = self.tstate.as_ref().unwrap(); - if self.local_cfg.indication_cfg.eof_sent { - cfdp_user.eof_sent_indication(&tstate.transaction_id); - } if tstate.transmission_mode == TransmissionMode::Unacknowledged { if tstate.closure_requested { self.countdown = Some(self.timer_creator.create_countdown( @@ -806,6 +779,21 @@ impl< } } + fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 { + let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( + &PduHeader::new_no_file_data(self.pdu_conf, 0), + remote_cfg.max_packet_len, + None, + ); + if remote_cfg.max_file_segment_len.is_some() { + derived_max_seg_len = core::cmp::min( + remote_cfg.max_file_segment_len.unwrap(), + derived_max_seg_len, + ); + } + derived_max_seg_len as u64 + } + fn send_progressing_file_data_pdu(&mut self) -> Result { // Should never be called, but use defensive programming here. if self.fparams.progress >= self.fparams.file_size { @@ -887,7 +875,7 @@ impl< Ok(true) } - fn prepare_and_send_eof_pdu(&mut self, checksum: u32) -> Result<(), SourceError> { + fn prepare_and_send_eof_pdu(&mut self, cfdp_user: &mut impl CfdpUser, checksum: u32) -> Result<(), SourceError> { let tstate = self .tstate .as_ref() @@ -900,6 +888,9 @@ impl< None, ); self.pdu_send_helper(&eof_pdu)?; + if self.local_cfg.indication_cfg.eof_sent { + cfdp_user.eof_sent_indication(&tstate.transaction_id); + } Ok(()) } @@ -953,6 +944,16 @@ impl< fn handle_keep_alive_pdu(&mut self) {} + pub fn transaction_id(&self) -> Option { + self.tstate.as_ref().map(|v| v.transaction_id) + } + + /// Returns the [TransmissionMode] for the active file operation. + #[inline] + pub fn transmission_mode(&self) -> Option { + self.tstate.as_ref().map(|v| v.transmission_mode) + } + /// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when /// applicable. pub fn step(&self) -> TransactionStep { @@ -967,11 +968,18 @@ impl< &self.local_cfg } - fn declare_fault(&mut self, cond: ConditionCode) -> Result<(), SourceError> { + fn declare_fault( + &mut self, + user: &mut impl CfdpUser, + cond: ConditionCode, + ) -> Result<(), SourceError> { + // Need to cache those in advance, because a notice of cancellation can reset the handler. + let transaction_id = self.tstate.as_ref().unwrap().transaction_id; + let progress = self.fparams.progress; let fh = self.local_cfg.fault_handler.get_fault_handler(cond); match fh { spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => { - if let ControlFlow::Break(_) = self.notice_of_cancellation(cond)? { + if let ControlFlow::Break(_) = self.notice_of_cancellation(user, cond)? { return Ok(()); } } @@ -981,30 +989,30 @@ impl< spacepackets::cfdp::FaultHandlerCode::IgnoreError => (), spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(), } - let tstate = self.tstate.as_ref().unwrap(); self.local_cfg.fault_handler.report_fault( - tstate.transaction_id, + transaction_id, cond, - self.fparams.progress, + progress, ); Ok(()) } fn notice_of_cancellation( &mut self, + user: &mut impl CfdpUser, condition_code: ConditionCode, ) -> Result, SourceError> { + let transaction_id = self.tstate.as_ref().unwrap().transaction_id; // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring // the EOF (cancel) PDU must result in abandonment of the transaction. if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof { if cond_code_eof != ConditionCode::NoError { - let tstate = self.tstate.as_ref().unwrap(); // Still call the abandonment callback to ensure the fault is logged. self.local_cfg .fault_handler .user_hook .get_mut() - .abandoned_cb(tstate.transaction_id, cond_code_eof, self.fparams.progress); + .abandoned_cb(transaction_id, cond_code_eof, self.fparams.progress); self.abandon_transaction(); return Ok(ControlFlow::Break(())); } @@ -1020,7 +1028,13 @@ impl< self.fparams.progress, self.pdu_and_cksum_buffer.get_mut(), )?; - self.prepare_and_send_eof_pdu(checksum)?; + self.prepare_and_send_eof_pdu(user, checksum)?; + if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged { + // We are done. + self.reset(); + } else { + self.state_helper.step = TransactionStep::WaitingForEofAck; + } Ok(ControlFlow::Continue(())) } @@ -1072,7 +1086,8 @@ impl< #[cfg(test)] mod tests { - use std::{fs::OpenOptions, io::Write, path::PathBuf, vec::Vec}; + use core::time::Duration; + use std::{fs::OpenOptions, io::Write, path::PathBuf, thread, vec::Vec}; use alloc::string::String; use rand::Rng; @@ -1175,6 +1190,10 @@ mod tests { ) } + fn set_check_limit_timeout(&mut self, timeout: Duration) { + self.handler.timer_creator.check_limit_timeout = timeout; + } + fn put_request( &mut self, put_request: &impl ReadablePutRequest, @@ -1601,14 +1620,48 @@ mod tests { let error = tb.put_request(&put_request); assert!(error.is_err()); let error = error.unwrap_err(); - if let PutRequestError::FileDoesNotExist = error { - } else { + if !matches!(error, PutRequestError::FileDoesNotExist) { panic!("unexpected error type: {:?}", error); } } #[test] fn test_finished_pdu_check_timeout() { - // TODO: Implement. + let fault_handler = TestFaultHandler::default(); + let test_sender = TestCfdpSender::default(); + let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + tb.set_check_limit_timeout(Duration::from_millis(45)); + let filesize = 0; + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Unacknowledged), + Some(true), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, filesize); + let (closure_requested, _pdu_header) = + tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); + tb.common_eof_pdu_check( + &mut cfdp_user, + closure_requested, + filesize, + CRC_32.digest().finalize(), + ); + // After 50 ms delay, we run into a timeout, which leads to a check limit error + // declaration -> leads to a notice of cancellation -> leads to an EOF PDU with the + // appropriate error code. + thread::sleep(Duration::from_millis(50)); + assert_eq!( + tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap(), + 0 + ); + let next_pdu = tb.get_next_sent_pdu().unwrap(); + let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format"); + tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc); + assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached); + assert_eq!(eof_pdu.file_size(), 0); + assert_eq!(eof_pdu.file_checksum(), 0); } }