diff --git a/Cargo.toml b/Cargo.toml index 6b02234..9436268 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ alloc = [ "hashbrown", "spacepackets/alloc" ] -serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde"] +serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde", "heapless/serde"] defmt = ["dep:defmt", "spacepackets/defmt"] [dev-dependencies] diff --git a/examples/python-interop/main.rs b/examples/python-interop/main.rs index ef21d7b..b8f2f64 100644 --- a/examples/python-interop/main.rs +++ b/examples/python-interop/main.rs @@ -13,9 +13,10 @@ use std::{ use cfdp::{ EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider, - RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider, + RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHook, dest::DestinationHandler, filestore::NativeFilestore, + lost_segments::LostSegmentsList, request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, @@ -62,7 +63,7 @@ pub struct Cli { #[derive(Default)] pub struct ExampleFaultHandler {} -impl UserFaultHookProvider for ExampleFaultHandler { +impl UserFaultHook for ExampleFaultHandler { fn notice_of_suspension_cb( &mut self, transaction_id: TransactionId, @@ -261,7 +262,7 @@ impl UdpServer { while let Ok(tm) = receiver.try_recv() { debug!("Sending PDU: {:?}", tm); pdu_printout(&tm); - let result = self.socket.send_to(tm.pdu(), self.remote_addr()); + let result = self.socket.send_to(tm.raw_pdu(), self.remote_addr()); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } @@ -284,7 +285,7 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) { spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (), spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => { let meta_pdu = - MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed"); + MetadataPduReader::new(pdu.raw_pdu()).expect("creating metadata pdu failed"); debug!("Metadata PDU: {:?}", meta_pdu) } spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (), @@ -292,7 +293,8 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) { spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (), }, spacepackets::cfdp::PduType::FileData => { - let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed"); + let fd_pdu = + FileDataPdu::from_bytes(pdu.raw_pdu()).expect("creating file data pdu failed"); debug!("File data PDU: {:?}", fd_pdu); } } @@ -367,6 +369,7 @@ fn main() { NativeFilestore::default(), remote_cfg_python, StdTimerCreator::default(), + LostSegmentsList::default(), ); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving); diff --git a/src/dest.rs b/src/dest.rs index 5832f46..82e3cdb 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -30,7 +30,8 @@ //! 4. A Finished PDU has been sent back to the remote side. //! 5. A Finished PDU ACK was received. use crate::{ - DummyPduProvider, GenericSendError, PduProvider, PositiveAckParams, + DummyPduProvider, GenericSendError, IndicationConfig, PduProvider, PositiveAckParams, + lost_segments::{LostSegmentError, LostSegmentStore}, user::TransactionFinishedParams, }; use core::{ @@ -39,16 +40,15 @@ use core::{ }; use super::{ - CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, - RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TimerCreatorProvider, - TransactionId, UserFaultHookProvider, + Countdown, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, RemoteConfigStore, + RemoteEntityConfig, State, TimerContext, TimerCreator, TransactionId, UserFaultHook, filestore::{FilestoreError, VirtualFilestore}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, }; use smallvec::SmallVec; use spacepackets::{ cfdp::{ - ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransactionStatus, + ChecksumType, ConditionCode, FaultHandlerCode, LargeFileFlag, PduType, TransactionStatus, TransmissionMode, pdu::{ CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, @@ -57,6 +57,7 @@ use spacepackets::{ file_data::FileDataPdu, finished::{DeliveryCode, FileStatus, FinishedPduCreator}, metadata::{MetadataGenericParams, MetadataPduReader}, + nak::NakPduCreatorWithReservedSeqReqsBuf, }, tlv::{EntityIdTlv, GenericTlv, ReadableTlv, TlvType, msg_to_user::MsgToUserTlv}, }, @@ -64,7 +65,7 @@ use spacepackets::{ }; #[derive(Debug)] -struct FileProperties { +struct FileNames { src_file_name: [u8; u8::MAX as usize], src_file_name_len: usize, dest_file_name: [u8; u8::MAX as usize], @@ -76,15 +77,33 @@ struct FileProperties { #[derive(Debug, Default, Clone, Copy)] pub struct AnomalyTracker { invalid_ack_directive_code: u8, + lost_segment_errors: u8, } impl AnomalyTracker { + #[inline] pub fn invalid_ack_directive_code(&mut self) -> u8 { self.invalid_ack_directive_code } + #[inline] + pub fn lost_segment_errors(&mut self) -> u8 { + self.lost_segment_errors + } + + #[inline] + pub fn increment_lost_segment_errors(&mut self) { + self.lost_segment_errors = self.lost_segment_errors.wrapping_add(1); + } + + #[inline] + pub fn increment_invalid_ack_directive_code(&mut self) { + self.invalid_ack_directive_code = self.invalid_ack_directive_code.wrapping_add(1); + } + + #[inline] pub fn reset(&mut self) { - self.invalid_ack_directive_code = 0; + *self = Default::default(); } } @@ -143,62 +162,45 @@ impl FinishedParams { self.file_status = FileStatus::Unreported; } } -// This contains transfer state parameters for destination transaction. + #[derive(Debug)] -struct TransferState { +pub struct AcknowledgedModeParams { + last_start_offset: u64, + last_end_offset: u64, + metadata_missing: bool, +} + +// This contains parameters for destination transaction. +#[derive(Debug)] +struct TransactionParams { + pdu_conf: CommonPduConfig, + file_names: FileNames, + msgs_to_user_size: usize, + // TODO: Should we make this configurable? + msgs_to_user_buf: [u8; 1024], + remote_cfg: Option, transaction_id: Option, metadata_params: MetadataGenericParams, + file_size: u64, progress: u64, - // TODO: Can we delete this for good? - // file_size_eof: u64, + acknowledged_mode_params: Option, finished_params: FinishedParams, - positive_ack_params: Option>, + positive_ack_params: Option>, metadata_only: bool, completion_disposition: Cell, checksum: u32, anomaly_tracker: AnomalyTracker, current_check_count: u32, - current_check_timer: Option, + current_check_timer: Option, } -impl Default for TransferState { - fn default() -> Self { - Self { - transaction_id: None, - metadata_params: Default::default(), - progress: Default::default(), - // file_size_eof: Default::default(), - metadata_only: false, - positive_ack_params: None, - finished_params: FinishedParams::default(), - completion_disposition: Cell::new(CompletionDisposition::Completed), - checksum: 0, - current_check_count: 0, - anomaly_tracker: AnomalyTracker::default(), - current_check_timer: None, - } - } -} - -// This contains parameters for destination transaction. -#[derive(Debug)] -struct TransactionParams { - transfer_state: TransferState, - pdu_conf: CommonPduConfig, - file_properties: FileProperties, - msgs_to_user_size: usize, - // TODO: Should we make this configurable? - msgs_to_user_buf: [u8; 1024], - remote_cfg: Option, -} - -impl TransactionParams { +impl TransactionParams { fn transmission_mode(&self) -> TransmissionMode { self.pdu_conf.trans_mode } } -impl Default for FileProperties { +impl Default for FileNames { fn default() -> Self { Self { src_file_name: [0; u8::MAX as usize], @@ -211,33 +213,45 @@ impl Default for FileProperties { } } -impl TransactionParams { +impl TransactionParams { fn file_size(&self) -> u64 { - self.transfer_state.metadata_params.file_size + self.metadata_params.file_size } fn metadata_params(&self) -> &MetadataGenericParams { - &self.transfer_state.metadata_params + &self.metadata_params } } -impl Default for TransactionParams { +impl Default for TransactionParams { fn default() -> Self { Self { pdu_conf: Default::default(), msgs_to_user_size: 0, + file_size: 0, msgs_to_user_buf: [0; 1024], - transfer_state: Default::default(), - file_properties: Default::default(), + file_names: Default::default(), remote_cfg: None, + transaction_id: None, + metadata_params: Default::default(), + progress: Default::default(), + acknowledged_mode_params: None, + metadata_only: false, + positive_ack_params: None, + finished_params: FinishedParams::default(), + completion_disposition: Cell::new(CompletionDisposition::Completed), + checksum: 0, + current_check_count: 0, + anomaly_tracker: AnomalyTracker::default(), + current_check_timer: None, } } } -impl TransactionParams { +impl TransactionParams { fn reset(&mut self) { - self.transfer_state.finished_params.reset(); - self.transfer_state.anomaly_tracker.reset(); + self.finished_params.reset(); + self.anomaly_tracker.reset(); } } @@ -252,10 +266,17 @@ pub enum DestError { pdu_type: PduType, directive_type: Option, }, - #[error("can not process file data PDUs in current state")] - WrongStateForFileData, - #[error("can not process EOF PDUs in current state")] - WrongStateForEof, + #[error("first packet must be metadata PDU for unacknowledged transfers")] + FirstPacketNotMetadata, + #[error( + "can not process PDU with type {pdu_type:?} and directive field {file_directive_type:?} in \ + current transaction step {step:?}" + )] + WrongStepForPdu { + step: TransactionStep, + pdu_type: PduType, + file_directive_type: Option, + }, // Received new metadata PDU while being already being busy with a file transfer. #[error("busy with transfer")] RecvdMetadataButIsBusy, @@ -263,21 +284,21 @@ pub enum DestError { EmptySrcFileField, #[error("empty dest file field")] EmptyDestFileField, - #[error("packets to be sent are still left")] - PacketToSendLeft, #[error("pdu error {0}")] Pdu(#[from] PduError), #[error("io error {0}")] #[cfg(feature = "std")] Io(#[from] std::io::Error), - #[error("file store error {0}")] + #[error("file store error: {0}")] Filestore(#[from] FilestoreError), + #[error("lost segment error: {0}")] + LostSegmentError(#[from] LostSegmentError), #[error("path conversion error {0}")] PathConversion(#[from] Utf8Error), #[error("error building dest path from source file name and dest folder")] PathConcat, #[error("no remote entity configuration found for {0:?}")] - NoRemoteCfgFound(UnsignedByteField), + NoRemoteConfigFound(UnsignedByteField), #[error("issue sending PDU: {0}")] SendError(#[from] GenericSendError), #[error("cfdp feature not implemented")] @@ -310,21 +331,23 @@ pub enum DestError { /// run them inside a thread pool, or move the newly created handler to a new thread.""" pub struct DestinationHandler< PduSender: PduSendProvider, - UserFaultHook: UserFaultHookProvider, + UserFaultHookInstance: UserFaultHook, Vfs: VirtualFilestore, - RemoteCfgTable: RemoteEntityConfigProvider, - CheckTimerCreator: TimerCreatorProvider, - CheckTimerProvider: CountdownProvider, + RemoteConfigStoreInstance: RemoteConfigStore, + TimerCreatorInstance: TimerCreator, + CountdownInstance: Countdown, + LostSegmentTracker: LostSegmentStore, > { - local_cfg: LocalEntityConfig, + local_cfg: LocalEntityConfig, step: core::cell::Cell, state: State, - transaction_params: TransactionParams, + transaction_params: TransactionParams, pdu_and_cksum_buffer: RefCell>, pub pdu_sender: PduSender, pub vfs: Vfs, - pub remote_cfg_table: RemoteCfgTable, - pub check_timer_creator: CheckTimerCreator, + pub remote_cfg_table: RemoteConfigStoreInstance, + pub check_timer_creator: TimerCreatorInstance, + lost_segment_tracker: LostSegmentTracker, } #[cfg(feature = "std")] @@ -332,19 +355,52 @@ pub type StdDestinationHandler = DestinationHandler< PduSender, UserFaultHook, crate::filestore::NativeFilestore, - crate::StdRemoteEntityConfigProvider, + crate::RemoteConfigStoreStd, crate::StdTimerCreator, crate::StdCountdown, + crate::lost_segments::LostSegmentsList, >; +#[cfg(feature = "std")] +impl + StdDestinationHandler +{ + #[cfg(feature = "std")] + pub fn new_std( + local_cfg: LocalEntityConfig, + pdu_and_cksum_buf_size: usize, + pdu_sender: PduSender, + ) -> Self { + Self::new( + local_cfg, + pdu_and_cksum_buf_size, + pdu_sender, + crate::filestore::NativeFilestore::default(), + crate::RemoteConfigStoreStd::default(), + crate::StdTimerCreator::default(), + crate::lost_segments::LostSegmentsList::default(), + ) + } +} + impl< PduSender: PduSendProvider, - UserFaultHook: UserFaultHookProvider, + UserFaultHookInstance: UserFaultHook, Vfs: VirtualFilestore, - RemoteCfgTable: RemoteEntityConfigProvider, - TimerCreator: TimerCreatorProvider, - Countdown: CountdownProvider, -> DestinationHandler + RemoteConfigStoreInstance: RemoteConfigStore, + TimerCreatorInstance: TimerCreator, + CountdownInstance: Countdown, + LostSegmentTracker: LostSegmentStore, +> + DestinationHandler< + PduSender, + UserFaultHookInstance, + Vfs, + RemoteConfigStoreInstance, + TimerCreatorInstance, + CountdownInstance, + LostSegmentTracker, + > { /// Constructs a new destination handler. /// @@ -367,12 +423,13 @@ impl< /// timers required by various tasks. This allows to use this handler for embedded systems /// where the standard time APIs might not be available. pub fn new( - local_cfg: LocalEntityConfig, + local_cfg: LocalEntityConfig, pdu_and_cksum_buf_size: usize, pdu_sender: PduSender, vfs: Vfs, - remote_cfg_table: RemoteCfgTable, - timer_creator: TimerCreator, + remote_cfg_table: RemoteConfigStoreInstance, + timer_creator: TimerCreatorInstance, + lost_segment_tracker: LostSegmentTracker, ) -> Self { Self { local_cfg, @@ -384,6 +441,7 @@ impl< vfs, remote_cfg_table, check_timer_creator: timer_creator, + lost_segment_tracker, } } @@ -461,7 +519,7 @@ impl< #[inline] pub fn transaction_id(&self) -> Option { - self.tstate().transaction_id + self.transaction_params.transaction_id } /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable. @@ -478,12 +536,12 @@ impl< } #[inline] - pub fn local_cfg(&self) -> &LocalEntityConfig { + pub fn local_cfg(&self) -> &LocalEntityConfig { &self.local_cfg } #[inline] - pub fn local_cfg_mut(&mut self) -> &mut LocalEntityConfig { + pub fn local_cfg_mut(&mut self) -> &mut LocalEntityConfig { &mut self.local_cfg } @@ -519,11 +577,12 @@ impl< sent_packets += self.handle_file_directive( cfdp_user, packet_to_insert.file_directive_type().unwrap(), - packet_to_insert.pdu(), + packet_to_insert.raw_pdu(), )?; } PduType::FileData => { - self.handle_file_data(cfdp_user, packet_to_insert.pdu())?; + let fd_pdu = FileDataPdu::from_bytes(packet_to_insert.raw_pdu())?; + self.handle_file_data(cfdp_user, fd_pdu)?; } } Ok(sent_packets) @@ -538,7 +597,8 @@ impl< let mut sent_packets = 0; match pdu_directive { FileDirectiveType::EofPdu => { - sent_packets += self.handle_eof_pdu(cfdp_user, raw_packet)? + let eof_pdu = EofPdu::from_bytes(raw_packet)?; + sent_packets += self.handle_eof_pdu(cfdp_user, eof_pdu)? } FileDirectiveType::FinishedPdu | FileDirectiveType::NakPdu @@ -554,7 +614,7 @@ impl< } FileDirectiveType::MetadataPdu => { let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?; - self.handle_metadata_pdu(metadata_pdu)? + self.handle_metadata_pdu(metadata_pdu, self.step() == TransactionStep::Idle)? } FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?, }; @@ -564,55 +624,70 @@ impl< fn handle_ack_pdu(&mut self, ack_pdu: AckPdu) -> Result<(), DestError> { if ack_pdu.directive_code_of_acked_pdu() != FileDirectiveType::FinishedPdu { self.transaction_params - .transfer_state .anomaly_tracker - .invalid_ack_directive_code = self - .transaction_params - .transfer_state - .anomaly_tracker - .invalid_ack_directive_code - .wrapping_add(1); + .increment_invalid_ack_directive_code(); } // We are done. self.reset(); Ok(()) } - fn handle_metadata_pdu(&mut self, metadata_pdu: MetadataPduReader) -> Result<(), DestError> { + fn first_packet_handling(&mut self, pdu_config: &CommonPduConfig) -> Result<(), DestError> { + self.transaction_params.reset(); + let remote_cfg = self.remote_cfg_table.get(pdu_config.source_id().value()); + if remote_cfg.is_none() { + return Err(DestError::NoRemoteConfigFound(pdu_config.source_id())); + } + self.transaction_params.remote_cfg = Some(*remote_cfg.unwrap()); + self.transaction_params.transaction_id = Some(TransactionId::new( + pdu_config.source_id(), + pdu_config.transaction_seq_num, + )); + self.transaction_params.pdu_conf = *pdu_config; + self.transaction_params.pdu_conf.direction = spacepackets::cfdp::Direction::TowardsSender; + self.state = State::Busy; + Ok(()) + } + + fn handle_metadata_pdu( + &mut self, + metadata_pdu: MetadataPduReader, + first_packet: bool, + ) -> Result<(), DestError> { if self.state != State::Idle { return Err(DestError::RecvdMetadataButIsBusy); } - self.transaction_params.reset(); - self.transaction_params.transfer_state.metadata_params = *metadata_pdu.metadata_params(); + if first_packet { + self.first_packet_handling(metadata_pdu.pdu_header().common_pdu_conf())?; + } + + self.transaction_params.metadata_params = *metadata_pdu.metadata_params(); let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value()); if remote_cfg.is_none() { - return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id())); + return Err(DestError::NoRemoteConfigFound(metadata_pdu.dest_id())); } - self.transaction_params.remote_cfg = Some(*remote_cfg.unwrap()); - // TODO: Support for metadata only PDUs. let src_name = metadata_pdu.src_file_name(); let dest_name = metadata_pdu.dest_file_name(); if src_name.is_empty() && dest_name.is_empty() { - self.transaction_params.transfer_state.metadata_only = true; + self.transaction_params.metadata_only = true; } - if !self.transaction_params.transfer_state.metadata_only && src_name.is_empty() { + if !self.transaction_params.metadata_only && src_name.is_empty() { return Err(DestError::EmptySrcFileField); } - if !self.transaction_params.transfer_state.metadata_only && dest_name.is_empty() { + if !self.transaction_params.metadata_only && dest_name.is_empty() { return Err(DestError::EmptyDestFileField); } - if !self.transaction_params.transfer_state.metadata_only { - self.transaction_params.file_properties.src_file_name[..src_name.len_value()] + if !self.transaction_params.metadata_only { + self.transaction_params.file_names.src_file_name[..src_name.len_value()] .copy_from_slice(src_name.value()); - self.transaction_params.file_properties.src_file_name_len = src_name.len_value(); + self.transaction_params.file_names.src_file_name_len = src_name.len_value(); if dest_name.is_empty() { return Err(DestError::EmptyDestFileField); } - self.transaction_params.file_properties.dest_file_name[..dest_name.len_value()] + self.transaction_params.file_names.dest_file_name[..dest_name.len_value()] .copy_from_slice(dest_name.value()); - self.transaction_params.file_properties.dest_file_name_len = dest_name.len_value(); - self.transaction_params.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf(); + self.transaction_params.file_names.dest_file_name_len = dest_name.len_value(); self.transaction_params.msgs_to_user_size = 0; } if !metadata_pdu.options().is_empty() { @@ -627,39 +702,138 @@ impl< } } } - self.state = State::Busy; self.set_step(TransactionStep::TransactionStart); Ok(()) } + fn handle_file_data_without_previous_metadata( + &mut self, + fd_pdu: &FileDataPdu, + ) -> Result { + let mut packets_sent = 0; + if fd_pdu.transmission_mode() == TransmissionMode::Unacknowledged { + // In acknowledged mode, we need to wait for the metadata PDU first. + return Err(DestError::FirstPacketNotMetadata); + } + self.first_packet_handling(fd_pdu.pdu_header().common_pdu_conf())?; + self.transaction_params.progress = fd_pdu.offset() + fd_pdu.file_data().len() as u64; + self.transaction_params.acknowledged_mode_params = Some(AcknowledgedModeParams { + last_start_offset: 0, + last_end_offset: 0, + metadata_missing: true, + }); + if fd_pdu.file_data().len() as u64 > 0 { + self.lost_segment_tracker + .add_lost_segment((0, self.transaction_params.progress))?; + let ack_params = self + .transaction_params + .acknowledged_mode_params + .as_mut() + .unwrap(); + ack_params.last_start_offset = self.transaction_params.progress; + ack_params.last_end_offset = self.transaction_params.progress; + } + if self + .transaction_params + .remote_cfg + .as_ref() + .unwrap() + .immediate_nak_mode + { + let mut num_seg_reqs = 1; + if fd_pdu.file_data().len() as u64 > 0 { + num_seg_reqs += 1; + } + let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let mut nak_pdu = NakPduCreatorWithReservedSeqReqsBuf::new( + self.pdu_and_cksum_buffer.get_mut(), + pdu_header, + num_seg_reqs, + ) + .map_err(PduError::from)?; + let buf_mut = nak_pdu.segment_request_buffer_mut(); + let mut current_offset = 0; + let increment = if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large { + 8 + } else { + 4 + }; + buf_mut[0..current_offset].fill(0); + current_offset += increment; + buf_mut[current_offset..current_offset + increment].fill(0); + current_offset += increment; + if fd_pdu.file_data().len() as u64 > 0 { + buf_mut[0..current_offset].fill(0); + current_offset += increment; + if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large { + buf_mut[current_offset..current_offset + increment] + .copy_from_slice(&self.transaction_params.progress.to_be_bytes()); + } else { + buf_mut[current_offset..current_offset + increment] + .copy_from_slice(&(self.transaction_params.progress as u32).to_be_bytes()); + } + nak_pdu + .set_start_and_end_of_scope(0, self.transaction_params.progress) + .map_err(PduError::from)?; + } + let written_size = nak_pdu.finish(); + self.pdu_sender.send_file_directive_pdu( + FileDirectiveType::NakPdu, + &self.pdu_and_cksum_buffer.borrow()[0..written_size], + )?; + packets_sent += 1; + } + Ok(packets_sent) + } + fn handle_file_data( &mut self, user: &mut impl CfdpUser, - raw_packet: &[u8], - ) -> Result<(), DestError> { + fd_pdu: FileDataPdu, + ) -> Result { + let mut packets_sent = 0; + let mut handle_indication = |id: TransactionId, indication_config: &IndicationConfig| { + if indication_config.file_segment_recv { + user.file_segment_recvd_indication(&FileSegmentRecvdParams { + id, + offset: fd_pdu.offset(), + length: fd_pdu.file_data().len(), + segment_metadata: fd_pdu.segment_metadata(), + }); + } + }; let step = self.step.get(); - if self.state == State::Idle - || (step != TransactionStep::ReceivingFileDataPdus - && step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling) - { - return Err(DestError::WrongStateForFileData); - } - let fd_pdu = FileDataPdu::from_bytes(raw_packet)?; - if self.local_cfg.indication_cfg.file_segment_recv { - user.file_segment_recvd_indication(&FileSegmentRecvdParams { - id: self.tstate().transaction_id.unwrap(), - offset: fd_pdu.offset(), - length: fd_pdu.file_data().len(), - segment_metadata: fd_pdu.segment_metadata(), - }); + if self.state == State::Idle { + if step == TransactionStep::Idle { + packets_sent += self.handle_file_data_without_previous_metadata(&fd_pdu)?; + self.set_step(TransactionStep::WaitingForMetadata); + handle_indication( + self.transaction_id().unwrap(), + &self.local_cfg.indication_cfg, + ); + return Ok(packets_sent); + } + if step != TransactionStep::ReceivingFileDataPdus + && step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling + { + return Err(DestError::WrongStepForPdu { + pdu_type: PduType::FileData, + file_directive_type: None, + step, + }); + } } + handle_indication( + self.transaction_id().unwrap(), + &self.local_cfg.indication_cfg, + ); if let Err(e) = self.vfs.write_data( // Safety: It was already verified that the path is valid during the transaction start. unsafe { from_utf8_unchecked( //from_utf8( - &self.transaction_params.file_properties.dest_path_buf - [0..self.transaction_params.file_properties.dest_file_path_len], + &self.transaction_params.file_names.dest_path_buf + [0..self.transaction_params.file_names.dest_file_path_len], ) }, fd_pdu.offset(), @@ -672,57 +846,36 @@ impl< } return Err(e.into()); } - self.tstate_mut().progress += fd_pdu.file_data().len() as u64; - Ok(()) + self.transaction_params.progress += fd_pdu.file_data().len() as u64; + Ok(packets_sent) } fn handle_eof_pdu( &mut self, cfdp_user: &mut impl CfdpUser, - raw_packet: &[u8], + eof_pdu: EofPdu, ) -> Result { let mut sent_packets = 0; - if self.state == State::Idle || self.step() != TransactionStep::ReceivingFileDataPdus { - return Err(DestError::WrongStateForEof); - } - let eof_pdu = EofPdu::from_bytes(raw_packet)?; if self.local_cfg.indication_cfg.eof_recv { // Unwrap is okay here, application logic ensures that transaction ID is valid here. - cfdp_user.eof_recvd_indication( - self.transaction_params - .transfer_state - .transaction_id - .as_ref() - .unwrap(), - ); + cfdp_user + .eof_recvd_indication(self.transaction_params.transaction_id.as_ref().unwrap()); + } + if self.state == State::Idle { + if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged { + return Err(DestError::WrongStepForPdu { + pdu_type: PduType::FileDirective, + file_directive_type: Some(FileDirectiveType::EofPdu), + step: self.step(), + }); + } + self.handle_eof_without_previous_metadata(&eof_pdu)?; + return Ok(sent_packets); } let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError { - self.handle_no_error_eof_pdu(&eof_pdu)? + self.handle_eof_no_error(&eof_pdu)? } else { - // This is an EOF (Cancel), perform Cancel Response Procedures according to chapter - // 4.6.6 of the standard. - self.trigger_notice_of_completion_cancelled( - eof_pdu.condition_code(), - EntityIdTlv::new(self.transaction_params.remote_cfg.unwrap().entity_id), - ); - self.transaction_params.transfer_state.progress = eof_pdu.file_size(); - if eof_pdu.file_size() > 0 { - self.transaction_params - .transfer_state - .finished_params - .delivery_code - .set(DeliveryCode::Incomplete); - } else { - self.transaction_params - .transfer_state - .finished_params - .delivery_code - .set(DeliveryCode::Complete); - } - // TODO: The cancel EOF also supplies a checksum and a progress number. We could cross - // check that checksum, but how would we deal with a checksum failure? The standard - // does not specify anything for this case.. It could be part of the status report - // issued to the user though. + self.handle_eof_cancel(&eof_pdu); true }; if regular_transfer_finish { @@ -731,30 +884,111 @@ impl< Ok(sent_packets) } + fn handle_eof_without_previous_metadata(&mut self, eof_pdu: &EofPdu) -> Result<(), DestError> { + self.transaction_params.progress = eof_pdu.file_size(); + self.transaction_params.file_size = eof_pdu.file_size(); + self.transaction_params.acknowledged_mode_params = Some(AcknowledgedModeParams { + last_start_offset: 0, + last_end_offset: 0, + metadata_missing: true, + }); + if self.transaction_params.progress > 0 { + self.lost_segment_tracker.reset(); + // Add the whole file to the lost segments map for now. + self.lost_segment_tracker + .add_lost_segment((0, eof_pdu.file_size()))?; + } + if eof_pdu.condition_code() != ConditionCode::NoError { + self.handle_eof_cancel(eof_pdu); + } + self.acknowledge_eof_pdu(eof_pdu)?; + if self.transaction_params.completion_disposition.get() == CompletionDisposition::Cancelled + { + self.set_step(TransactionStep::TransferCompletion); + return Ok(()); + } + if let Some(ack_params) = &self.transaction_params.acknowledged_mode_params { + if ack_params.metadata_missing || !self.lost_segment_tracker.is_empty() { + self.start_deferred_lost_segment_handling(); + return Ok(()); + } + } + self.set_step(TransactionStep::TransferCompletion); + Ok(()) + } + + fn handle_eof_cancel(&mut self, eof_pdu: &EofPdu) { + // This is an EOF (Cancel), perform Cancel Response Procedures according to chapter + // 4.6.6 of the standard. Set remote ID as fault location. + self.trigger_notice_of_completion_cancelled( + eof_pdu.condition_code(), + EntityIdTlv::new(self.transaction_params.remote_cfg.unwrap().entity_id), + ); + // Store this as progress for the checksum calculation as well. + self.transaction_params.progress = eof_pdu.file_size(); + if let Some(ack_params) = &self.transaction_params.acknowledged_mode_params { + if ack_params.metadata_missing { + return; + } + } + if self.transaction_params.progress == 0 { + // Empty file, no file data PDU. + self.transaction_params + .finished_params + .delivery_code + .set(DeliveryCode::Complete); + return; + } + if self.checksum_verify(self.transaction_params.progress, eof_pdu.file_checksum()) { + self.transaction_params + .finished_params + .delivery_code + .set(DeliveryCode::Complete); + return; + } + self.transaction_params + .finished_params + .delivery_code + .set(DeliveryCode::Incomplete); + } + + fn acknowledge_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<(), DestError> { + let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); + let ack_pdu = AckPdu::new_for_eof_pdu( + pdu_header, + eof_pdu.condition_code(), + TransactionStatus::Active, + ); + let written_len = ack_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?; + self.pdu_sender.send_file_directive_pdu( + FileDirectiveType::AckPdu, + &self.pdu_and_cksum_buffer.borrow()[0..written_len], + )?; + Ok(()) + } + + fn start_deferred_lost_segment_handling(&mut self) {} + fn trigger_notice_of_completion_cancelled( &self, cond_code: ConditionCode, fault_location: EntityIdTlv, ) { self.transaction_params - .transfer_state .completion_disposition .set(CompletionDisposition::Cancelled); self.transaction_params - .transfer_state .finished_params .condition_code .set(cond_code); self.transaction_params - .transfer_state .finished_params .fault_location_finished .set(Some(fault_location)); // For anything larger than 0, we'd have to do a checksum check to verify whether // the delivery is really complete, and we need the EOF checksum for that.. - if self.transaction_params.transfer_state.progress == 0 { + if self.transaction_params.progress == 0 { self.transaction_params - .transfer_state .finished_params .delivery_code .set(DeliveryCode::Complete); @@ -762,9 +996,9 @@ impl< } /// Returns whether the transfer can be completed regularly. - fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result { + fn handle_eof_no_error(&mut self, eof_pdu: &EofPdu) -> Result { // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size - if self.transaction_params.transfer_state.progress > eof_pdu.file_size() { + if self.transaction_params.progress > eof_pdu.file_size() { match self.declare_fault(ConditionCode::FileSizeError) { FaultHandlerCode::IgnoreError => (), FaultHandlerCode::AbandonTransaction => { @@ -775,7 +1009,7 @@ impl< return Ok(false); } } - } else if (self.transaction_params.transfer_state.progress < eof_pdu.file_size()) + } else if (self.transaction_params.progress < eof_pdu.file_size()) && self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged { // CFDP 4.6.4.3.1: The end offset of the last received file segment and the file @@ -787,18 +1021,24 @@ impl< // ) } - self.transaction_params.transfer_state.checksum = eof_pdu.file_checksum(); + self.transaction_params.checksum = eof_pdu.file_checksum(); if self.transaction_params.transmission_mode() == TransmissionMode::Unacknowledged - && !self.checksum_verify(self.transaction_params.transfer_state.checksum) + && !self.checksum_verify( + self.transaction_params.progress, + self.transaction_params.checksum, + ) { - if self.declare_fault(ConditionCode::FileChecksumFailure) - != FaultHandlerCode::IgnoreError - { - return Ok(false); - } self.start_check_limit_handling(); return Ok(false); } + self.transaction_params + .finished_params + .delivery_code + .set(DeliveryCode::Complete); + self.transaction_params + .finished_params + .condition_code + .set(ConditionCode::NoError); Ok(true) } @@ -809,11 +1049,7 @@ impl< let ack_pdu = AckPdu::new( pdu_header, FileDirectiveType::EofPdu, - self.transaction_params - .transfer_state - .finished_params - .condition_code - .get(), + self.transaction_params.finished_params.condition_code.get(), TransactionStatus::Active, ) .unwrap(); @@ -828,82 +1064,61 @@ impl< Ok(sent_packets) } - fn checksum_verify(&mut self, checksum: u32) -> bool { - let mut file_delivery_complete = false; + fn checksum_verify(&mut self, verify_len: u64, checksum: u32) -> bool { if self.transaction_params.metadata_params().checksum_type == ChecksumType::NullChecksum - || self.transaction_params.transfer_state.metadata_only + || self.transaction_params.metadata_only { - file_delivery_complete = true; - } else { - match self.vfs.checksum_verify( - checksum, - // Safety: It was already verified that the path is valid during the transaction start. - unsafe { - from_utf8_unchecked( - &self.transaction_params.file_properties.dest_path_buf - [0..self.transaction_params.file_properties.dest_file_path_len], - ) - }, - self.transaction_params.metadata_params().checksum_type, - self.transaction_params.transfer_state.progress, - self.pdu_and_cksum_buffer.get_mut(), - ) { - Ok(checksum_success) => { - file_delivery_complete = checksum_success; - if !checksum_success { - self.transaction_params - .transfer_state - .finished_params - .delivery_code - .set(DeliveryCode::Incomplete); - self.transaction_params - .transfer_state - .finished_params - .condition_code - .set(ConditionCode::FileChecksumFailure); - } + return true; + } + match self.vfs.checksum_verify( + checksum, + // Safety: It was already verified that the path is valid during the transaction start. + unsafe { + from_utf8_unchecked( + &self.transaction_params.file_names.dest_path_buf + [0..self.transaction_params.file_names.dest_file_path_len], + ) + }, + self.transaction_params.metadata_params().checksum_type, + verify_len, + self.pdu_and_cksum_buffer.get_mut(), + ) { + Ok(false) => { + if self.declare_fault(ConditionCode::FileChecksumFailure) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); } - Err(e) => match e { - FilestoreError::ChecksumTypeNotImplemented(_) => { - if self.declare_fault(ConditionCode::UnsupportedChecksumType) - == FaultHandlerCode::AbandonTransaction - { - self.abandon_transaction(); - } - // For this case, the applicable algorithm shall be the the null checksum, - // which is always succesful. - file_delivery_complete = true; + false + } + Ok(true) => true, + Err(e) => match e { + FilestoreError::ChecksumTypeNotImplemented(_) => { + if self.declare_fault(ConditionCode::UnsupportedChecksumType) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); } - _ => { - if self.declare_fault(ConditionCode::FilestoreRejection) - == FaultHandlerCode::AbandonTransaction - { - self.abandon_transaction(); - } - - // Treat this equivalent to a failed checksum procedure. + // For this case, the applicable algorithm shall be the the null checksum, + // which is always succesful. + true + } + _ => { + if self.declare_fault(ConditionCode::FilestoreRejection) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); } - }, - }; + // Treat this equivalent to a failed checksum procedure. + false + } + }, } - if file_delivery_complete { - self.transaction_params - .transfer_state - .finished_params - .delivery_code - .set(DeliveryCode::Complete); - self.transaction_params - .transfer_state - .finished_params - .condition_code - .set(ConditionCode::NoError); - } - file_delivery_complete } fn start_check_limit_handling(&mut self) { self.set_step(TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling); - self.transaction_params.transfer_state.current_check_timer = Some( + self.transaction_params.current_check_timer = Some( self.check_timer_creator .create_countdown(TimerContext::CheckLimit { local_id: self.local_cfg.id, @@ -911,31 +1126,35 @@ impl< entity_type: EntityType::Receiving, }), ); - self.transaction_params.transfer_state.current_check_count = 0; + self.transaction_params.current_check_count = 0; } - fn check_limit_handling(&mut self) -> Result { - if self - .transaction_params - .transfer_state - .current_check_timer - .is_none() - { - return Ok(0); + fn check_limit_handling(&mut self) -> Result<(), DestError> { + if self.transaction_params.current_check_timer.is_none() { + return Ok(()); } - let mut sent_packets = 0; let check_timer = self .transaction_params - .transfer_state .current_check_timer .as_ref() .unwrap(); if check_timer.has_expired() { - if self.checksum_verify(self.transaction_params.transfer_state.checksum) { - sent_packets += self.file_transfer_complete_transition()?; - return Ok(sent_packets); + if self.checksum_verify( + self.transaction_params.progress, + self.transaction_params.checksum, + ) { + self.transaction_params + .finished_params + .condition_code + .set(ConditionCode::NoError); + self.transaction_params + .finished_params + .delivery_code + .set(DeliveryCode::Complete); + self.set_step(TransactionStep::TransferCompletion); + return Ok(()); } - if self.transaction_params.transfer_state.current_check_count + 1 + if self.transaction_params.current_check_count + 1 >= self.transaction_params.remote_cfg.unwrap().check_limit { if self.declare_fault(ConditionCode::CheckLimitReached) @@ -944,16 +1163,15 @@ impl< self.abandon_transaction(); } } else { - self.transaction_params.transfer_state.current_check_count += 1; + self.transaction_params.current_check_count += 1; self.transaction_params - .transfer_state .current_check_timer .as_mut() .unwrap() .reset(); } } - Ok(sent_packets) + Ok(()) } fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> { @@ -973,7 +1191,7 @@ impl< } } if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling { - sent_packets += self.check_limit_handling()?; + self.check_limit_handling()?; } if self.step() == TransactionStep::TransferCompletion { sent_packets += self.transfer_completion(cfdp_user)?; @@ -985,21 +1203,18 @@ impl< } fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + let id = self.transaction_id().unwrap(); let dest_name = from_utf8( - &self.transaction_params.file_properties.dest_file_name - [..self.transaction_params.file_properties.dest_file_name_len], + &self.transaction_params.file_names.dest_file_name + [..self.transaction_params.file_names.dest_file_name_len], )?; - self.transaction_params.file_properties.dest_path_buf[0..dest_name.len()] + self.transaction_params.file_names.dest_path_buf[0..dest_name.len()] .copy_from_slice(dest_name.as_bytes()); - self.transaction_params.file_properties.dest_file_path_len = dest_name.len(); + self.transaction_params.file_names.dest_file_path_len = dest_name.len(); let source_id = self.transaction_params.pdu_conf.source_id(); - let id = TransactionId::new( - source_id, - self.transaction_params.pdu_conf.transaction_seq_num, - ); let src_name = from_utf8( - &self.transaction_params.file_properties.src_file_name - [0..self.transaction_params.file_properties.src_file_name_len], + &self.transaction_params.file_names.src_file_name + [0..self.transaction_params.file_names.src_file_name_len], )?; let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new(); let mut num_msgs_to_user = 0; @@ -1023,7 +1238,6 @@ impl< dest_file_name: dest_name, msgs_to_user: &msgs_to_user[..num_msgs_to_user], }; - self.transaction_params.transfer_state.transaction_id = Some(id); cfdp_user.metadata_recvd_indication(&metadata_recvd_params); if self.vfs.exists(dest_name)? && self.vfs.is_dir(dest_name)? { @@ -1037,25 +1251,22 @@ impl< return Err(DestError::PathConcat); } let source_name = source_file_name.unwrap(); - self.transaction_params.file_properties.dest_path_buf[dest_name.len()] = b'/'; - self.transaction_params.file_properties.dest_path_buf + self.transaction_params.file_names.dest_path_buf[dest_name.len()] = b'/'; + self.transaction_params.file_names.dest_path_buf [dest_name.len() + 1..dest_name.len() + 1 + source_name.len()] .copy_from_slice(source_name.as_bytes()); - self.transaction_params.file_properties.dest_file_path_len += 1 + source_name.len(); + self.transaction_params.file_names.dest_file_path_len += 1 + source_name.len(); } let dest_path_str = from_utf8( - &self.transaction_params.file_properties.dest_path_buf - [0..self.transaction_params.file_properties.dest_file_path_len], + &self.transaction_params.file_names.dest_path_buf + [0..self.transaction_params.file_names.dest_file_path_len], )?; if self.vfs.exists(dest_path_str)? { self.vfs.truncate_file(dest_path_str)?; } else { self.vfs.create_file(dest_path_str)?; } - self.transaction_params - .transfer_state - .finished_params - .file_status = FileStatus::Retained; + self.transaction_params.finished_params.file_status = FileStatus::Retained; drop(msgs_to_user); self.set_step(TransactionStep::ReceivingFileDataPdus); Ok(()) @@ -1079,7 +1290,7 @@ impl< } fn start_positive_ack_procedure(&mut self) { - self.transaction_params.transfer_state.positive_ack_params = Some(PositiveAckParams { + self.transaction_params.positive_ack_params = Some(PositiveAckParams { ack_timer: self .check_timer_creator .create_countdown(TimerContext::PositiveAck { @@ -1096,12 +1307,7 @@ impl< fn handle_positive_ack_procedures(&mut self) -> Result { // 1) Do we have positive-ack params? - let params = match self - .transaction_params - .transfer_state - .positive_ack_params - .as_mut() - { + let params = match self.transaction_params.positive_ack_params.as_mut() { Some(p) => p, None => return Ok(0), }; @@ -1136,7 +1342,8 @@ impl< } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { - if self.tstate().completion_disposition.get() == CompletionDisposition::Completed { + if self.transaction_params.completion_disposition.get() == CompletionDisposition::Completed + { // TODO: Execute any filestore requests } else if self .transaction_params @@ -1144,21 +1351,22 @@ impl< .as_ref() .unwrap() .disposition_on_cancellation - && self.tstate().finished_params.delivery_code.get() == DeliveryCode::Incomplete + && self.transaction_params.finished_params.delivery_code.get() + == DeliveryCode::Incomplete { // Safety: We already verified that the path is valid during the transaction start. let dest_path = unsafe { from_utf8_unchecked( - &self.transaction_params.file_properties.dest_path_buf - [0..self.transaction_params.file_properties.dest_file_path_len], + &self.transaction_params.file_names.dest_path_buf + [0..self.transaction_params.file_names.dest_file_path_len], ) }; if self.vfs.exists(dest_path)? && self.vfs.is_file(dest_path)? { self.vfs.remove_file(dest_path)?; } - self.tstate_mut().finished_params.file_status = FileStatus::DiscardDeliberately; + self.transaction_params.finished_params.file_status = FileStatus::DiscardDeliberately; } - let tstate = self.tstate(); + let tstate = &self.transaction_params; let transaction_finished_params = TransactionFinishedParams { id: tstate.transaction_id.unwrap(), condition_code: tstate.finished_params.condition_code.get(), @@ -1173,8 +1381,8 @@ impl< // must call [Self::abandon_transaction] as soon as it is possible. fn declare_fault(&self, condition_code: ConditionCode) -> FaultHandlerCode { // Cache those, because they might be reset when abandoning the transaction. - let transaction_id = self.tstate().transaction_id.unwrap(); - let progress = self.tstate().progress; + let transaction_id = self.transaction_id().unwrap(); + let progress = self.transaction_params.progress; let fh_code = self .local_cfg .fault_handler @@ -1194,7 +1402,7 @@ impl< fn notice_of_cancellation(&self, condition_code: ConditionCode, fault_location: EntityIdTlv) { self.set_step_internal(TransactionStep::TransferCompletion); - let tstate = self.tstate(); + let tstate = &self.transaction_params; tstate.finished_params.condition_code.set(condition_code); tstate .completion_disposition @@ -1224,7 +1432,7 @@ impl< } fn send_finished_pdu(&mut self) -> Result { - let tstate = self.tstate(); + let tstate = &self.transaction_params; let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); let finished_pdu = if tstate.finished_params.condition_code.get() == ConditionCode::NoError @@ -1252,16 +1460,6 @@ impl< )?; Ok(1) } - - #[inline] - fn tstate(&self) -> &TransferState { - &self.transaction_params.transfer_state - } - - #[inline] - fn tstate_mut(&mut self) -> &mut TransferState { - &mut self.transaction_params.transfer_state - } } #[cfg(test)] @@ -1286,8 +1484,9 @@ mod tests { }; use crate::{ - CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, + CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd, filestore::NativeFilestore, + lost_segments::LostSegmentsList, tests::{ LOCAL_ID, REMOTE_ID, SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, TestCheckTimerCreator, TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table, @@ -1300,9 +1499,10 @@ mod tests { TestCfdpSender, TestFaultHandler, NativeFilestore, - StdRemoteEntityConfigProvider, + RemoteConfigStoreStd, TestCheckTimerCreator, TestCheckTimer, + LostSegmentsList, >; struct DestHandlerTestbench { @@ -1448,11 +1648,9 @@ mod tests { let result = self.handler.state_machine(user, Some(&packet_info)); if self.indication_cfg().file_segment_recv { assert!(!user.file_seg_recvd_queue.is_empty()); - assert_eq!(user.file_seg_recvd_queue.back().unwrap().offset, offset); - assert_eq!( - user.file_seg_recvd_queue.back().unwrap().length, - file_data_chunk.len() - ); + let file_seg = user.file_seg_recvd_queue.pop_front().unwrap(); + assert_eq!(file_seg.offset, offset); + assert_eq!(file_seg.length, file_data_chunk.len()); } result } @@ -1496,7 +1694,17 @@ mod tests { // Specifying some checks in the drop method avoids some boilerplate. impl Drop for DestHandlerTestbench { fn drop(&mut self) { - assert!(self.all_fault_queues_empty()); + if !self.all_fault_queues_empty() { + let fh_queues = self.handler.local_cfg.user_fault_hook().borrow(); + println!( + "fault queues not empyt. cancellation {}, suspension {}, ignored {}, abandon {}", + fh_queues.notice_of_cancellation_queue.len(), + fh_queues.notice_of_suspension_queue.len(), + fh_queues.ignored_queue.len(), + fh_queues.abandoned_queue.len() + ); + } + assert!(self.all_fault_queues_empty(), "fault queues not empty, "); assert!(self.handler.pdu_sender.queue_empty()); if self.check_handler_idle_at_drop { self.state_check(State::Idle, TransactionStep::Idle); @@ -1538,6 +1746,7 @@ mod tests { NativeFilestore::default(), basic_remote_cfg_table(LOCAL_ID, 1024, true), TestCheckTimerCreator::new(expiry_control), + LostSegmentsList::default(), ) } @@ -1634,6 +1843,7 @@ mod tests { tb.generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); + assert!(test_user.indication_queues_empty()); } #[test] @@ -1653,6 +1863,7 @@ mod tests { tb.generic_eof_no_error(&mut test_user, file_data.to_vec()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); + assert!(test_user.indication_queues_empty()); } #[test] @@ -1680,6 +1891,7 @@ mod tests { tb.generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); + assert!(test_user.indication_queues_empty()); } #[test] @@ -1702,30 +1914,34 @@ mod tests { .expect("file data insertion 0 failed"); tb.generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); + + // Checksum failure. + let mut fault_handler = tb.handler.local_cfg.fault_handler.user_hook.borrow_mut(); + assert_eq!(fault_handler.ignored_queue.len(), 1); + let cancelled = fault_handler.ignored_queue.pop_front().unwrap(); + assert_eq!(cancelled.0, transaction_id); + assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); + assert_eq!(cancelled.2, segment_len as u64); + + drop(fault_handler); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); + tb.set_check_timer_expired(); + tb.generic_file_data_insert( &mut test_user, segment_len as u64, &random_data[segment_len..], ) .expect("file data insertion 1 failed"); - tb.set_check_timer_expired(); tb.handler .state_machine_no_packet(&mut test_user) .expect("fsm failure"); - let mut fault_handler = tb.handler.local_cfg.fault_handler.user_hook.borrow_mut(); - - assert_eq!(fault_handler.ignored_queue.len(), 1); - let cancelled = fault_handler.ignored_queue.pop_front().unwrap(); - assert_eq!(cancelled.0, transaction_id); - assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); - assert_eq!(cancelled.2, segment_len as u64); - drop(fault_handler); tb.check_completion_indication_success(&mut test_user); + assert!(test_user.indication_queues_empty()); } #[test] @@ -1735,6 +1951,17 @@ mod tests { rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len = 256; + let check_checksum_failure = + |testbench: &mut DestHandlerTestbench, transaction_id: TransactionId| { + let mut fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow_mut(); + assert!(fault_hook.notice_of_suspension_queue.is_empty()); + let ignored_queue = &mut fault_hook.ignored_queue; + assert_eq!(ignored_queue.len(), 1); + let ignored = ignored_queue.pop_front().unwrap(); + assert_eq!(ignored.0, transaction_id); + assert_eq!(ignored.1, ConditionCode::FileChecksumFailure); + assert_eq!(ignored.2, segment_len as u64); + }; let fault_handler = TestFaultHandler::default(); let mut testbench = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); @@ -1750,10 +1977,13 @@ mod tests { testbench .generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); + check_checksum_failure(&mut testbench, transaction_id); + testbench.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); + testbench.set_check_timer_expired(); testbench .handler @@ -1763,30 +1993,36 @@ mod tests { State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); + check_checksum_failure(&mut testbench, transaction_id); + testbench.set_check_timer_expired(); testbench .handler .state_machine_no_packet(&mut test_user) .expect("fsm error"); + check_checksum_failure(&mut testbench, transaction_id); + testbench.state_check(State::Idle, TransactionStep::Idle); - let mut fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow_mut(); - assert!(fault_hook.notice_of_suspension_queue.is_empty()); - let ignored_queue = &mut fault_hook.ignored_queue; - assert_eq!(ignored_queue.len(), 1); - let cancelled = ignored_queue.pop_front().unwrap(); - assert_eq!(cancelled.0, transaction_id); - assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); - assert_eq!(cancelled.2, segment_len as u64); + { + let mut fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow_mut(); + let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue; + assert_eq!(cancelled_queue.len(), 1); + let cancelled = cancelled_queue.pop_front().unwrap(); + assert_eq!(cancelled.0, transaction_id); + assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); + assert_eq!(cancelled.2, segment_len as u64); + } - let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue; - assert_eq!(cancelled_queue.len(), 1); - let cancelled = cancelled_queue.pop_front().unwrap(); - assert_eq!(cancelled.0, transaction_id); - assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); - assert_eq!(cancelled.2, segment_len as u64); - - drop(fault_hook); + assert_eq!(test_user.finished_indic_queue.len(), 1); + let finished_indication = test_user.finished_indic_queue.pop_front().unwrap(); + assert_eq!(finished_indication.id, transaction_id); + assert_eq!( + finished_indication.condition_code, + ConditionCode::CheckLimitReached + ); + assert_eq!(finished_indication.delivery_code, DeliveryCode::Incomplete); + assert_eq!(finished_indication.file_status, FileStatus::Retained); assert!(testbench.handler.pdu_sender.queue_empty()); @@ -1797,6 +2033,7 @@ mod tests { assert_eq!(read_content.len(), segment_len); assert_eq!(read_content, &random_data[0..segment_len]); assert!(fs::remove_file(testbench.dest_path().as_path()).is_ok()); + assert!(test_user.indication_queues_empty()); } fn check_finished_pdu_success(sent_pdu: &SentPdu) { @@ -1832,6 +2069,7 @@ mod tests { let sent_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap(); check_finished_pdu_success(&sent_pdu); tb.check_completion_indication_success(&mut test_user); + assert!(test_user.indication_queues_empty()); } #[test] @@ -1964,6 +2202,19 @@ mod tests { finished_pdu.condition_code(), ConditionCode::CheckLimitReached ); + + let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut(); + let ignored_queue = &mut fault_hook.ignored_queue; + assert_eq!(ignored_queue.len(), 2); + let mut ignored = ignored_queue.pop_front().unwrap(); + assert_eq!(ignored.0, transaction_id); + assert_eq!(ignored.1, ConditionCode::FileChecksumFailure); + assert_eq!(ignored.2, file_size); + ignored = ignored_queue.pop_front().unwrap(); + assert_eq!(ignored.0, transaction_id); + assert_eq!(ignored.1, ConditionCode::FileChecksumFailure); + assert_eq!(ignored.2, file_size); + assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); assert!(finished_pdu.fault_location().is_some()); assert_eq!( @@ -1972,9 +2223,29 @@ mod tests { ); assert_eq!(finished_pdu.fs_responses_raw(), &[]); assert!(tb.handler.pdu_sender.queue_empty()); + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CheckLimitReached, + transaction_id, + ); tb.expected_full_data = faulty_file_data.to_vec(); } + fn verify_finished_indication( + user: &mut TestCfdpUser, + delivery_code: DeliveryCode, + cond_code: ConditionCode, + id: TransactionId, + ) { + assert_eq!(user.finished_indic_queue.len(), 1); + let finished_indication = user.finished_indic_queue.pop_front().unwrap(); + assert_eq!(finished_indication.id, id); + assert_eq!(finished_indication.condition_code, cond_code); + assert_eq!(finished_indication.delivery_code, delivery_code); + assert_eq!(finished_indication.file_status, FileStatus::Retained); + } + #[test] fn test_file_copy_to_directory() { let fault_handler = TestFaultHandler::default(); @@ -1990,21 +2261,22 @@ mod tests { ); dest_path_buf.push(src_path.file_name().unwrap()); tb.dest_path = dest_path_buf; - let mut test_user = tb.test_user_from_cached_paths(0); - tb.generic_transfer_init(&mut test_user, 0) + let mut user = tb.test_user_from_cached_paths(0); + tb.generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.generic_eof_no_error(&mut test_user, Vec::new()) + tb.generic_eof_no_error(&mut user, Vec::new()) .expect("EOF no error insertion failed"); - tb.check_completion_indication_success(&mut test_user); + tb.check_completion_indication_success(&mut user); } #[test] fn test_tranfer_cancellation_empty_file_with_eof_pdu() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); - let mut test_user = tb.test_user_from_cached_paths(0); - tb.generic_transfer_init(&mut test_user, 0) + let mut user = tb.test_user_from_cached_paths(0); + let id = tb + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let cancel_eof = EofPdu::new( @@ -2017,28 +2289,38 @@ mod tests { let packets = tb .handler .state_machine( - &mut test_user, + &mut user, Some(&PduRawWithInfo::new(&cancel_eof.to_vec().unwrap()).unwrap()), ) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CancelRequestReceived, + id, + ); } - fn generic_tranfer_cancellation_partial_file_with_eof_pdu(with_closure: bool) { + fn generic_tranfer_cancellation_partial_file_with_eof_pdu( + with_closure: bool, + insert_packet: bool, + ) { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = 5; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, with_closure); - let mut test_user = tb.test_user_from_cached_paths(file_size); - tb.generic_transfer_init(&mut test_user, file_size) + let mut user = tb.test_user_from_cached_paths(file_size); + let id = tb + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.generic_file_data_insert(&mut test_user, 0, &file_data[0..5]) - .expect("file data insertion failed"); - // Checksum is currently ignored on remote side.. we still supply it, according to the - // standard. + if insert_packet { + tb.generic_file_data_insert(&mut user, 0, &file_data[0..5]) + .expect("file data insertion failed"); + } let mut digest = CRC_32.digest(); digest.update(&file_data[0..5]); let checksum = digest.finalize(); @@ -2052,7 +2334,7 @@ mod tests { let packets = tb .handler .state_machine( - &mut test_user, + &mut user, Some(&PduRawWithInfo::new(&cancel_eof.to_vec().unwrap()).unwrap()), ) .expect("state machine call with EOF insertion failed"); @@ -2069,7 +2351,32 @@ mod tests { finished_pdu.condition_code(), ConditionCode::CancelRequestReceived ); - assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); + if insert_packet { + // Checksum success, so data is complete. + assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete); + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CancelRequestReceived, + id, + ); + } else { + // Checksum failure, so data is incomplete. + assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); + tb.check_dest_file = false; + let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut(); + let ignored_queue = &mut fault_hook.ignored_queue; + let ignored = ignored_queue.pop_front().unwrap(); + assert_eq!(ignored.0, id); + assert_eq!(ignored.1, ConditionCode::FileChecksumFailure); + assert_eq!(ignored.2, 5); + verify_finished_indication( + &mut user, + DeliveryCode::Incomplete, + ConditionCode::CancelRequestReceived, + id, + ); + } assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!( finished_pdu @@ -2078,6 +2385,12 @@ mod tests { EntityIdTlv::new(LOCAL_ID.into()) ); } else { + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CancelRequestReceived, + id, + ); assert_eq!(packets, 0); } tb.expected_file_size = file_size; @@ -2085,28 +2398,40 @@ mod tests { } #[test] - fn test_tranfer_cancellation_partial_file_with_eof_pdu_no_closure() { - generic_tranfer_cancellation_partial_file_with_eof_pdu(false); + fn test_tranfer_cancellation_partial_file_with_eof_pdu_no_closure_complete() { + generic_tranfer_cancellation_partial_file_with_eof_pdu(false, true); } + #[test] - fn test_tranfer_cancellation_partial_file_with_eof_pdu_with_closure() { - generic_tranfer_cancellation_partial_file_with_eof_pdu(true); + fn test_tranfer_cancellation_partial_file_with_eof_pdu_with_closure_complete() { + generic_tranfer_cancellation_partial_file_with_eof_pdu(true, true); + } + + #[test] + fn test_tranfer_cancellation_partial_file_with_eof_pdu_with_closure_incomplete() { + generic_tranfer_cancellation_partial_file_with_eof_pdu(true, false); } #[test] fn test_tranfer_cancellation_empty_file_with_cancel_api() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, false); - let mut test_user = tb.test_user_from_cached_paths(0); - let transaction_id = tb - .generic_transfer_init(&mut test_user, 0) + let mut user = tb.test_user_from_cached_paths(0); + let id = tb + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.handler.cancel_request(&transaction_id); + tb.handler.cancel_request(&id); let packets = tb .handler - .state_machine_no_packet(&mut test_user) + .state_machine_no_packet(&mut user) .expect("state machine call with EOF insertion failed"); + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CancelRequestReceived, + id, + ); assert_eq!(packets, 0); } @@ -2114,15 +2439,15 @@ mod tests { fn test_tranfer_cancellation_empty_file_with_cancel_api_and_closure() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); - let mut test_user = tb.test_user_from_cached_paths(0); - let transaction_id = tb - .generic_transfer_init(&mut test_user, 0) + let mut user = tb.test_user_from_cached_paths(0); + let id = tb + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.handler.cancel_request(&transaction_id); + tb.handler.cancel_request(&id); let packets = tb .handler - .state_machine_no_packet(&mut test_user) + .state_machine_no_packet(&mut user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 1); let next_pdu = tb.get_next_pdu().unwrap(); @@ -2144,6 +2469,12 @@ mod tests { finished_pdu.fault_location(), Some(EntityIdTlv::new(REMOTE_ID.into())) ); + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CancelRequestReceived, + id, + ); } #[test] @@ -2154,18 +2485,18 @@ mod tests { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths(fault_handler, true); - let mut test_user = tb.test_user_from_cached_paths(file_size); - let transaction_id = tb - .generic_transfer_init(&mut test_user, file_size) + let mut user = tb.test_user_from_cached_paths(file_size); + let id = tb + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.generic_file_data_insert(&mut test_user, 0, &file_data[0..5]) + tb.generic_file_data_insert(&mut user, 0, &file_data[0..5]) .expect("file data insertion failed"); - tb.handler.cancel_request(&transaction_id); + tb.handler.cancel_request(&id); let packets = tb .handler - .state_machine_no_packet(&mut test_user) + .state_machine_no_packet(&mut user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 1); let next_pdu = tb.get_next_pdu().unwrap(); @@ -2188,6 +2519,12 @@ mod tests { ); tb.expected_file_size = file_size; tb.expected_full_data = file_data[0..file_size as usize].to_vec(); + verify_finished_indication( + &mut user, + DeliveryCode::Incomplete, + ConditionCode::CancelRequestReceived, + id, + ); } // Only incomplete received files will be removed. @@ -2201,18 +2538,24 @@ mod tests { .get_mut(LOCAL_ID.value()) .unwrap(); remote_cfg_mut.disposition_on_cancellation = true; - let mut test_user = tb.test_user_from_cached_paths(0); - let transaction_id = tb - .generic_transfer_init(&mut test_user, 0) + let mut user = tb.test_user_from_cached_paths(0); + let id = tb + .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.handler.cancel_request(&transaction_id); + tb.handler.cancel_request(&id); let packets = tb .handler - .state_machine_no_packet(&mut test_user) + .state_machine_no_packet(&mut user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); + verify_finished_indication( + &mut user, + DeliveryCode::Complete, + ConditionCode::CancelRequestReceived, + id, + ); } #[test] @@ -2230,18 +2573,18 @@ mod tests { .get_mut(LOCAL_ID.value()) .unwrap(); remote_cfg_mut.disposition_on_cancellation = true; - let mut test_user = tb.test_user_from_cached_paths(file_size); + let mut user = tb.test_user_from_cached_paths(file_size); let transaction_id = tb - .generic_transfer_init(&mut test_user, file_size) + .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); - tb.generic_file_data_insert(&mut test_user, 0, &file_data[0..5]) + tb.generic_file_data_insert(&mut user, 0, &file_data[0..5]) .expect("file data insertion failed"); tb.handler.cancel_request(&transaction_id); let packets = tb .handler - .state_machine_no_packet(&mut test_user) + .state_machine_no_packet(&mut user) .expect("state machine call with EOF insertion failed"); assert_eq!(packets, 0); // File was disposed. diff --git a/src/lib.rs b/src/lib.rs index 4854429..51e6f41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,7 +100,7 @@ pub mod source; pub mod time; pub mod user; -use crate::time::CountdownProvider; +use crate::time::Countdown; use core::{cell::RefCell, fmt::Debug, hash::Hash}; use crc::{CRC_32_ISCSI, CRC_32_ISO_HDLC, Crc}; @@ -181,8 +181,8 @@ pub enum TimerContext { /// The timer will be used to perform the Positive Acknowledgement Procedures as specified in /// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer /// interval of the remote entity configuration. -pub trait TimerCreatorProvider { - type Countdown: CountdownProvider; +pub trait TimerCreator { + type Countdown: Countdown; fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown; } @@ -294,16 +294,23 @@ impl RemoteEntityConfig { } } -pub trait RemoteEntityConfigProvider { +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum RemoteConfigStoreError { + #[error("store is full")] + Full, +} + +pub trait RemoteConfigStore { /// Retrieve the remote entity configuration for the given remote ID. fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>; + fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>; + /// Add a new remote configuration. Return [true] if the configuration was /// inserted successfully, and [false] if a configuration already exists. - fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool; - /// Remote a configuration. Returns [true] if the configuration was removed successfully, - /// and [false] if no configuration exists for the given remote ID. - fn remove_config(&mut self, remote_id: u64) -> bool; + fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result; } /// This is a thin wrapper around a [hashbrown::HashMap] to store remote entity configurations. @@ -311,20 +318,26 @@ pub trait RemoteEntityConfigProvider { #[cfg(feature = "alloc")] #[derive(Default, Debug)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct StdRemoteEntityConfigProvider(pub hashbrown::HashMap); +pub struct RemoteConfigStoreStd(pub hashbrown::HashMap); #[cfg(feature = "std")] -impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider { +impl RemoteConfigStore for RemoteConfigStoreStd { fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { self.0.get(&remote_id) } + fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> { self.0.get_mut(&remote_id) } - fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool { - self.0.insert(cfg.entity_id.value(), *cfg).is_some() + + fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result { + Ok(self.0.insert(cfg.entity_id.value(), *cfg).is_some()) } - fn remove_config(&mut self, remote_id: u64) -> bool { +} + +#[cfg(feature = "std")] +impl RemoteConfigStoreStd { + pub fn remove_config(&mut self, remote_id: u64) -> bool { self.0.remove(&remote_id).is_some() } } @@ -335,10 +348,10 @@ impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider { #[derive(Default, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct VecRemoteEntityConfigProvider(pub alloc::vec::Vec); +pub struct RemoteConfigList(pub alloc::vec::Vec); #[cfg(feature = "alloc")] -impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider { +impl RemoteConfigStore for RemoteConfigList { fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { self.0 .iter() @@ -351,12 +364,19 @@ impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider { .find(|cfg| cfg.entity_id.value() == remote_id) } - fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool { + fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result { + for other_cfg in self.0.iter() { + if cfg.entity_id.value() == other_cfg.entity_id.value() { + return Ok(false); + } + } self.0.push(*cfg); - true + Ok(true) } +} - fn remove_config(&mut self, remote_id: u64) -> bool { +impl RemoteConfigList { + pub fn remove_config(&mut self, remote_id: u64) -> bool { for (idx, cfg) in self.0.iter().enumerate() { if cfg.entity_id.value() == remote_id { self.0.remove(idx); @@ -367,10 +387,55 @@ impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider { } } -/// A remote entity configurations also implements the [RemoteEntityConfigProvider], but the -/// [RemoteEntityConfigProvider::add_config] and [RemoteEntityConfigProvider::remove_config] -/// are no-ops and always returns [false]. -impl RemoteEntityConfigProvider for RemoteEntityConfig { +/// This is a thin wrapper around a [alloc::vec::Vec] to store remote entity configurations. +/// It implements the full [RemoteEntityConfigProvider] trait. +#[derive(Default, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct RemoteConfigListHeapless(pub heapless::vec::Vec); + +impl RemoteConfigStore for RemoteConfigListHeapless { + fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { + self.0 + .iter() + .find(|&cfg| cfg.entity_id.value() == remote_id) + } + + fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> { + self.0 + .iter_mut() + .find(|cfg| cfg.entity_id.value() == remote_id) + } + + fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result { + if self.0.is_full() { + return Err(RemoteConfigStoreError::Full); + } + for other_cfg in self.0.iter() { + if cfg.entity_id.value() == other_cfg.entity_id.value() { + return Ok(false); + } + } + self.0.push(*cfg).unwrap(); + Ok(true) + } +} + +impl RemoteConfigListHeapless { + pub fn remove_config(&mut self, remote_id: u64) -> bool { + for (idx, cfg) in self.0.iter().enumerate() { + if cfg.entity_id.value() == remote_id { + self.0.remove(idx); + return true; + } + } + false + } +} + +/// A remote entity configurations also implements the [RemoteConfigStore], but the +/// [RemoteConfigStore::add_config] always returns [RemoteConfigStoreError::Full]. +impl RemoteConfigStore for RemoteEntityConfig { fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { if remote_id == self.entity_id.value() { return Some(self); @@ -385,12 +450,8 @@ impl RemoteEntityConfigProvider for RemoteEntityConfig { None } - fn add_config(&mut self, _cfg: &RemoteEntityConfig) -> bool { - false - } - - fn remove_config(&mut self, _remote_id: u64) -> bool { - false + fn add_config(&mut self, _cfg: &RemoteEntityConfig) -> Result { + Err(RemoteConfigStoreError::Full) } } @@ -404,7 +465,7 @@ impl RemoteEntityConfigProvider for RemoteEntityConfig { /// /// For each error reported by the [FaultHandler], the appropriate fault handler callback /// will be called depending on the [FaultHandlerCode]. -pub trait UserFaultHookProvider { +pub trait UserFaultHook { fn notice_of_suspension_cb( &mut self, transaction_id: TransactionId, @@ -429,7 +490,7 @@ pub trait UserFaultHookProvider { #[derive(Default, Debug, PartialEq, Eq, Copy, Clone)] pub struct DummyFaultHook {} -impl UserFaultHookProvider for DummyFaultHook { +impl UserFaultHook for DummyFaultHook { fn notice_of_suspension_cb( &mut self, _transaction_id: TransactionId, @@ -477,14 +538,14 @@ impl UserFaultHookProvider for DummyFaultHook { /// These defaults can be overriden by using the [Self::set_fault_handler] method. /// Please note that in any case, fault handler overrides can be specified by the sending CFDP /// entity. -pub struct FaultHandler { +pub struct FaultHandler { handler_array: [FaultHandlerCode; 10], // Could also change the user fault handler trait to have non mutable methods, but that limits // flexbility on the user side.. pub user_hook: RefCell, } -impl FaultHandler { +impl FaultHandler { fn condition_code_to_array_index(conditon_code: ConditionCode) -> Option { Some(match conditon_code { ConditionCode::PositiveAckLimitReached => 0, @@ -590,17 +651,17 @@ impl Default for IndicationConfig { } /// Each CFDP entity handler has a [LocalEntityConfig]uration. -pub struct LocalEntityConfig { +pub struct LocalEntityConfig { pub id: UnsignedByteField, pub indication_cfg: IndicationConfig, - pub fault_handler: FaultHandler, + pub fault_handler: FaultHandler, } -impl LocalEntityConfig { +impl LocalEntityConfig { pub fn new( id: UnsignedByteField, indication_cfg: IndicationConfig, - hook: UserFaultHook, + hook: UserFaultHookInstance, ) -> Self { Self { id, @@ -610,12 +671,12 @@ impl LocalEntityConfig { } } -impl LocalEntityConfig { - pub fn user_fault_hook_mut(&mut self) -> &mut RefCell { +impl LocalEntityConfig { + pub fn user_fault_hook_mut(&mut self) -> &mut RefCell { &mut self.fault_handler.user_hook } - pub fn user_fault_hook(&self) -> &RefCell { + pub fn user_fault_hook(&self) -> &RefCell { &self.fault_handler.user_hook } } @@ -692,7 +753,7 @@ pub mod std_mod { } } - impl CountdownProvider for StdCountdown { + impl Countdown for StdCountdown { fn has_expired(&self) -> bool { if self.start_time.elapsed() > self.expiry_time { return true; @@ -723,7 +784,7 @@ pub mod std_mod { } } - impl TimerCreatorProvider for StdTimerCreator { + impl TimerCreator for StdTimerCreator { type Countdown = StdCountdown; fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { @@ -809,7 +870,7 @@ pub enum PacketTarget { pub trait PduProvider { fn pdu_type(&self) -> PduType; fn file_directive_type(&self) -> Option; - fn pdu(&self) -> &[u8]; + fn raw_pdu(&self) -> &[u8]; fn packet_target(&self) -> Result; } @@ -824,7 +885,7 @@ impl PduProvider for DummyPduProvider { None } - fn pdu(&self) -> &[u8] { + fn raw_pdu(&self) -> &[u8] { &[] } @@ -936,7 +997,7 @@ impl PduProvider for PduRawWithInfo<'_> { self.file_directive_type } - fn pdu(&self) -> &[u8] { + fn raw_pdu(&self) -> &[u8] { self.raw_packet } @@ -998,7 +1059,7 @@ pub mod alloc_mod { self.file_directive_type } - fn pdu(&self) -> &[u8] { + fn raw_pdu(&self) -> &[u8] { &self.pdu } @@ -1009,8 +1070,8 @@ pub mod alloc_mod { } #[derive(Debug)] -struct PositiveAckParams { - ack_timer: Countdown, +struct PositiveAckParams { + ack_timer: CountdownInstance, ack_counter: u32, } @@ -1072,7 +1133,7 @@ pub(crate) mod tests { expiry_control: TimerExpiryControl, } - impl CountdownProvider for TestCheckTimer { + impl Countdown for TestCheckTimer { fn has_expired(&self) -> bool { match self.context { TimerContext::CheckLimit { @@ -1132,7 +1193,7 @@ pub(crate) mod tests { } } - impl TimerCreatorProvider for TestCheckTimerCreator { + impl TimerCreator for TestCheckTimerCreator { type Countdown = TestCheckTimer; fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { @@ -1150,6 +1211,7 @@ pub(crate) mod tests { } } + #[derive(Debug)] pub struct FileSegmentRecvdParamsNoSegMetadata { #[allow(dead_code)] pub id: TransactionId, @@ -1157,8 +1219,9 @@ pub(crate) mod tests { pub length: usize, } - #[derive(Default)] + #[derive(Default, Debug)] pub struct TestCfdpUser { + pub check_queues_empty_on_drop: bool, pub next_expected_seq_num: u64, pub expected_full_src_name: String, pub expected_full_dest_name: String, @@ -1179,6 +1242,7 @@ pub(crate) mod tests { expected_file_size: u64, ) -> Self { Self { + check_queues_empty_on_drop: true, next_expected_seq_num, expected_full_src_name, expected_full_dest_name, @@ -1196,6 +1260,12 @@ pub(crate) mod tests { assert_eq!(id.source_id, LOCAL_ID.into()); assert_eq!(id.seq_num().value(), self.next_expected_seq_num); } + + pub fn indication_queues_empty(&self) -> bool { + self.finished_indic_queue.is_empty() + && self.metadata_recv_queue.is_empty() + && self.file_seg_recvd_queue.is_empty() + } } impl CfdpUser for TestCfdpUser { @@ -1285,6 +1355,20 @@ pub(crate) mod tests { } } + impl Drop for TestCfdpUser { + fn drop(&mut self) { + if self.check_queues_empty_on_drop { + assert!( + self.indication_queues_empty(), + "indication queues not empty on drop: finished: {}, metadata: {}, file seg: {}", + self.finished_indic_queue.len(), + self.metadata_recv_queue.len(), + self.file_seg_recvd_queue.len() + ); + } + } + } + #[derive(Default, Debug)] pub(crate) struct TestFaultHandler { pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, @@ -1293,7 +1377,7 @@ pub(crate) mod tests { pub ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>, } - impl UserFaultHookProvider for TestFaultHandler { + impl UserFaultHook for TestFaultHandler { fn notice_of_suspension_cb( &mut self, transaction_id: TransactionId, @@ -1398,8 +1482,8 @@ pub(crate) mod tests { dest_id: impl Into, max_packet_len: usize, crc_on_transmission_by_default: bool, - ) -> StdRemoteEntityConfigProvider { - let mut table = StdRemoteEntityConfigProvider::default(); + ) -> RemoteConfigStoreStd { + let mut table = RemoteConfigStoreStd::default(); let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( dest_id.into(), max_packet_len, @@ -1408,7 +1492,7 @@ pub(crate) mod tests { TransmissionMode::Unacknowledged, ChecksumType::Crc32, ); - table.add_config(&remote_entity_cfg); + table.add_config(&remote_entity_cfg).unwrap(); table } @@ -1549,9 +1633,10 @@ pub(crate) mod tests { TransmissionMode::Unacknowledged, ChecksumType::Crc32, ); - assert!(!remote_entity_cfg.add_config(&dummy)); - // Removal is no-op. - assert!(!remote_entity_cfg.remove_config(REMOTE_ID.value())); + assert_eq!( + remote_entity_cfg.add_config(&dummy).unwrap_err(), + RemoteConfigStoreError::Full + ); let remote_entity_retrieved = remote_entity_cfg.get(REMOTE_ID.value()).unwrap(); assert_eq!(remote_entity_retrieved.entity_id, REMOTE_ID.into()); // Does not exist. @@ -1569,9 +1654,9 @@ pub(crate) mod tests { TransmissionMode::Unacknowledged, ChecksumType::Crc32, ); - let mut remote_cfg_provider = StdRemoteEntityConfigProvider::default(); + let mut remote_cfg_provider = RemoteConfigStoreStd::default(); assert!(remote_cfg_provider.0.is_empty()); - remote_cfg_provider.add_config(&remote_entity_cfg); + remote_cfg_provider.add_config(&remote_entity_cfg).unwrap(); assert_eq!(remote_cfg_provider.0.len(), 1); let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values( LOCAL_ID.into(), @@ -1583,7 +1668,9 @@ pub(crate) mod tests { ); let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap(); assert_eq!(cfg_0.entity_id, REMOTE_ID.into()); - remote_cfg_provider.add_config(&remote_entity_cfg_2); + remote_cfg_provider + .add_config(&remote_entity_cfg_2) + .unwrap(); assert_eq!(remote_cfg_provider.0.len(), 2); let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap(); assert_eq!(cfg_1.entity_id, LOCAL_ID.into()); @@ -1597,7 +1684,7 @@ pub(crate) mod tests { #[test] fn test_remote_cfg_provider_vector() { - let mut remote_cfg_provider = VecRemoteEntityConfigProvider::default(); + let mut remote_cfg_provider = RemoteConfigList::default(); let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( REMOTE_ID.into(), 1024, @@ -1607,7 +1694,7 @@ pub(crate) mod tests { ChecksumType::Crc32, ); assert!(remote_cfg_provider.0.is_empty()); - remote_cfg_provider.add_config(&remote_entity_cfg); + remote_cfg_provider.add_config(&remote_entity_cfg).unwrap(); assert_eq!(remote_cfg_provider.0.len(), 1); let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values( LOCAL_ID.into(), @@ -1619,7 +1706,11 @@ pub(crate) mod tests { ); let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap(); assert_eq!(cfg_0.entity_id, REMOTE_ID.into()); - remote_cfg_provider.add_config(&remote_entity_cfg_2); + assert!( + remote_cfg_provider + .add_config(&remote_entity_cfg_2) + .unwrap() + ); assert_eq!(remote_cfg_provider.0.len(), 2); let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap(); assert_eq!(cfg_1.entity_id, LOCAL_ID.into()); @@ -1649,7 +1740,7 @@ pub(crate) mod tests { let dummy_pdu_provider = DummyPduProvider(()); assert_eq!(dummy_pdu_provider.pdu_type(), PduType::FileData); assert!(dummy_pdu_provider.file_directive_type().is_none()); - assert_eq!(dummy_pdu_provider.pdu(), &[]); + assert_eq!(dummy_pdu_provider.raw_pdu(), &[]); assert_eq!( dummy_pdu_provider.packet_target(), Ok(PacketTarget::SourceEntity) diff --git a/src/lost_segments.rs b/src/lost_segments.rs index da612aa..796fbde 100644 --- a/src/lost_segments.rs +++ b/src/lost_segments.rs @@ -4,9 +4,9 @@ //! //! The two concrete implementations provided are: //! -//! * [LostSegmentsMap]: A hash set based implementation which can grow dynamically andcan +//! * [LostSegmentsList]: A hash set based implementation which can grow dynamically andcan //! optionally be bounded. Suitable for systems where dynamic allocation is allowed. -//! * [LostSegmentsList]: A fixed-size list based implementation where the size +//! * [LostSegmentsListHeapless]: A fixed-size list based implementation where the size //! of the lost segment list is statically known at compile-time. Suitable for resource //! constrained devices where dyanamic allocation is not allowed or possible. #[derive(Debug, PartialEq, Eq, thiserror::Error)] diff --git a/src/source.rs b/src/source.rs index 007a1c3..c0b379a 100644 --- a/src/source.rs +++ b/src/source.rs @@ -67,13 +67,13 @@ use spacepackets::{ use spacepackets::seq_count::SequenceCounter; use crate::{ - DummyPduProvider, EntityType, GenericSendError, PduProvider, PositiveAckParams, - TimerCreatorProvider, time::CountdownProvider, + DummyPduProvider, EntityType, GenericSendError, PduProvider, PositiveAckParams, TimerCreator, + time::Countdown, }; use super::{ - LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig, - RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, + LocalEntityConfig, PacketTarget, PduSendProvider, RemoteConfigStore, RemoteEntityConfig, State, + TransactionId, UserFaultHook, filestore::{FilestoreError, VirtualFilestore}, request::{ReadablePutRequest, StaticPutRequestCacher}, user::{CfdpUser, TransactionFinishedParams}, @@ -244,18 +244,18 @@ pub enum FsmContext { /// thread pool, or move the newly created handler to a new thread. pub struct SourceHandler< PduSender: PduSendProvider, - UserFaultHook: UserFaultHookProvider, + UserFaultHookInstance: UserFaultHook, Vfs: VirtualFilestore, - RemoteCfgTable: RemoteEntityConfigProvider, - TimerCreator: TimerCreatorProvider, - Countdown: CountdownProvider, + RemoteConfigStoreInstance: RemoteConfigStore, + TimerCreatorInstance: TimerCreator, + CountdownInstance: Countdown, SequenceCounterInstance: SequenceCounter, > { - local_cfg: LocalEntityConfig, + local_cfg: LocalEntityConfig, pdu_sender: PduSender, pdu_and_cksum_buffer: RefCell>, put_request_cacher: StaticPutRequestCacher, - remote_cfg_table: RemoteCfgTable, + remote_cfg_table: RemoteConfigStoreInstance, vfs: Vfs, state_helper: StateHelper, // Transfer related state information @@ -264,29 +264,29 @@ pub struct SourceHandler< file_params: FileParams, // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. pdu_conf: CommonPduConfig, - check_timer: RefCell>, - positive_ack_params: RefCell>>, - timer_creator: TimerCreator, + check_timer: RefCell>, + positive_ack_params: RefCell>>, + timer_creator: TimerCreatorInstance, seq_count_provider: SequenceCounterInstance, anomalies: AnomalyTracker, } impl< PduSender: PduSendProvider, - UserFaultHook: UserFaultHookProvider, + UserFaultHookInstance: UserFaultHook, Vfs: VirtualFilestore, - RemoteCfgTable: RemoteEntityConfigProvider, - TimerCreator: TimerCreatorProvider, - Countdown: CountdownProvider, + RemoteConfigStoreInstance: RemoteConfigStore, + TimerCreatorInstance: TimerCreator, + CountdownInstance: Countdown, SequenceCounterInstance: SequenceCounter, > SourceHandler< PduSender, - UserFaultHook, + UserFaultHookInstance, Vfs, - RemoteCfgTable, - TimerCreator, - Countdown, + RemoteConfigStoreInstance, + TimerCreatorInstance, + CountdownInstance, SequenceCounterInstance, > { @@ -314,13 +314,13 @@ impl< /// which contains an incrementing counter. #[allow(clippy::too_many_arguments)] pub fn new( - cfg: LocalEntityConfig, + cfg: LocalEntityConfig, pdu_sender: PduSender, vfs: Vfs, put_request_cacher: StaticPutRequestCacher, pdu_and_cksum_buf_size: usize, - remote_cfg_table: RemoteCfgTable, - timer_creator: TimerCreator, + remote_cfg_table: RemoteConfigStoreInstance, + timer_creator: TimerCreatorInstance, seq_count_provider: SequenceCounterInstance, ) -> Self { Self { @@ -409,7 +409,7 @@ impl< } #[inline] - pub fn local_cfg(&self) -> &LocalEntityConfig { + pub fn local_cfg(&self) -> &LocalEntityConfig { &self.local_cfg } @@ -544,16 +544,16 @@ impl< .expect("PDU directive type unexpectedly not set") { FileDirectiveType::FinishedPdu => { - let finished_pdu = FinishedPduReader::new(packet_to_insert.pdu())?; + let finished_pdu = FinishedPduReader::new(packet_to_insert.raw_pdu())?; self.handle_finished_pdu(&finished_pdu)? } FileDirectiveType::NakPdu => { - let nak_pdu = NakPduReader::new(packet_to_insert.pdu())?; + let nak_pdu = NakPduReader::new(packet_to_insert.raw_pdu())?; sent_packets += self.handle_nak_pdu(&nak_pdu)?; } FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), FileDirectiveType::AckPdu => { - let ack_pdu = AckPdu::from_bytes(packet_to_insert.pdu())?; + let ack_pdu = AckPdu::from_bytes(packet_to_insert.raw_pdu())?; self.handle_ack_pdu(&ack_pdu)? } FileDirectiveType::EofPdu @@ -1196,7 +1196,7 @@ mod tests { use super::*; use crate::{ - CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, + CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd, filestore::NativeFilestore, request::PutRequestOwned, source::TransactionStep, @@ -1222,7 +1222,7 @@ mod tests { TestCfdpSender, TestFaultHandler, NativeFilestore, - StdRemoteEntityConfigProvider, + RemoteConfigStoreStd, TestCheckTimerCreator, TestCheckTimer, SequenceCounterSimple, @@ -1686,18 +1686,15 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transaction_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); + let mut user = tb.create_user(0, file_size); + let transaction_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); tb.common_eof_pdu_check( - &mut cfdp_user, + &mut user, transaction_info.closure_requested, EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, - ) + ); } #[test] @@ -1712,53 +1709,50 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transaction_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); + let mut user = tb.create_user(0, file_size); + let transaction_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); tb.common_eof_pdu_check( - &mut cfdp_user, + &mut user, transaction_info.closure_requested, EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, ); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transaction_info); - tb.finish_handling(&mut cfdp_user, &transaction_info); + tb.acknowledge_eof_pdu(&mut user, &transaction_info); + tb.finish_handling(&mut user, &transaction_info); tb.common_finished_pdu_ack_check(); } #[test] fn test_tiny_file_transfer_not_acked_no_closure() { - let mut cfdp_user = TestCfdpUser::default(); + let mut user = TestCfdpUser::default(); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); - tb.common_tiny_file_transfer(&mut cfdp_user, false); + tb.common_tiny_file_transfer(&mut user, false); } #[test] fn test_tiny_file_transfer_acked() { - let mut cfdp_user = TestCfdpUser::default(); + let mut user = TestCfdpUser::default(); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); - let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, false); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, false); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } #[test] fn test_tiny_file_transfer_not_acked_with_closure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); - let mut cfdp_user = TestCfdpUser::default(); - let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); - tb.finish_handling(&mut cfdp_user, &transfer_info) + let mut user = TestCfdpUser::default(); + let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true); + tb.finish_handling(&mut user, &transfer_info) } #[test] fn test_two_segment_file_transfer_not_acked_no_closure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); - let mut cfdp_user = TestCfdpUser::default(); + let mut user = TestCfdpUser::default(); let mut file = OpenOptions::new() .write(true) .open(&tb.srcfile) @@ -1768,14 +1762,14 @@ mod tests { file.write_all(&rand_data) .expect("writing file content failed"); drop(file); - let (_, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec()); + let (_, fd_pdus) = tb.generic_file_transfer(&mut user, false, rand_data.to_vec()); assert_eq!(fd_pdus, 2); } #[test] fn test_two_segment_file_transfer_not_acked_with_closure() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); - let mut cfdp_user = TestCfdpUser::default(); + let mut user = TestCfdpUser::default(); let mut file = OpenOptions::new() .write(true) .open(&tb.srcfile) @@ -1786,14 +1780,14 @@ mod tests { .expect("writing file content failed"); drop(file); let (transfer_info, fd_pdus) = - tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec()); + tb.generic_file_transfer(&mut user, true, rand_data.to_vec()); assert_eq!(fd_pdus, 2); - tb.finish_handling(&mut cfdp_user, &transfer_info) + tb.finish_handling(&mut user, &transfer_info) } #[test] fn test_two_segment_file_transfer_acked() { - let mut cfdp_user = TestCfdpUser::default(); + let mut user = TestCfdpUser::default(); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); let mut file = OpenOptions::new() .write(true) @@ -1805,10 +1799,10 @@ mod tests { .expect("writing file content failed"); drop(file); let (transfer_info, fd_pdus) = - tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec()); + tb.generic_file_transfer(&mut user, true, rand_data.to_vec()); assert_eq!(fd_pdus, 2); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } @@ -1824,19 +1818,16 @@ mod tests { Some(true), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transaction_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); + let mut user = tb.create_user(0, file_size); + let transaction_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); tb.common_eof_pdu_check( - &mut cfdp_user, + &mut user, transaction_info.closure_requested, EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, ); - tb.finish_handling(&mut cfdp_user, &transaction_info) + tb.finish_handling(&mut user, &transaction_info) } #[test] @@ -1898,15 +1889,12 @@ mod tests { Some(true), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transaction_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); + let mut user = tb.create_user(0, file_size); + let transaction_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); let expected_id = tb.handler.transaction_id().unwrap(); tb.common_eof_pdu_check( - &mut cfdp_user, + &mut user, transaction_info.closure_requested, EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, @@ -1917,10 +1905,7 @@ mod tests { // cancellation -> leads to an EOF PDU with the appropriate error code. tb.expiry_control.set_check_limit_expired(); - assert_eq!( - tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap(), - 1 - ); + assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); assert!(!tb.pdu_queue_empty()); let next_pdu = tb.get_next_sent_pdu().unwrap(); let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format"); @@ -1953,9 +1938,9 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - assert_eq!(cfdp_user.transaction_indication_call_count, 0); - assert_eq!(cfdp_user.eof_sent_call_count, 0); + let mut user = tb.create_user(0, filesize); + assert_eq!(user.transaction_indication_call_count, 0); + assert_eq!(user.eof_sent_call_count, 0); tb.put_request(&put_request) .expect("put_request call failed"); @@ -1964,7 +1949,7 @@ mod tests { assert!(tb.get_next_sent_pdu().is_none()); let id = tb.handler.transaction_id().unwrap(); tb.handler - .cancel_request(&mut cfdp_user, &id) + .cancel_request(&mut user, &id) .expect("transaction cancellation failed"); assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.step(), TransactionStep::Idle); @@ -2007,12 +1992,9 @@ mod tests { ) .expect("creating put request failed"); let file_size = rand_data.len() as u64; - let mut cfdp_user = tb.create_user(0, file_size); - let transaction_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); + let mut user = tb.create_user(0, file_size); + let transaction_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); let mut chunks = rand_data.chunks( calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( &transaction_info.pdu_header, @@ -2031,7 +2013,7 @@ mod tests { let expected_id = tb.handler.transaction_id().unwrap(); assert!( tb.handler - .cancel_request(&mut cfdp_user, &expected_id) + .cancel_request(&mut user, &expected_id) .expect("cancellation failed") ); assert_eq!(tb.handler.state(), State::Idle); @@ -2070,18 +2052,10 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transfer_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 1, - ); + let mut user = tb.create_user(0, file_size); + let transfer_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1); assert!(tb.pdu_queue_empty()); @@ -2089,18 +2063,13 @@ mod tests { tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 1); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 2, - ); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } @@ -2117,18 +2086,10 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transfer_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 1, - ); + let mut user = tb.create_user(0, file_size); + let transfer_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1); assert!(tb.pdu_queue_empty()); @@ -2136,33 +2097,23 @@ mod tests { tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 1); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 2, - ); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2); // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 1); eof_params.condition_code = ConditionCode::PositiveAckLimitReached; - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 3, - ); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3); // This boilerplate handling is still expected. In a real-life use-case I would expect // this to fail as well, leading to a transaction abandonment. This is tested separately. - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } @@ -2179,18 +2130,10 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transfer_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 1, - ); + let mut user = tb.create_user(0, file_size); + let transfer_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1); assert!(tb.pdu_queue_empty()); @@ -2198,29 +2141,19 @@ mod tests { tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 1); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 2, - ); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2); // Enforce a postive ack timer expiry -> positive ACK limit reached -> Cancel EOF sent. tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 1); eof_params.condition_code = ConditionCode::PositiveAckLimitReached; - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 3, - ); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3); // Cancellation fault should have been triggered. let fault_handler = tb.test_fault_handler_mut(); let fh_ref_mut = fault_handler.get_mut(); @@ -2236,22 +2169,17 @@ mod tests { tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 1); - tb.common_eof_pdu_check( - &mut cfdp_user, - transfer_info.closure_requested, - eof_params, - 4, - ); + tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 4); // Enforce a postive ack timer expiry -> positive ACK limit reached -> Transaction // abandoned tb.expiry_control.set_positive_ack_expired(); let sent_packets = tb .handler - .state_machine_no_packet(&mut cfdp_user) + .state_machine_no_packet(&mut user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 0); // Abandonment fault should have been triggered. @@ -2269,21 +2197,21 @@ mod tests { #[test] fn test_nak_for_whole_file() { let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); - let mut cfdp_user = TestCfdpUser::default(); - let (data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); + let mut user = TestCfdpUser::default(); + let (data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true); let seg_reqs = &[(0, transfer_info.file_size as u32)]; - tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, seg_reqs); + tb.nak_for_file_segments(&mut user, &transfer_info, seg_reqs); tb.check_next_file_pdu(0, data.as_bytes()); tb.all_fault_queues_empty(); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } #[test] fn test_nak_for_file_segment() { - let mut cfdp_user = TestCfdpUser::default(); + let mut user = TestCfdpUser::default(); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); let mut file = OpenOptions::new() .write(true) @@ -2295,14 +2223,14 @@ mod tests { .expect("writing file content failed"); drop(file); let (transfer_info, fd_pdus) = - tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec()); + tb.generic_file_transfer(&mut user, false, rand_data.to_vec()); assert_eq!(fd_pdus, 2); - tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, &[(0, 90)]); + tb.nak_for_file_segments(&mut user, &transfer_info, &[(0, 90)]); tb.check_next_file_pdu(0, &rand_data[0..90]); tb.all_fault_queues_empty(); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } @@ -2318,14 +2246,11 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, file_size); - let transfer_info = tb.common_file_transfer_init_with_metadata_check( - &mut cfdp_user, - put_request, - file_size, - ); + let mut user = tb.create_user(0, file_size); + let transfer_info = + tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size); tb.common_eof_pdu_check( - &mut cfdp_user, + &mut user, transfer_info.closure_requested, EofParams::new_success(file_size, CRC_32.digest().finalize()), 1, @@ -2343,7 +2268,7 @@ mod tests { let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap(); let sent_packets = tb .handler - .state_machine(&mut cfdp_user, Some(&packet_info)) + .state_machine(&mut user, Some(&packet_info)) .unwrap(); assert_eq!(sent_packets, 1); let next_pdu = tb.get_next_sent_pdu().unwrap(); @@ -2351,8 +2276,8 @@ mod tests { tb.metadata_check(&next_pdu, file_size); tb.all_fault_queues_empty(); - tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); - tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.acknowledge_eof_pdu(&mut user, &transfer_info); + tb.finish_handling(&mut user, &transfer_info); tb.common_finished_pdu_ack_check(); } } diff --git a/src/time.rs b/src/time.rs index 23d69d2..8a8f433 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,7 +1,7 @@ use core::fmt::Debug; /// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone. -pub trait CountdownProvider: Debug { +pub trait Countdown: Debug { fn has_expired(&self) -> bool; fn reset(&mut self); } diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 2f7ba87..b94d51e 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -13,9 +13,10 @@ use std::{ use cfdp::{ EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig, - StdTimerCreator, TransactionId, UserFaultHookProvider, + StdTimerCreator, TransactionId, UserFaultHook, dest::DestinationHandler, filestore::NativeFilestore, + lost_segments::LostSegmentsList, request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, @@ -33,7 +34,7 @@ const FILE_DATA: &str = "Hello World!"; #[derive(Default)] pub struct ExampleFaultHandler {} -impl UserFaultHookProvider for ExampleFaultHandler { +impl UserFaultHook for ExampleFaultHandler { fn notice_of_suspension_cb( &mut self, transaction_id: TransactionId, @@ -230,6 +231,7 @@ fn end_to_end_test(with_closure: bool) { NativeFilestore::default(), remote_cfg_of_source, StdTimerCreator::default(), + LostSegmentsList::default(), ); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);