From de3e5c52cbb8e0c8207f32a0f01b2acaed0a1f85 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 24 Sep 2025 17:49:16 +0200 Subject: [PATCH] more tests and refactoring --- src/dest.rs | 224 +++++++++++++++++++++++--- src/lib.rs | 17 +- src/source.rs | 439 ++++++++++++++++++++++++++++---------------------- 3 files changed, 454 insertions(+), 226 deletions(-) diff --git a/src/dest.rs b/src/dest.rs index b9f2f32..a36722c 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -189,7 +189,8 @@ struct TransactionParams { acked_params: Option, deferred_procedure_timer: Option, finished_params: FinishedParams, - positive_ack_params: Option>, + positive_ack_params: Option, + ack_timer: Option, metadata_only: bool, completion_disposition: Cell, checksum: u32, @@ -243,6 +244,7 @@ impl Default for TransactionParams { acked_params: None, metadata_only: false, positive_ack_params: None, + ack_timer: None, finished_params: FinishedParams::default(), completion_disposition: Cell::new(CompletionDisposition::Completed), checksum: 0, @@ -1600,6 +1602,11 @@ impl< } if self.step() == TransactionStep::WaitingForFinishedAck { sent_packets += self.handle_positive_ack_procedures()?; + // Little hack because of the state machine handling order to ensure the transfer + // is completed in one go. + if self.step() == TransactionStep::TransferCompletion { + sent_packets += self.transfer_completion(cfdp_user)?; + } } Ok(sent_packets) } @@ -1692,9 +1699,19 @@ impl< } fn start_positive_ack_procedure(&mut self) { - self.transaction_params.positive_ack_params = Some(PositiveAckParams { - ack_timer: self - .check_timer_creator + match &mut self.transaction_params.positive_ack_params { + Some(current) => { + current.ack_counter = 0; + } + None => { + self.transaction_params.positive_ack_params = Some(PositiveAckParams { + ack_counter: 0, + positive_ack_of_cancellation: false, + }) + } + } + self.transaction_params.ack_timer = Some( + self.check_timer_creator .create_countdown(TimerContext::PositiveAck { expiry_time: self .transaction_params @@ -1703,19 +1720,24 @@ impl< .unwrap() .positive_ack_timer_interval, }), - ack_counter: 0, - }) + ); } fn handle_positive_ack_procedures(&mut self) -> Result { // Do we have positive-ack params? - let params = match self.transaction_params.positive_ack_params.as_mut() { - Some(p) => p, - None => return Ok(0), - }; + if self.transaction_params.positive_ack_params.is_none() { + return Ok(0); + } + let params = self.transaction_params.positive_ack_params.unwrap(); // Has the timer expired? - if !params.ack_timer.has_expired() { + if !self + .transaction_params + .ack_timer + .as_mut() + .unwrap() + .has_expired() + { return Ok(0); } @@ -1731,12 +1753,23 @@ impl< == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); + return Ok(0); } + self.transaction_params + .positive_ack_params + .as_mut() + .unwrap() + .positive_ack_of_cancellation = true; return Ok(0); } - params.ack_timer.reset(); - params.ack_counter += 1; + let params_mut = self + .transaction_params + .positive_ack_params + .as_mut() + .unwrap(); + params_mut.ack_counter += 1; + self.transaction_params.ack_timer.as_mut().unwrap().reset(); self.send_finished_pdu() } @@ -1782,10 +1815,17 @@ impl< // Cache those, because they might be reset when abandoning the transaction. let transaction_id = self.transaction_id().unwrap(); let progress = self.transaction_params.progress; - let fh_code = self + let mut fh_code = self .local_cfg .fault_handler .get_fault_handler(condition_code); + // CFDP 4.11.2.3.2: Any fault declared in the course of transferring the Finished (cancel) + // PDU must result in abadonment of the transaction. + if let Some(positive_ack) = &self.transaction_params.positive_ack_params { + if positive_ack.positive_ack_of_cancellation { + fh_code = FaultHandlerCode::AbandonTransaction; + } + } match fh_code { FaultHandlerCode::NoticeOfCancellation => { self.notice_of_cancellation(condition_code, EntityIdTlv::new(self.local_cfg().id)); @@ -1794,11 +1834,10 @@ impl< FaultHandlerCode::IgnoreError => (), FaultHandlerCode::AbandonTransaction => (), } - self.local_cfg.fault_handler.report_fault(FaultInfo::new( - transaction_id, - condition_code, - progress, - )) + self.local_cfg.fault_handler.report_fault( + fh_code, + FaultInfo::new(transaction_id, condition_code, progress), + ) } fn notice_of_cancellation(&self, condition_code: ConditionCode, fault_location: EntityIdTlv) { @@ -2039,6 +2078,10 @@ mod tests { self.expiry_control.set_nak_activity_expired(); } + fn set_positive_ack_expired(&mut self) { + self.expiry_control.set_positive_ack_expired(); + } + fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser { TestCfdpUser::new( 0, @@ -2151,6 +2194,7 @@ mod tests { user: &mut TestCfdpUser, cond_code: ConditionCode, file_status: FileStatus, + delivery_code: DeliveryCode, ) { assert_eq!(user.finished_indic_queue.len(), 1); let finished_indication = user.finished_indic_queue.pop_front().unwrap(); @@ -2159,7 +2203,7 @@ mod tests { self.handler.transaction_id().unwrap() ); assert_eq!(finished_indication.file_status, file_status); - assert_eq!(finished_indication.delivery_code, DeliveryCode::Incomplete); + assert_eq!(finished_indication.delivery_code, delivery_code); assert_eq!(finished_indication.condition_code, cond_code); } @@ -2195,6 +2239,7 @@ mod tests { &mut self, cond_code: ConditionCode, file_status: FileStatus, + delivery_code: DeliveryCode, ) { let pdu = self.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); @@ -2203,10 +2248,9 @@ mod tests { FileDirectiveType::FinishedPdu ); let finished_pdu = FinishedPduReader::from_bytes(&pdu.raw_pdu).unwrap(); - assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); + assert_eq!(finished_pdu.delivery_code(), delivery_code); assert_eq!(finished_pdu.file_status(), file_status); assert_eq!(finished_pdu.condition_code(), cond_code); - //assert!(finished_pdu.fault_location().is_none()); } fn acknowledge_finished_pdu( @@ -2236,7 +2280,7 @@ mod tests { if !self.all_fault_queues_empty() { let fh_queues = self.handler.local_cfg.user_fault_hook().borrow(); println!( - "fault queues not empyt. cancellation {}, suspension {}, ignored {}, abandon {}", + "fault queues not empty. cancellation {}, suspension {}, ignored {}, abandon {}", fh_queues.notice_of_cancellation_queue.len(), fh_queues.notice_of_suspension_queue.len(), fh_queues.ignored_queue.len(), @@ -3567,8 +3611,13 @@ mod tests { &mut user, ConditionCode::NakLimitReached, FileStatus::Retained, + DeliveryCode::Incomplete, + ); + tb.check_finished_pdu_failure( + ConditionCode::NakLimitReached, + FileStatus::Retained, + DeliveryCode::Incomplete, ); - tb.check_finished_pdu_failure(ConditionCode::NakLimitReached, FileStatus::Retained); { let mut fault_hook = tb.fault_handler().user_hook.borrow_mut(); @@ -3587,6 +3636,131 @@ mod tests { #[test] fn test_positive_ack_procedure() { - // TODO. + let fault_handler = TestFaultHandler::default(); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); + let mut user = tb.test_user_from_cached_paths(0); + let transfer_info = tb + .generic_transfer_init(&mut user, 0) + .expect("transfer init failed"); + tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); + tb.generic_eof_no_error(&mut user, Vec::new()) + .expect("EOF no error insertion failed"); + tb.check_completion_indication_success(&mut user); + assert_eq!(tb.pdu_queue_len(), 2); + tb.check_eof_ack_pdu(ConditionCode::NoError); + tb.check_finished_pdu_success(); + + tb.set_positive_ack_expired(); + // This should cause the PDU to be sent again. + assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); + tb.check_finished_pdu_success(); + tb.acknowledge_finished_pdu(&mut user, &transfer_info); + } + + fn generic_positive_ack_test( + tb: &mut DestHandlerTestbench, + user: &mut TestCfdpUser, + ) -> TransferInfo { + let transfer_info = tb + .generic_transfer_init(user, 0) + .expect("transfer init failed"); + tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); + tb.generic_eof_no_error(user, Vec::new()) + .expect("EOF no error insertion failed"); + tb.check_completion_indication_success(user); + assert_eq!(tb.pdu_queue_len(), 2); + tb.check_eof_ack_pdu(ConditionCode::NoError); + tb.check_finished_pdu_success(); + + // This should cause the PDU to be sent again. + tb.set_positive_ack_expired(); + assert_eq!(tb.handler.state_machine_no_packet(user).unwrap(), 1); + tb.check_finished_pdu_success(); + + // Positive ACK limit reached. + tb.set_positive_ack_expired(); + assert_eq!(tb.handler.state_machine_no_packet(user).unwrap(), 1); + + tb.check_finished_pdu_failure( + ConditionCode::PositiveAckLimitReached, + FileStatus::Retained, + DeliveryCode::Complete, + ); + tb.check_completion_indication_failure( + user, + ConditionCode::PositiveAckLimitReached, + FileStatus::Retained, + DeliveryCode::Complete, + ); + { + let mut fault_handler = tb.fault_handler().user_hook.borrow_mut(); + assert!(!fault_handler.cancellation_queue_empty()); + let cancellation = fault_handler + .notice_of_cancellation_queue + .pop_front() + .unwrap(); + assert_eq!(cancellation.transaction_id(), transfer_info.id); + assert_eq!( + cancellation.condition_code(), + ConditionCode::PositiveAckLimitReached + ); + assert_eq!(cancellation.progress(), 0); + } + transfer_info + } + + #[test] + fn test_positive_ack_limit_reached() { + let fault_handler = TestFaultHandler::default(); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); + let mut user = tb.test_user_from_cached_paths(0); + let transfer_info = generic_positive_ack_test(&mut tb, &mut user); + // Chances are that this one won't work either leading to transfer abandonment, but we + // acknowledge it here + tb.acknowledge_finished_pdu(&mut user, &transfer_info); + } + + #[test] + fn test_positive_ack_limit_reached_with_subsequent_abandonment() { + let fault_handler = TestFaultHandler::default(); + let mut tb = DestHandlerTestbench::new_with_fixed_paths( + fault_handler, + TransmissionMode::Acknowledged, + false, + ); + let mut user = tb.test_user_from_cached_paths(0); + let transfer_info = generic_positive_ack_test(&mut tb, &mut user); + + // This should cause the PDU to be sent again. + tb.set_positive_ack_expired(); + assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); + tb.check_finished_pdu_failure( + ConditionCode::PositiveAckLimitReached, + FileStatus::Retained, + DeliveryCode::Complete, + ); + + // Postive ACK limit reached which leads to abandonment. + tb.set_positive_ack_expired(); + assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 0); + { + let mut fault_handler = tb.fault_handler().user_hook.borrow_mut(); + assert!(!fault_handler.abandoned_queue_empty()); + let cancellation = fault_handler.abandoned_queue.pop_front().unwrap(); + assert_eq!(cancellation.transaction_id(), transfer_info.id); + assert_eq!( + cancellation.condition_code(), + ConditionCode::PositiveAckLimitReached + ); + assert_eq!(cancellation.progress(), 0); + } } } diff --git a/src/lib.rs b/src/lib.rs index a5f74ee..e899094 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -603,14 +603,9 @@ impl FaultHandler { self.handler_array[array_idx.unwrap()] } - pub fn report_fault(&self, fault_info: FaultInfo) -> FaultHandlerCode { - let array_idx = Self::condition_code_to_array_index(fault_info.condition_code()); - if array_idx.is_none() { - return FaultHandlerCode::IgnoreError; - } - let fh_code = self.handler_array[array_idx.unwrap()]; + pub fn report_fault(&self, code: FaultHandlerCode, fault_info: FaultInfo) -> FaultHandlerCode { let mut handler_mut = self.user_hook.borrow_mut(); - match fh_code { + match code { FaultHandlerCode::NoticeOfCancellation => { handler_mut.notice_of_cancellation_cb(fault_info); } @@ -624,7 +619,7 @@ impl FaultHandler { handler_mut.abandoned_cb(fault_info); } } - fh_code + code } } @@ -1072,10 +1067,10 @@ pub mod alloc_mod { } } -#[derive(Debug)] -struct PositiveAckParams { - ack_timer: CountdownInstance, +#[derive(Debug, Clone, Copy)] +struct PositiveAckParams { ack_counter: u32, + positive_ack_of_cancellation: bool, } #[cfg(test)] diff --git a/src/source.rs b/src/source.rs index 55df9cc..2bcdb6a 100644 --- a/src/source.rs +++ b/src/source.rs @@ -45,8 +45,8 @@ use core::{ use spacepackets::{ ByteConversionError, cfdp::{ - ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl, - TransactionStatus, TransmissionMode, + ConditionCode, Direction, FaultHandlerCode, LargeFileFlag, PduType, SegmentMetadataFlag, + SegmentationControl, TransactionStatus, TransmissionMode, lv::Lv, pdu::{ CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, @@ -96,7 +96,7 @@ pub enum TransactionStep { NoticeOfCompletion = 10, } -#[derive(Default, Copy, Clone)] +#[derive(Default, Debug, Copy, Clone)] pub struct FileParams { pub progress: u64, pub segment_len: u64, @@ -144,16 +144,6 @@ pub struct FinishedParams { file_status: FileStatus, } -#[derive(Debug, derive_new::new)] -pub struct TransferState { - transaction_id: Cell, - remote_cfg: RefCell, - transmission_mode: Cell, - closure_requested: Cell, - cond_code_eof: Cell>, - finished_params: Cell>, -} - #[derive(Debug, thiserror::Error)] pub enum SourceError { #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")] @@ -214,6 +204,49 @@ pub enum FsmContext { ResetWhenPossible, } +#[derive(Debug)] +pub struct TransactionParams { + transaction_id: Option, + remote_cfg: Option, + transmission_mode: Option, + closure_requested: bool, + cond_code_eof: Cell>, + finished_params: Option, + // File specific transfer fields + file_params: FileParams, + // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. + pdu_conf: CommonPduConfig, + check_timer: Option, + positive_ack_params: Cell>, + ack_timer: RefCell>, +} + +impl Default for TransactionParams { + fn default() -> Self { + Self { + transaction_id: Default::default(), + remote_cfg: Default::default(), + transmission_mode: Default::default(), + closure_requested: Default::default(), + cond_code_eof: Default::default(), + finished_params: Default::default(), + file_params: Default::default(), + pdu_conf: Default::default(), + check_timer: Default::default(), + positive_ack_params: Default::default(), + ack_timer: Default::default(), + } + } +} + +impl TransactionParams { + #[inline] + fn reset(&mut self) { + self.transaction_id = None; + self.transmission_mode = None; + } +} + /// This is the primary CFDP source handler. It models the CFDP source entity, which is /// primarily responsible for handling put requests to send files to another CFDP destination /// entity. @@ -257,14 +290,7 @@ pub struct SourceHandler< remote_cfg_table: RemoteConfigStoreInstance, vfs: Vfs, state_helper: StateHelper, - // Transfer related state information - transfer_state: Option, - // File specific transfer fields - file_params: FileParams, - // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. - pdu_conf: CommonPduConfig, - check_timer: RefCell>, - positive_ack_params: RefCell>>, + transaction_params: TransactionParams, timer_creator: TimerCreatorInstance, seq_count_provider: SequenceCounterInstance, anomalies: AnomalyTracker, @@ -330,12 +356,8 @@ impl< vfs, put_request_cacher, state_helper: Default::default(), - transfer_state: Default::default(), - file_params: Default::default(), - pdu_conf: Default::default(), + transaction_params: Default::default(), anomalies: Default::default(), - check_timer: RefCell::new(None), - positive_ack_params: RefCell::new(None), timer_creator, seq_count_provider, } @@ -384,15 +406,13 @@ impl< #[inline] pub fn transaction_id(&self) -> Option { - self.transfer_state.as_ref().map(|v| v.transaction_id.get()) + self.transaction_params.transaction_id } /// Returns the [TransmissionMode] for the active file operation. #[inline] pub fn transmission_mode(&self) -> Option { - self.transfer_state - .as_ref() - .map(|v| v.transmission_mode.get()) + self.transaction_params.transmission_mode } /// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when @@ -487,28 +507,29 @@ impl< }; // Set PDU configuration fields which are important for generating PDUs. - self.pdu_conf + self.transaction_params + .pdu_conf .set_source_and_dest_id( create_id(&self.local_cfg.id), create_id(&self.put_request_cacher.static_fields.destination_id), ) .unwrap(); // Set up other PDU configuration fields. - self.pdu_conf.direction = Direction::TowardsReceiver; - self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into(); - self.pdu_conf.transaction_seq_num = *transaction_id.seq_num(); - self.pdu_conf.trans_mode = transmission_mode; - self.file_params.segment_len = self.calculate_max_file_seg_len(remote_cfg); + self.transaction_params.pdu_conf.direction = Direction::TowardsReceiver; + self.transaction_params.pdu_conf.crc_flag = + remote_cfg.crc_on_transmission_by_default.into(); + self.transaction_params.pdu_conf.transaction_seq_num = *transaction_id.seq_num(); + self.transaction_params.pdu_conf.trans_mode = transmission_mode; + self.transaction_params.file_params.segment_len = + self.calculate_max_file_seg_len(remote_cfg); + + self.transaction_params.transaction_id = Some(transaction_id); + self.transaction_params.remote_cfg = Some(*remote_cfg); + self.transaction_params.transmission_mode = Some(transmission_mode); + self.transaction_params.closure_requested = closure_requested; + self.transaction_params.cond_code_eof.set(None); + self.transaction_params.finished_params = None; - // Set up the transfer context structure. - self.transfer_state = Some(TransferState { - transaction_id: Cell::new(transaction_id), - remote_cfg: RefCell::new(*remote_cfg), - transmission_mode: Cell::new(transmission_mode), - closure_requested: Cell::new(closure_requested), - cond_code_eof: Cell::new(None), - finished_params: Cell::new(None), - }); self.state_helper.state.set(super::State::Busy); Ok(()) } @@ -602,9 +623,7 @@ impl< /// behaviour. pub fn reset(&mut self) { self.state_helper = Default::default(); - self.transfer_state = None; - self.file_params = Default::default(); - *self.check_timer.borrow_mut() = None; + self.transaction_params.reset(); } #[inline] @@ -668,36 +687,54 @@ impl< user: &mut impl CfdpUser, ) -> Result { let mut sent_packets = 0; - let mut positive_ack_limit_reached = false; - if let Some(positive_ack_params) = self.positive_ack_params.borrow_mut().as_mut() { - if positive_ack_params.ack_timer.has_expired() { + let current_params = self.transaction_params.positive_ack_params.get(); + if let Some(mut positive_ack_params) = current_params { + if self + .transaction_params + .ack_timer + .borrow_mut() + .as_ref() + .unwrap() + .has_expired() + { let ack_timer_exp_limit = self - .transfer_state + .transaction_params + .remote_cfg .as_ref() .unwrap() - .remote_cfg - .borrow() .positive_ack_timer_expiration_limit; if positive_ack_params.ack_counter + 1 >= ack_timer_exp_limit { - positive_ack_limit_reached = true; + let (fault_packets_sent, ctx) = + self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?; + sent_packets += fault_packets_sent; + if ctx == FsmContext::ResetWhenPossible { + self.reset(); + } else { + positive_ack_params.ack_counter = 0; + positive_ack_params.positive_ack_of_cancellation = true; + } } else { - positive_ack_params.ack_timer.reset(); + self.transaction_params + .ack_timer + .borrow_mut() + .as_mut() + .unwrap() + .reset(); positive_ack_params.ack_counter += 1; self.prepare_and_send_eof_pdu( user, - self.file_params.checksum_completed_file.unwrap(), + self.transaction_params + .file_params + .checksum_completed_file + .unwrap(), )?; sent_packets += 1; } } - } - if positive_ack_limit_reached { - let (fault_packets_sent, ctx) = - self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?; - if ctx == FsmContext::ResetWhenPossible { - self.reset(); - } - sent_packets += fault_packets_sent; + + self.transaction_params + .positive_ack_params + .set(Some(positive_ack_params)); } Ok(sent_packets) } @@ -712,14 +749,18 @@ impl< sent_packets += 1; continue; } else { - if (segment_req.1 < segment_req.0) || (segment_req.0 > self.file_params.progress) { + if (segment_req.1 < segment_req.0) + || (segment_req.0 > self.transaction_params.file_params.progress) + { return Err(SourceError::InvalidNakPdu); } let mut missing_chunk_len = segment_req.1 - segment_req.0; let current_offset = segment_req.0; while missing_chunk_len > 0 { - let chunk_size = - core::cmp::min(missing_chunk_len, self.file_params.segment_len); + let chunk_size = core::cmp::min( + missing_chunk_len, + self.transaction_params.file_params.segment_len, + ); self.prepare_and_send_file_data_pdu(current_offset, chunk_size)?; sent_packets += 1; missing_chunk_len -= missing_chunk_len; @@ -736,7 +777,12 @@ impl< // If we reach this state, countdown definitely is set. #[allow(clippy::collapsible_if)] if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged - && self.check_timer.borrow().as_ref().unwrap().has_expired() + && self + .transaction_params + .check_timer + .as_ref() + .unwrap() + .has_expired() { let (sent_packets, ctx) = self.declare_fault(user, ConditionCode::CheckLimitReached)?; if ctx == FsmContext::ResetWhenPossible { @@ -750,21 +796,31 @@ impl< fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> { let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), - self.tstate_ref().remote_cfg.borrow().default_crc_type, - self.file_params.file_size, + self.transaction_params + .remote_cfg + .as_ref() + .unwrap() + .default_crc_type, + self.transaction_params.file_params.file_size, &mut self.pdu_and_cksum_buffer.borrow_mut(), )?; - self.file_params.checksum_completed_file = Some(checksum); + self.transaction_params.file_params.checksum_completed_file = Some(checksum); self.prepare_and_send_eof_pdu(user, checksum)?; if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged { - if self.tstate_ref().closure_requested.get() { - *self.check_timer.borrow_mut() = Some(self.timer_creator.create_countdown( - crate::TimerContext::CheckLimit { - local_id: self.local_cfg.id, - remote_id: self.tstate_ref().remote_cfg.borrow().entity_id, - entity_type: EntityType::Sending, - }, - )); + if self.transaction_params.closure_requested { + self.transaction_params.check_timer = Some( + self.timer_creator + .create_countdown(crate::TimerContext::CheckLimit { + local_id: self.local_cfg.id, + remote_id: self + .transaction_params + .remote_cfg + .as_ref() + .unwrap() + .entity_id, + entity_type: EntityType::Sending, + }), + ); self.set_step(TransactionStep::WaitingForFinished); } else { self.set_step(TransactionStep::NoticeOfCompletion); @@ -777,18 +833,33 @@ impl< fn start_positive_ack_procedure(&self) { self.set_step_internal(TransactionStep::WaitingForEofAck); - *self.positive_ack_params.borrow_mut() = Some(PositiveAckParams { - ack_timer: self - .timer_creator + match self.transaction_params.positive_ack_params.get() { + Some(mut current) => { + current.ack_counter = 0; + self.transaction_params + .positive_ack_params + .set(Some(current)); + } + None => self + .transaction_params + .positive_ack_params + .set(Some(PositiveAckParams { + ack_counter: 0, + positive_ack_of_cancellation: false, + })), + } + + *self.transaction_params.ack_timer.borrow_mut() = Some( + self.timer_creator .create_countdown(crate::TimerContext::PositiveAck { expiry_time: self - .tstate_ref() + .transaction_params .remote_cfg - .borrow() + .as_ref() + .unwrap() .positive_ack_timer_interval, }), - ack_counter: 0, - }) + ); } fn handle_transaction_start( @@ -796,7 +867,7 @@ impl< cfdp_user: &mut impl CfdpUser, ) -> Result<(), SourceError> { if !self.put_request_cacher.has_source_file() { - self.file_params.metadata_only = true; + self.transaction_params.file_params.metadata_only = true; } else { let source_file = self .put_request_cacher @@ -811,14 +882,14 @@ impl< self.put_request_cacher .dest_file() .map_err(SourceError::DestFileNotValidUtf8)?; - self.file_params.file_size = self.vfs.file_size(source_file)?; - if self.file_params.file_size > u32::MAX as u64 { - self.pdu_conf.file_flag = LargeFileFlag::Large + self.transaction_params.file_params.file_size = self.vfs.file_size(source_file)?; + if self.transaction_params.file_params.file_size > u32::MAX as u64 { + self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Large } else { - if self.file_params.file_size == 0 { - self.file_params.empty_file = true; + if self.transaction_params.file_params.file_size == 0 { + self.transaction_params.file_params.empty_file = true; } - self.pdu_conf.file_flag = LargeFileFlag::Normal + self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Normal } } cfdp_user.transaction_indication(&self.transaction_id().unwrap()); @@ -831,7 +902,7 @@ impl< transaction_status: TransactionStatus, ) -> Result<(), SourceError> { let ack_pdu = AckPdu::new( - PduHeader::new_for_file_directive(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0), FileDirectiveType::FinishedPdu, condition_code, transaction_status, @@ -842,15 +913,18 @@ impl< } fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> { - let tstate = self.tstate_ref(); let metadata_params = MetadataGenericParams::new( - tstate.closure_requested.get(), - tstate.remote_cfg.borrow().default_crc_type, - self.file_params.file_size, + self.transaction_params.closure_requested, + self.transaction_params + .remote_cfg + .as_ref() + .unwrap() + .default_crc_type, + self.transaction_params.file_params.file_size, ); - if self.file_params.metadata_only { + if self.transaction_params.file_params.metadata_only { let metadata_pdu = MetadataPduCreator::new( - PduHeader::new_for_file_directive(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0), metadata_params, Lv::new_empty(), Lv::new_empty(), @@ -859,7 +933,7 @@ impl< return self.pdu_send_helper(&metadata_pdu); } let metadata_pdu = MetadataPduCreator::new( - PduHeader::new_for_file_directive(self.pdu_conf, 0), + PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0), metadata_params, Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(), Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(), @@ -869,24 +943,25 @@ impl< } fn file_data_fsm(&mut self) -> Result, SourceError> { - //if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged { - // TODO: Handle re-transmission - //} - if !self.file_params.metadata_only - && self.file_params.progress < self.file_params.file_size + if !self.transaction_params.file_params.metadata_only + && self.transaction_params.file_params.progress + < self.transaction_params.file_params.file_size && self.send_progressing_file_data_pdu()? { return Ok(ControlFlow::Break(1)); } - if self.file_params.empty_file || self.file_params.progress >= self.file_params.file_size { + if self.transaction_params.file_params.empty_file + || self.transaction_params.file_params.progress + >= self.transaction_params.file_params.file_size + { // EOF is still expected. self.set_step(TransactionStep::SendingEof); - self.tstate_ref() + self.transaction_params .cond_code_eof .set(Some(ConditionCode::NoError)); - } else if self.file_params.metadata_only { + } else if self.transaction_params.file_params.metadata_only { // Special case: Metadata Only, no EOF required. - if self.tstate_ref().closure_requested.get() { + if self.transaction_params.closure_requested { self.set_step(TransactionStep::WaitingForFinished); } else { self.set_step(TransactionStep::NoticeOfCompletion); @@ -898,7 +973,7 @@ impl< fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) { if self.local_cfg.indication_cfg.transaction_finished { // The first case happens for unacknowledged file copy operation with no closure. - let finished_params = match self.tstate_ref().finished_params.get() { + let finished_params = match self.transaction_params.finished_params { Some(finished_params) => TransactionFinishedParams { id: self.transaction_id().unwrap(), condition_code: finished_params.condition_code, @@ -918,7 +993,7 @@ impl< fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 { let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( - &PduHeader::new_for_file_directive(self.pdu_conf, 0), + &PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0), remote_cfg.max_packet_len, None, ); @@ -933,14 +1008,19 @@ impl< fn send_progressing_file_data_pdu(&mut self) -> Result { // Should never be called, but use defensive programming here. - if self.file_params.progress >= self.file_params.file_size { + if self.transaction_params.file_params.progress + >= self.transaction_params.file_params.file_size + { return Ok(false); } - let read_len = self - .file_params - .segment_len - .min(self.file_params.file_size - self.file_params.progress); - self.prepare_and_send_file_data_pdu(self.file_params.progress, read_len)?; + let read_len = self.transaction_params.file_params.segment_len.min( + self.transaction_params.file_params.file_size + - self.transaction_params.file_params.progress, + ); + self.prepare_and_send_file_data_pdu( + self.transaction_params.file_params.progress, + read_len, + )?; Ok(true) } @@ -951,7 +1031,7 @@ impl< ) -> Result<(), SourceError> { let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata( PduHeader::new_for_file_data( - self.pdu_conf, + self.transaction_params.pdu_conf, 0, SegmentMetadataFlag::NotPresent, SegmentationControl::NoRecordBoundaryPreservation, @@ -973,7 +1053,7 @@ impl< None, &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; - self.file_params.progress += size; + self.transaction_params.file_params.progress += size; Ok(()) } @@ -983,13 +1063,13 @@ impl< checksum: u32, ) -> Result<(), SourceError> { let eof_pdu = EofPdu::new( - PduHeader::new_for_file_directive(self.pdu_conf, 0), - self.tstate_ref() + PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0), + self.transaction_params .cond_code_eof .get() .unwrap_or(ConditionCode::NoError), checksum, - self.file_params.progress, + self.transaction_params.file_params.progress, None, ); self.pdu_send_helper(&eof_pdu)?; @@ -1021,15 +1101,14 @@ impl< directive_type: Some(FileDirectiveType::FinishedPdu), }); } - let tstate_ref = self.tstate_ref(); // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE // mode. - tstate_ref.finished_params.set(Some(FinishedParams { + self.transaction_params.finished_params = Some(FinishedParams { condition_code: finished_pdu.condition_code(), delivery_code: finished_pdu.delivery_code(), file_status: finished_pdu.file_status(), - })); - if tstate_ref.transmission_mode.get() == TransmissionMode::Acknowledged { + }); + if let Some(TransmissionMode::Acknowledged) = self.transmission_mode() { self.prepare_and_send_ack_pdu( finished_pdu.condition_code(), TransactionStatus::Active, @@ -1066,12 +1145,9 @@ impl< condition_code: ConditionCode, ) -> Result { let mut sent_packets = 0; - match self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)? { - ControlFlow::Continue(ctx) | ControlFlow::Break(ctx) => { - if ctx == FsmContext::ResetWhenPossible { - self.reset(); - } - } + let ctx = self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)?; + if ctx == FsmContext::ResetWhenPossible { + self.reset(); } Ok(sent_packets) } @@ -1081,43 +1157,30 @@ impl< user: &mut impl CfdpUser, condition_code: ConditionCode, sent_packets: &mut u32, - ) -> Result, SourceError> { - let transaction_id = self.transaction_id().unwrap(); - // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring - // the EOF (cancel) PDU must result in abandonment of the transaction. - if let Some(cond_code_eof) = self.tstate_ref().cond_code_eof.get() { - if cond_code_eof != ConditionCode::NoError { - // Still call the abandonment callback to ensure the fault is logged. - self.local_cfg - .fault_handler - .user_hook - .borrow_mut() - .abandoned_cb(FaultInfo::new( - transaction_id, - cond_code_eof, - self.file_params.progress, - )); - return Ok(ControlFlow::Break(FsmContext::ResetWhenPossible)); - } - } - - self.tstate_ref().cond_code_eof.set(Some(condition_code)); + ) -> Result { + self.transaction_params + .cond_code_eof + .set(Some(condition_code)); // As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply // the checksum for the file copy progress sent so far. let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), - self.tstate_ref().remote_cfg.borrow().default_crc_type, - self.file_params.progress, + self.transaction_params + .remote_cfg + .as_ref() + .unwrap() + .default_crc_type, + self.transaction_params.file_params.progress, &mut self.pdu_and_cksum_buffer.borrow_mut(), )?; self.prepare_and_send_eof_pdu(user, checksum)?; *sent_packets += 1; if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged { // We are done. - Ok(ControlFlow::Continue(FsmContext::ResetWhenPossible)) + Ok(FsmContext::ResetWhenPossible) } else { self.start_positive_ack_procedure(); - Ok(ControlFlow::Continue(FsmContext::default())) + Ok(FsmContext::default()) } } @@ -1140,42 +1203,38 @@ impl< cond: ConditionCode, ) -> Result<(u32, FsmContext), SourceError> { let mut sent_packets = 0; - let fh = self.local_cfg.fault_handler.get_fault_handler(cond); - let mut ctx = FsmContext::default(); - match fh { - spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => { - match self.notice_of_cancellation_internal(user, cond, &mut sent_packets)? { - ControlFlow::Continue(ctx_cancellation) => { - ctx = ctx_cancellation; - } - ControlFlow::Break(ctx_cancellation) => { - return Ok((sent_packets, ctx_cancellation)); - } - } - } - spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => { - self.notice_of_suspension_internal(); - } - spacepackets::cfdp::FaultHandlerCode::IgnoreError => (), - spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => { - return Ok((sent_packets, FsmContext::ResetWhenPossible)); + let mut fh = self.local_cfg.fault_handler.get_fault_handler(cond); + // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring + // the EOF (cancel) PDU must result in abandonment of the transaction. + if let Some(positive_ack_params) = self.transaction_params.positive_ack_params.get() { + if positive_ack_params.positive_ack_of_cancellation { + fh = FaultHandlerCode::AbandonTransaction; } } - self.local_cfg.fault_handler.report_fault(FaultInfo::new( - self.transaction_id().unwrap(), - cond, - self.file_params.progress, - )); + let mut ctx = FsmContext::default(); + match fh { + FaultHandlerCode::NoticeOfCancellation => { + ctx = self.notice_of_cancellation_internal(user, cond, &mut sent_packets)?; + } + FaultHandlerCode::NoticeOfSuspension => { + self.notice_of_suspension_internal(); + } + FaultHandlerCode::IgnoreError => (), + FaultHandlerCode::AbandonTransaction => { + ctx = FsmContext::ResetWhenPossible; + } + } + self.local_cfg.fault_handler.report_fault( + fh, + FaultInfo::new( + self.transaction_id().unwrap(), + cond, + self.transaction_params.file_params.progress, + ), + ); Ok((sent_packets, ctx)) } - // Internal helper function. - fn tstate_ref(&self) -> &TransferState { - self.transfer_state - .as_ref() - .expect("transfer state should be set in busy state") - } - fn handle_keep_alive_pdu(&mut self) {} }