diff --git a/examples/python-interop/main.rs b/examples/python-interop/main.rs index cb4d99e..8c775cf 100644 --- a/examples/python-interop/main.rs +++ b/examples/python-interop/main.rs @@ -15,7 +15,7 @@ use cfdp::{ source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider, - RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider, + RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider, }; use clap::Parser; use log::{debug, info, warn}; @@ -346,6 +346,7 @@ fn main() { put_request_cacher, 2048, remote_cfg_python, + StdTimerCreator::default(), seq_count_provider, ); let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending); @@ -361,7 +362,7 @@ fn main() { dest_tm_tx, NativeFilestore::default(), remote_cfg_python, - StdCheckTimerCreator::default(), + StdTimerCreator::default(), ); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving); diff --git a/src/dest.rs b/src/dest.rs index 6fbc486..8781aff 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -24,7 +24,7 @@ //! //! 3. Finished PDU has been sent back to the remote side. //! -//! ### Acknowledged mode +//! ### Acknowledged mode (*not implemented yet*) //! //! 3. An EOF ACK PDU has been sent back to the remote side. //! 4. A Finished PDU has been sent back to the remote side. @@ -35,10 +35,10 @@ use core::str::{from_utf8, from_utf8_unchecked, Utf8Error}; use super::{ filestore::{FilestoreError, NativeFilestore, VirtualFilestore}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, - CheckTimerProviderCreator, CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, - PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCheckTimer, - StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext, TransactionId, - UserFaultHookProvider, + CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, + RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCountdown, + StdRemoteEntityConfigProvider, StdTimerCreator, TimerContext, TimerCreatorProvider, + TransactionId, UserFaultHookProvider, }; use smallvec::SmallVec; use spacepackets::{ @@ -249,7 +249,7 @@ pub struct DestinationHandler< UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, - CheckTimerCreator: CheckTimerProviderCreator, + CheckTimerCreator: TimerCreatorProvider, CheckTimerProvider: CountdownProvider, > { local_cfg: LocalEntityConfig, @@ -269,8 +269,8 @@ pub type StdDestinationHandler = DestinationHandler< UserFaultHook, NativeFilestore, StdRemoteEntityConfigProvider, - StdCheckTimerCreator, - StdCheckTimer, + StdTimerCreator, + StdCountdown, >; impl< @@ -278,7 +278,7 @@ impl< UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, - CheckTimerCreator: CheckTimerProviderCreator, + CheckTimerCreator: TimerCreatorProvider, CheckTimerProvider: CountdownProvider, > DestinationHandler< @@ -601,6 +601,7 @@ impl< file_delivery_complete = true; } else { match self.vfs.checksum_verify( + checksum, // Safety: It was already verified that the path is valid during the transaction start. unsafe { from_utf8_unchecked( @@ -609,7 +610,7 @@ impl< ) }, self.tparams.metadata_params().checksum_type, - checksum, + self.tparams.tstate.progress, &mut self.tparams.cksum_buf, ) { Ok(checksum_success) => { @@ -642,14 +643,13 @@ impl< fn start_check_limit_handling(&mut self) { self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling; - self.tparams.tstate.current_check_timer = Some( - self.check_timer_creator - .create_check_timer_provider(TimerContext::CheckLimit { - local_id: self.local_cfg.id, - remote_id: self.tparams.remote_cfg.unwrap().entity_id, - entity_type: EntityType::Receiving, - }), - ); + self.tparams.tstate.current_check_timer = Some(self.check_timer_creator.create_countdown( + TimerContext::CheckLimit { + local_id: self.local_cfg.id, + remote_id: self.tparams.remote_cfg.unwrap().entity_id, + entity_type: EntityType::Receiving, + }, + )); self.tparams.tstate.current_check_count = 0; } @@ -951,8 +951,8 @@ mod tests { basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler, LOCAL_ID, }, - CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig, - PduRawWithInfo, StdRemoteEntityConfigProvider, CRC_32, + CountdownProvider, FaultHandler, IndicationConfig, PduRawWithInfo, + StdRemoteEntityConfigProvider, TimerCreatorProvider, CRC_32, }; use super::*; @@ -995,10 +995,10 @@ mod tests { } } - impl CheckTimerProviderCreator for TestCheckTimerCreator { - type CheckTimer = TestCheckTimer; + impl TimerCreatorProvider for TestCheckTimerCreator { + type Countdown = TestCheckTimer; - fn create_check_timer_provider(&self, timer_context: TimerContext) -> Self::CheckTimer { + fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { match timer_context { TimerContext::CheckLimit { .. } => { TestCheckTimer::new(self.check_limit_expired_flag.clone()) diff --git a/src/filestore.rs b/src/filestore.rs index d53fcd3..390c060 100644 --- a/src/filestore.rs +++ b/src/filestore.rs @@ -155,6 +155,7 @@ pub trait VirtualFilestore { &self, file_path: &str, checksum_type: ChecksumType, + size_to_verify: u64, verification_buf: &mut [u8], ) -> Result; @@ -167,13 +168,14 @@ pub trait VirtualFilestore { /// 4096 or 8192 bytes. fn checksum_verify( &self, + expected_checksum: u32, file_path: &str, checksum_type: ChecksumType, - expected_checksum: u32, + size_to_verify: u64, verification_buf: &mut [u8], ) -> Result { Ok( - self.calculate_checksum(file_path, checksum_type, verification_buf)? + self.calculate_checksum(file_path, checksum_type, size_to_verify, verification_buf)? == expected_checksum, ) } @@ -326,18 +328,23 @@ pub mod std_mod { &self, file_path: &str, checksum_type: ChecksumType, + size_to_verify: u64, verification_buf: &mut [u8], ) -> Result { let mut calc_with_crc_lib = |crc: Crc| -> Result { let mut digest = crc.digest(); - let file_to_check = File::open(file_path)?; - let mut buf_reader = BufReader::new(file_to_check); - loop { - let bytes_read = buf_reader.read(verification_buf)?; + let mut buf_reader = BufReader::new(File::open(file_path)?); + let mut remaining_bytes = size_to_verify; + while remaining_bytes > 0 { + // Read the smaller of the remaining bytes or the buffer size + let bytes_to_read = remaining_bytes.min(verification_buf.len() as u64) as usize; + let bytes_read = buf_reader.read(&mut verification_buf[0..bytes_to_read])?; + if bytes_read == 0 { - break; + break; // Reached end of file } digest.update(&verification_buf[0..bytes_read]); + remaining_bytes -= bytes_read as u64; } Ok(digest.finalize()) }; @@ -776,9 +783,10 @@ mod tests { checksum = checksum.wrapping_add(u32::from_be_bytes(buffer)); let mut verif_buf: [u8; 32] = [0; 32]; let result = NATIVE_FS.checksum_verify( + checksum, file_path.to_str().unwrap(), ChecksumType::Modular, - checksum, + EXAMPLE_DATA_CFDP.len() as u64, &mut verif_buf, ); assert!(result.is_ok()); @@ -791,6 +799,7 @@ mod tests { // The file to check does not even need to exist, and the verification buffer can be // empty: the null checksum is always yields the same result. let result = NATIVE_FS.checksum_verify( + 0, file_path.to_str().unwrap(), ChecksumType::NullChecksum, 0, @@ -807,6 +816,7 @@ mod tests { // The file to check does not even need to exist, and the verification buffer can be // empty: the null checksum is always yields the same result. let result = NATIVE_FS.checksum_verify( + 0, file_path.to_str().unwrap(), ChecksumType::Crc32Proximity1, 0, diff --git a/src/lib.rs b/src/lib.rs index 6d5fd8d..1d1d738 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -108,13 +108,13 @@ pub enum TimerContext { /// /// ## 3. Positive ACK procedures /// -/// The timer will be used to perform the Positive Acknowledgement Procedures as specified in +/// The timer will be used to perform the Positive Acknowledgement Procedures as specified in /// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer /// interval of the remote entity configuration. -pub trait CheckTimerProviderCreator { - type CheckTimer: CountdownProvider; +pub trait TimerCreatorProvider { + type Countdown: CountdownProvider; - fn create_check_timer_provider(&self, timer_context: TimerContext) -> Self::CheckTimer; + fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown; } /// This structure models the remote entity configuration information as specified in chapter 8.3 @@ -591,12 +591,12 @@ pub mod std_mod { /// Simple implementation of the [CountdownProvider] trait assuming a standard runtime. /// It also assumes that a second accuracy of the check timer period is sufficient. #[derive(Debug)] - pub struct StdCheckTimer { + pub struct StdCountdown { expiry_time_seconds: u64, start_time: std::time::Instant, } - impl StdCheckTimer { + impl StdCountdown { pub fn new(expiry_time_seconds: u64) -> Self { Self { expiry_time_seconds, @@ -609,7 +609,7 @@ pub mod std_mod { } } - impl CountdownProvider for StdCheckTimer { + impl CountdownProvider for StdCountdown { fn has_expired(&self) -> bool { let elapsed_time = self.start_time.elapsed(); if elapsed_time.as_nanos() > self.expiry_time_seconds as u128 * 1_000_000_000 { @@ -623,11 +623,11 @@ pub mod std_mod { } } - pub struct StdCheckTimerCreator { + pub struct StdTimerCreator { pub check_limit_timeout_secs: u64, } - impl StdCheckTimerCreator { + impl StdTimerCreator { pub const fn new(check_limit_timeout_secs: u64) -> Self { Self { check_limit_timeout_secs, @@ -635,28 +635,28 @@ pub mod std_mod { } } - impl Default for StdCheckTimerCreator { + impl Default for StdTimerCreator { fn default() -> Self { Self::new(5) } } - impl CheckTimerProviderCreator for StdCheckTimerCreator { - type CheckTimer = StdCheckTimer; + impl TimerCreatorProvider for StdTimerCreator { + type Countdown = StdCountdown; - fn create_check_timer_provider(&self, timer_context: TimerContext) -> Self::CheckTimer { + fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { match timer_context { TimerContext::CheckLimit { local_id: _, remote_id: _, entity_type: _, - } => StdCheckTimer::new(self.check_limit_timeout_secs), + } => StdCountdown::new(self.check_limit_timeout_secs), TimerContext::NakActivity { expiry_time_seconds, - } => StdCheckTimer::new(Duration::from_secs_f32(expiry_time_seconds).as_secs()), + } => StdCountdown::new(Duration::from_secs_f32(expiry_time_seconds).as_secs()), TimerContext::PositiveAck { expiry_time_seconds, - } => StdCheckTimer::new(Duration::from_secs_f32(expiry_time_seconds).as_secs()), + } => StdCountdown::new(Duration::from_secs_f32(expiry_time_seconds).as_secs()), } } } @@ -725,6 +725,8 @@ pub enum PacketTarget { DestEntity, } +/// Generic trait which models a raw CFDP packet data unit (PDU) block with some additional context +/// information. pub trait PduProvider { fn pdu_type(&self) -> PduType; fn file_directive_type(&self) -> Option; @@ -948,7 +950,7 @@ pub(crate) mod tests { }; use user::{CfdpUser, OwnedMetadataRecvdParams, TransactionFinishedParams}; - use crate::{PacketTarget, StdCheckTimer}; + use crate::{PacketTarget, StdCountdown}; use super::*; @@ -1298,7 +1300,7 @@ pub(crate) mod tests { #[test] fn test_std_check_timer() { - let mut std_check_timer = StdCheckTimer::new(1); + let mut std_check_timer = StdCountdown::new(1); assert!(!std_check_timer.has_expired()); assert_eq!(std_check_timer.expiry_time_seconds(), 1); std::thread::sleep(Duration::from_millis(800)); @@ -1311,11 +1313,10 @@ pub(crate) mod tests { #[test] fn test_std_check_timer_creator() { - let std_check_timer_creator = StdCheckTimerCreator::new(1); - let check_timer = - std_check_timer_creator.create_check_timer_provider(TimerContext::NakActivity { - expiry_time_seconds: 1.0, - }); + let std_check_timer_creator = StdTimerCreator::new(1); + let check_timer = std_check_timer_creator.create_countdown(TimerContext::NakActivity { + expiry_time_seconds: 1.0, + }); assert_eq!(check_timer.expiry_time_seconds(), 1); } diff --git a/src/source.rs b/src/source.rs index c41cd61..d7c4d3f 100644 --- a/src/source.rs +++ b/src/source.rs @@ -12,11 +12,11 @@ //! The [SourceHandler::state_machine] will generally perform the following steps after a valid //! put request was received through the [SourceHandler::put_request] method: //! -//! 1. Generate the Metadata PDU to be sent to a remote CFDP entity. You can use the +//! 1. Generate the Metadata PDU to be sent to a remote CFDP entity. You can use the //! [spacepackets::cfdp::pdu::metadata::MetadataPduReader] to inspect the generated PDU. -//! 2. Generate all File Data PDUs to be sent to a remote CFDP entity if applicable (file not +//! 2. Generate all File Data PDUs to be sent to a remote CFDP entity if applicable (file not //! empty). The PDU(s) can be inspected using the [spacepackets::cfdp::pdu::file_data::FileDataPdu] reader. -//! 3. Generate an EOF PDU to be sent to a remote CFDP entity. The PDU can be inspected using +//! 3. Generate an EOF PDU to be sent to a remote CFDP entity. The PDU can be inspected using //! the [spacepackets::cfdp::pdu::eof::EofPdu] reader. //! //! If this is an unacknowledged transfer with no transaction closure, the file transfer will be @@ -24,16 +24,16 @@ //! //! ### Unacknowledged transfer with requested closure //! -//! 4. A Finished PDU will be awaited, for example one generated using +//! 4. A Finished PDU will be awaited, for example one generated using //! [spacepackets::cfdp::pdu::finished::FinishedPduCreator]. //! //! ### Acknowledged transfer (*not implemented yet*) //! -//! 4. A EOF ACK packet will be awaited, for example one generated using +//! 4. A EOF ACK packet will be awaited, for example one generated using //! [spacepackets::cfdp::pdu::ack::AckPdu]. -//! 5. A Finished PDU will be awaited, for example one generated using +//! 5. A Finished PDU will be awaited, for example one generated using //! [spacepackets::cfdp::pdu::finished::FinishedPduCreator]. -//! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity. +//! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity. //! The [spacepackets::cfdp::pdu::finished::FinishedPduReader] can be used to inspect the //! generated PDU. use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error}; @@ -60,7 +60,10 @@ use spacepackets::{ use spacepackets::seq_count::SequenceCountProvider; -use crate::{DummyPduProvider, GenericSendError, PduProvider}; +use crate::{ + time::CountdownProvider, DummyPduProvider, EntityType, GenericSendError, PduProvider, + TimerCreatorProvider, +}; use super::{ filestore::{FilestoreError, VirtualFilestore}, @@ -208,6 +211,8 @@ pub struct SourceHandler< UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, + TimerCreator: TimerCreatorProvider, + Countdown: CountdownProvider, SeqCountProvider: SequenceCountProvider, > { local_cfg: LocalEntityConfig, @@ -223,6 +228,8 @@ pub struct SourceHandler< fparams: FileParams, // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. pdu_conf: CommonPduConfig, + countdown: Option, + timer_creator: TimerCreator, seq_count_provider: SeqCountProvider, } @@ -231,8 +238,19 @@ impl< UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, + CheckTimerCreator: TimerCreatorProvider, + CheckTimerProvider: CountdownProvider, SeqCountProvider: SequenceCountProvider, - > SourceHandler + > + SourceHandler< + PduSender, + UserFaultHook, + Vfs, + RemoteCfgTable, + CheckTimerCreator, + CheckTimerProvider, + SeqCountProvider, + > { /// Creates a new instance of a source handler. /// @@ -251,8 +269,12 @@ impl< /// example 2048 or 4096 bytes. /// * `remote_cfg_table` - The [RemoteEntityConfigProvider] used to look up remote /// entities and target specific configuration for file copy operations. + /// * `check_timer_creator` - [CheckTimerProviderCreator] used by the CFDP handler to generate + /// timers required by various tasks. This allows to use this handler for embedded systems + /// where the standard time APIs might not be available. /// * `seq_count_provider` - The [SequenceCountProvider] used to generate the [TransactionId] /// which contains an incrementing counter. + #[allow(clippy::too_many_arguments)] pub fn new( cfg: LocalEntityConfig, pdu_sender: PduSender, @@ -260,6 +282,7 @@ impl< put_request_cacher: StaticPutRequestCacher, pdu_and_cksum_buf_size: usize, remote_cfg_table: RemoteCfgTable, + check_timer_creator: CheckTimerCreator, seq_count_provider: SeqCountProvider, ) -> Self { Self { @@ -273,6 +296,8 @@ impl< tstate: Default::default(), fparams: Default::default(), pdu_conf: Default::default(), + countdown: None, + timer_creator: check_timer_creator, seq_count_provider, } } @@ -296,9 +321,9 @@ impl< pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, - packet_to_insert: Option<&impl PduProvider>, + pdu: Option<&impl PduProvider>, ) -> Result { - if let Some(packet) = packet_to_insert { + if let Some(packet) = pdu { self.insert_packet(cfdp_user, packet)?; } match self.state_helper.state { @@ -306,7 +331,7 @@ impl< // TODO: In acknowledged mode, add timer handling. Ok(0) } - super::State::Busy => self.fsm_busy(cfdp_user), + super::State::Busy => self.fsm_busy(cfdp_user, pdu), super::State::Suspended => { // There is now way to suspend the handler currently anyway. Ok(0) @@ -426,6 +451,34 @@ impl< Ok(()) } + /// This functions models the Cancel.request CFDP primitive and is the recommended way to + /// cancel a transaction. + /// + /// This method will cause a Notice of Cancellation at this entity if a transaction is active + /// and the passed transaction ID matches the currently active transaction ID. Please note + /// that the state machine might still be active because a cancelled transfer might still + /// require some packets to be sent to the remote receiver entity. + /// + /// If not unexpected errors occur, this method returns [true] if the transfer was cancelled + /// propery and [false] if there is no transaction active or the passed transaction ID and the + /// active ID do not match. + pub fn cancel_request(&mut self, transaction_id: &TransactionId) -> Result { + if self.state_helper.state == super::State::Idle { + return Ok(false); + } + if let Some(active_id) = self.transaction_id() { + if active_id == *transaction_id { + self.declare_fault(ConditionCode::CancelRequestReceived)?; + return Ok(true); + } + } + Ok(false) + } + + pub fn transaction_id(&self) -> Option { + self.tstate.as_ref().map(|v| v.transaction_id) + } + fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 { let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( &PduHeader::new_no_file_data(self.pdu_conf, 0), @@ -447,7 +500,11 @@ impl< self.tstate.as_ref().map(|v| v.transmission_mode) } - fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { + fn fsm_busy( + &mut self, + cfdp_user: &mut impl CfdpUser, + pdu: Option<&impl PduProvider>, + ) -> Result { let mut sent_packets = 0; if self.state_helper.step == TransactionStep::Idle { self.state_helper.step = TransactionStep::TransactionStart; @@ -473,31 +530,7 @@ impl< sent_packets += 1; } if self.state_helper.step == TransactionStep::WaitingForFinished { - /* - def _handle_wait_for_finish(self): - if ( - self.transmission_mode == TransmissionMode.ACKNOWLEDGED - and self.__handle_retransmission() - ): - return - if ( - self._inserted_pdu.pdu is None - or self._inserted_pdu.pdu_directive_type is None - or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU - ): - if self._params.check_timer is not None: - if self._params.check_timer.timed_out(): - self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED) - return - finished_pdu = self._inserted_pdu.to_finished_pdu() - self._inserted_pdu.pdu = None - self._params.finished_params = finished_pdu.finished_params - if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: - self._prepare_finished_ack_packet(finished_pdu.condition_code) - self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED - else: - self.states.step = TransactionStep.NOTICE_OF_COMPLETION - */ + self.handle_wait_for_finished_pdu(pdu)?; } if self.state_helper.step == TransactionStep::NoticeOfCompletion { self.notice_of_completion(cfdp_user); @@ -506,11 +539,65 @@ impl< Ok(sent_packets) } + fn handle_wait_for_finished_pdu( + &mut self, + packet: Option<&impl PduProvider>, + ) -> Result<(), SourceError> { + if let Some(packet) = packet { + if let Some(FileDirectiveType::FinishedPdu) = packet.file_directive_type() { + let finished_pdu = FinishedPduReader::new(packet.pdu())?; + self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams { + condition_code: finished_pdu.condition_code(), + delivery_code: finished_pdu.delivery_code(), + file_status: finished_pdu.file_status(), + }); + if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { + // TODO: Ack packet handling + self.state_helper.step = TransactionStep::NoticeOfCompletion; + } else { + self.state_helper.step = TransactionStep::NoticeOfCompletion; + } + return Ok(()); + } + } + // If we reach this state, countdown is definitely valid instance. + if self.countdown.as_ref().unwrap().has_expired() { + self.declare_fault(ConditionCode::CheckLimitReached)?; + } + /* + def _handle_wait_for_finish(self): + if ( + self.transmission_mode == TransmissionMode.ACKNOWLEDGED + and self.__handle_retransmission() + ): + return + if ( + self._inserted_pdu.pdu is None + or self._inserted_pdu.pdu_directive_type is None + or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU + ): + if self._params.check_timer is not None: + if self._params.check_timer.timed_out(): + self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED) + return + finished_pdu = self._inserted_pdu.to_finished_pdu() + self._inserted_pdu.pdu = None + self._params.finished_params = finished_pdu.finished_params + if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: + self._prepare_finished_ack_packet(finished_pdu.condition_code) + self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED + else: + self.states.step = TransactionStep.NOTICE_OF_COMPLETION + */ + Ok(()) + } + fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> { let tstate = self.tstate.as_ref().unwrap(); let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), tstate.remote_cfg.default_crc_type, + self.fparams.file_size, self.pdu_and_cksum_buffer.get_mut(), )?; self.prepare_and_send_eof_pdu(checksum)?; @@ -520,7 +607,13 @@ impl< } if tstate.transmission_mode == TransmissionMode::Unacknowledged { if tstate.closure_requested { - // TODO: Check timer handling. + self.countdown = Some(self.timer_creator.create_countdown( + crate::TimerContext::CheckLimit { + local_id: self.local_cfg.id, + remote_id: tstate.remote_cfg.entity_id, + entity_type: EntityType::Sending, + }, + )); self.state_helper.step = TransactionStep::WaitingForFinished; } else { self.state_helper.step = TransactionStep::NoticeOfCompletion; @@ -874,6 +967,97 @@ impl< &self.local_cfg } + fn declare_fault(&mut self, cond: ConditionCode) -> Result<(), SourceError> { + let fh = self.local_cfg.fault_handler.get_fault_handler(cond); + match fh { + spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => { + if let ControlFlow::Break(_) = self.notice_of_cancellation(cond)? { + return Ok(()); + } + } + spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => { + self.notice_of_suspension(); + } + spacepackets::cfdp::FaultHandlerCode::IgnoreError => (), + spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(), + } + let tstate = self.tstate.as_ref().unwrap(); + self.local_cfg.fault_handler.report_fault( + tstate.transaction_id, + cond, + self.fparams.progress, + ); + Ok(()) + } + + fn notice_of_cancellation( + &mut self, + condition_code: ConditionCode, + ) -> Result, SourceError> { + // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring + // the EOF (cancel) PDU must result in abandonment of the transaction. + if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof { + if cond_code_eof != ConditionCode::NoError { + let tstate = self.tstate.as_ref().unwrap(); + // Still call the abandonment callback to ensure the fault is logged. + self.local_cfg + .fault_handler + .user_hook + .get_mut() + .abandoned_cb(tstate.transaction_id, cond_code_eof, self.fparams.progress); + self.abandon_transaction(); + return Ok(ControlFlow::Break(())); + } + } + + let tstate = self.tstate.as_mut().unwrap(); + tstate.cond_code_eof = Some(condition_code); + // As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply + // the checksum for the file copy progress sent so far. + let checksum = self.vfs.calculate_checksum( + self.put_request_cacher.source_file().unwrap(), + tstate.remote_cfg.default_crc_type, + self.fparams.progress, + self.pdu_and_cksum_buffer.get_mut(), + )?; + self.prepare_and_send_eof_pdu(checksum)?; + Ok(ControlFlow::Continue(())) + } + + fn notice_of_suspension(&mut self) {} + + fn abandon_transaction(&mut self) { + // I guess an abandoned transaction just stops whatever the handler is doing and resets + // it to a clean state.. The implementation for this is quite easy. + self.reset(); + } + + /* + def _notice_of_cancellation(self, condition_code: ConditionCode) -> bool: + """Returns whether the fault declaration handler can returns prematurely.""" + # CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring + # the EOF (cancel) PDU must result in abandonment of the transaction. + if ( + self._params.cond_code_eof is not None + and self._params.cond_code_eof != ConditionCode.NO_ERROR + ): + assert self._params.transaction_id is not None + # We still call the abandonment callback to ensure the fault is logged. + self.cfg.default_fault_handlers.abandoned_cb( + self._params.transaction_id, + self._params.cond_code_eof, + self._params.fp.progress, + ) + self._abandon_transaction() + return False + self._params.cond_code_eof = condition_code + # As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply + # the checksum for the file copy progress sent so far. + self._prepare_eof_pdu(self._checksum_calculation(self._params.fp.progress)) + self.states.step = TransactionStep.SENDING_EOF + return True + */ + /// This function is public to allow completely resetting the handler, but it is explicitely /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. /// Resetting the handler might interfere with these mechanisms and lead to unexpected @@ -882,6 +1066,7 @@ impl< self.state_helper = Default::default(); self.tstate = None; self.fparams = Default::default(); + self.countdown = None; } } @@ -907,7 +1092,8 @@ mod tests { filestore::NativeFilestore, request::PutRequestOwned, tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler}, - FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, CRC_32, + FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown, + StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32, }; use spacepackets::seq_count::SeqCountProviderSimple; @@ -927,6 +1113,8 @@ mod tests { TestFaultHandler, NativeFilestore, StdRemoteEntityConfigProvider, + StdTimerCreator, + StdCountdown, SeqCountProviderSimple, >; @@ -967,6 +1155,7 @@ mod tests { max_packet_len, crc_on_transmission_by_default, ), + StdTimerCreator::new(1), SeqCountProviderSimple::default(), ), srcfile_handle, diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index a8ad21d..0f5bbca 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -14,7 +14,7 @@ use cfdp::{ source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig, - StdCheckTimerCreator, TransactionId, UserFaultHookProvider, + StdTimerCreator, TransactionId, UserFaultHookProvider, }; use spacepackets::{ cfdp::{ChecksumType, ConditionCode, TransmissionMode}, @@ -202,6 +202,7 @@ fn end_to_end_test(with_closure: bool) { put_request_cacher, 2048, remote_cfg_of_dest, + StdTimerCreator::default(), seq_count_provider, ); let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source); @@ -225,7 +226,7 @@ fn end_to_end_test(with_closure: bool) { dest_tx, NativeFilestore::default(), remote_cfg_of_source, - StdCheckTimerCreator::default(), + StdTimerCreator::default(), ); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);