use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error}; use spacepackets::{ cfdp::{ lv::Lv, pdu::{ eof::EofPdu, file_data::FileDataPduCreatorWithReservedDatafield, finished::{DeliveryCode, FileStatus, FinishedPduReader}, metadata::{MetadataGenericParams, MetadataPduCreator}, CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, }, ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl, TransmissionMode, }, util::{UnsignedByteField, UnsignedEnum}, ByteConversionError, }; use crate::seq_count::SequenceCountProvider; use super::{ filestore::{FilestoreError, VirtualFilestore}, request::{ReadablePutRequest, StaticPutRequestCacher}, user::{CfdpUser, TransactionFinishedParams}, LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, }; #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum TransactionStep { Idle = 0, TransactionStart = 1, SendingMetadata = 3, SendingFileData = 4, /// Re-transmitting missing packets in acknowledged mode Retransmitting = 5, SendingEof = 6, WaitingForEofAck = 7, WaitingForFinished = 8, // SendingAckOfFinished = 9, NoticeOfCompletion = 10, } #[derive(Default)] pub struct FileParams { pub progress: u64, pub segment_len: u64, pub crc32: u32, pub metadata_only: bool, pub file_size: u64, pub empty_file: bool, } pub struct StateHelper { state: super::State, step: TransactionStep, num_packets_ready: u32, } #[derive(Debug)] pub struct FinishedParams { condition_code: ConditionCode, delivery_code: DeliveryCode, file_status: FileStatus, } #[derive(Debug, derive_new::new)] pub struct TransferState { transaction_id: TransactionId, remote_cfg: RemoteEntityConfig, transmission_mode: super::TransmissionMode, closure_requested: bool, cond_code_eof: Option, finished_params: Option, } impl Default for StateHelper { fn default() -> Self { Self { state: super::State::Idle, step: TransactionStep::Idle, num_packets_ready: 0, } } } #[derive(Debug, thiserror::Error)] pub enum SourceError { #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")] CantProcessPacketType { pdu_type: PduType, directive_type: Option, }, #[error("unexpected PDU")] UnexpectedPdu { pdu_type: PduType, directive_type: Option, }, #[error("source handler is already busy with put request")] PutRequestAlreadyActive, #[error("error caching put request")] PutRequestCaching(ByteConversionError), #[error("filestore error: {0}")] FilestoreError(#[from] FilestoreError), #[error("source file does not have valid UTF8 format: {0}")] SourceFileNotValidUtf8(Utf8Error), #[error("destination file does not have valid UTF8 format: {0}")] DestFileNotValidUtf8(Utf8Error), #[error("error related to PDU creation")] Pdu(#[from] PduError), } #[derive(Debug, thiserror::Error)] pub enum PutRequestError { #[error("error caching put request: {0}")] Storage(#[from] ByteConversionError), #[error("already busy with put request")] AlreadyBusy, #[error("no remote entity configuration found for {0:?}")] NoRemoteCfgFound(UnsignedByteField), } pub struct SourceHandler< PduSender: PduSendProvider, UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, SeqCountProvider: SequenceCountProvider, > { local_cfg: LocalEntityConfig, pdu_sender: PduSender, pdu_and_cksum_buffer: RefCell>, put_request_cacher: StaticPutRequestCacher, remote_cfg_table: RemoteCfgTable, vfs: Vfs, state_helper: StateHelper, // Transfer related state information tstate: Option, // File specific transfer fields fparams: FileParams, // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. pdu_conf: CommonPduConfig, seq_count_provider: SeqCountProvider, } impl< PduSender: PduSendProvider, UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, SeqCountProvider: SequenceCountProvider, > SourceHandler { pub fn new( cfg: LocalEntityConfig, pdu_sender: PduSender, vfs: Vfs, put_request_cacher: StaticPutRequestCacher, pdu_and_cksum_buf_size: usize, remote_cfg_table: RemoteCfgTable, seq_count_provider: SeqCountProvider, ) -> Self { Self { local_cfg: cfg, remote_cfg_table, pdu_sender, pdu_and_cksum_buffer: RefCell::new(alloc::vec![0; pdu_and_cksum_buf_size]), vfs, put_request_cacher, state_helper: Default::default(), tstate: Default::default(), fparams: Default::default(), pdu_conf: Default::default(), seq_count_provider, } } /// This is the core function to drive the source handler. It is also used to insert /// packets into the source handler. /// /// The state machine should either be called if a packet with the appropriate destination ID /// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example /// checking for timeouts or missed file segments. /// /// The function returns the number of sent PDU packets on success. pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, packet_to_insert: Option<&PacketInfo>, ) -> Result { if let Some(packet) = packet_to_insert { self.insert_packet(cfdp_user, packet)?; } match self.state_helper.state { super::State::Idle => todo!(), super::State::Busy => self.fsm_busy(cfdp_user), super::State::Suspended => todo!(), } } fn insert_packet( &mut self, cfdp_user: &mut impl CfdpUser, packet_info: &PacketInfo, ) -> Result<(), SourceError> { if packet_info.target() != PacketTarget::SourceEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. return Err(SourceError::CantProcessPacketType { pdu_type: packet_info.pdu_type(), directive_type: packet_info.pdu_directive(), }); } if packet_info.pdu_type() == PduType::FileData { // The [PacketInfo] API should ensure that file data PDUs can not be passed // into a source entity, so this should never happen. return Err(SourceError::UnexpectedPdu { pdu_type: PduType::FileData, directive_type: None, }); } // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is // always a valid value. match packet_info .pdu_directive() .expect("PDU directive type unexpectedly not set") { FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_info)?, FileDirectiveType::NakPdu => self.handle_nak_pdu(), FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), FileDirectiveType::AckPdu => todo!("acknowledged mode not implemented yet"), FileDirectiveType::EofPdu | FileDirectiveType::PromptPdu | FileDirectiveType::MetadataPdu => { return Err(SourceError::CantProcessPacketType { pdu_type: packet_info.pdu_type(), directive_type: packet_info.pdu_directive(), }); } } Ok(()) } pub fn put_request( &mut self, put_request: &impl ReadablePutRequest, ) -> Result<(), PutRequestError> { if self.state_helper.state != super::State::Idle { return Err(PutRequestError::AlreadyBusy); } self.put_request_cacher.set(put_request)?; self.state_helper.state = super::State::Busy; let remote_cfg = self.remote_cfg_table.get( self.put_request_cacher .static_fields .destination_id .value_const(), ); if remote_cfg.is_none() { // TODO: Specific error. return Err(PutRequestError::NoRemoteCfgFound( self.put_request_cacher.static_fields.destination_id, )); } let remote_cfg = remote_cfg.unwrap(); self.state_helper.num_packets_ready = 0; let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() { self.put_request_cacher.static_fields.trans_mode.unwrap() } else { remote_cfg.default_transmission_mode }; let closure_requested = if self .put_request_cacher .static_fields .closure_requested .is_some() { self.put_request_cacher .static_fields .closure_requested .unwrap() } else { remote_cfg.closure_requested_by_default }; self.tstate = Some(TransferState::new( TransactionId::new( self.local_cfg().id, UnsignedByteField::new( SeqCountProvider::MAX_BIT_WIDTH / 8, self.seq_count_provider.get_and_increment().into(), ), ), *remote_cfg, transmission_mode, closure_requested, None, None, )); Ok(()) } #[inline] pub fn transmission_mode(&self) -> Option { self.tstate.as_ref().map(|v| v.transmission_mode) } fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { let mut sent_packets = 0; if self.state_helper.step == TransactionStep::Idle { self.state_helper.step = TransactionStep::TransactionStart; } if self.state_helper.step == TransactionStep::TransactionStart { self.handle_transaction_start(cfdp_user)?; self.state_helper.step = TransactionStep::SendingMetadata; } if self.state_helper.step == TransactionStep::SendingMetadata { self.prepare_and_send_metadata_pdu()?; self.state_helper.step = TransactionStep::SendingFileData; sent_packets += 1; // return Ok(1); } if self.state_helper.step == TransactionStep::SendingFileData { if let ControlFlow::Break(result) = self.file_data_fsm()? { return Ok(result); } } if self.state_helper.step == TransactionStep::SendingEof { self.eof_fsm(cfdp_user)?; sent_packets += 1; // return Ok(1); } if self.state_helper.step == TransactionStep::WaitingForFinished { /* def _handle_wait_for_finish(self): if ( self.transmission_mode == TransmissionMode.ACKNOWLEDGED and self.__handle_retransmission() ): return if ( self._inserted_pdu.pdu is None or self._inserted_pdu.pdu_directive_type is None or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU ): if self._params.check_timer is not None: if self._params.check_timer.timed_out(): self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED) return finished_pdu = self._inserted_pdu.to_finished_pdu() self._inserted_pdu.pdu = None self._params.finished_params = finished_pdu.finished_params if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: self._prepare_finished_ack_packet(finished_pdu.condition_code) self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED else: self.states.step = TransactionStep.NOTICE_OF_COMPLETION */ } if self.state_helper.step == TransactionStep::NoticeOfCompletion { self.notice_of_completion(cfdp_user); self.reset(); /* def _notice_of_completion(self): if self.cfg.indication_cfg.transaction_finished_indication_required: assert self._params.transaction_id is not None # This happens for unacknowledged file copy operation with no closure. if self._params.finished_params is None: self._params.finished_params = FinishedParams( condition_code=ConditionCode.NO_ERROR, delivery_code=DeliveryCode.DATA_COMPLETE, file_status=FileStatus.FILE_STATUS_UNREPORTED, ) indication_params = TransactionFinishedParams( transaction_id=self._params.transaction_id, finished_params=self._params.finished_params, ) self.user.transaction_finished_indication(indication_params) # Transaction finished self.reset() */ } Ok(sent_packets) } fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> { let tstate = self.tstate.as_ref().unwrap(); let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), tstate.remote_cfg.default_crc_type, self.pdu_and_cksum_buffer.get_mut(), )?; self.prepare_and_send_eof_pdu(checksum)?; let tstate = self.tstate.as_ref().unwrap(); if self.local_cfg.indication_cfg.eof_sent { cfdp_user.eof_sent_indication(&tstate.transaction_id); } if tstate.transmission_mode == TransmissionMode::Unacknowledged { if tstate.closure_requested { // TODO: Check timer handling. self.state_helper.step = TransactionStep::WaitingForFinished; } else { self.state_helper.step = TransactionStep::NoticeOfCompletion; } } else { // TODO: Start positive ACK procedure. } /* if self.cfg.indication_cfg.eof_sent_indication_required: assert self._params.transaction_id is not None self.user.eof_sent_indication(self._params.transaction_id) if self.transmission_mode == TransmissionMode.UNACKNOWLEDGED: if self._params.closure_requested: assert self._params.remote_cfg is not None self._params.check_timer = ( self.check_timer_provider.provide_check_timer( local_entity_id=self.cfg.local_entity_id, remote_entity_id=self._params.remote_cfg.entity_id, entity_type=EntityType.SENDING, ) ) self.states.step = TransactionStep.WAITING_FOR_FINISHED else: self.states.step = TransactionStep.NOTICE_OF_COMPLETION else: self._start_positive_ack_procedure() */ Ok(()) } fn handle_transaction_start( &mut self, cfdp_user: &mut impl CfdpUser, ) -> Result<(), SourceError> { let tstate = self .tstate .as_ref() .expect("transfer state unexpectedly empty"); if !self.put_request_cacher.has_source_file() { self.fparams.metadata_only = true; } else { let source_file = self .put_request_cacher .source_file() .map_err(SourceError::SourceFileNotValidUtf8)?; if !self.vfs.exists(source_file)? { return Err(SourceError::FilestoreError( FilestoreError::FileDoesNotExist, )); } // We expect the destination file path to consist of valid UTF-8 characters as well. self.put_request_cacher .dest_file() .map_err(SourceError::DestFileNotValidUtf8)?; let file_size = self.vfs.file_size(source_file)?; if file_size > u32::MAX as u64 { self.pdu_conf.file_flag = LargeFileFlag::Large } else { if file_size == 0 { self.fparams.empty_file = true; } self.pdu_conf.file_flag = LargeFileFlag::Normal } } // Both the source entity and destination entity ID field must have the same size. // We use the larger of either the Put Request destination ID or the local entity ID // as the size for the new entity IDs. let larger_entity_width = core::cmp::max( self.local_cfg.id.size(), self.put_request_cacher.static_fields.destination_id.size(), ); let create_id = |cached_id: &UnsignedByteField| { if larger_entity_width != cached_id.size() { UnsignedByteField::new(larger_entity_width, cached_id.value_const()) } else { *cached_id } }; self.pdu_conf .set_source_and_dest_id( create_id(&self.local_cfg.id), create_id(&self.put_request_cacher.static_fields.destination_id), ) .unwrap(); // Set up other PDU configuration fields. self.pdu_conf.direction = Direction::TowardsReceiver; self.pdu_conf.crc_flag = tstate.remote_cfg.crc_on_transmission_by_default.into(); self.pdu_conf.transaction_seq_num = *tstate.transaction_id.seq_num(); self.pdu_conf.trans_mode = tstate.transmission_mode; cfdp_user.transaction_indication(&tstate.transaction_id); Ok(()) } fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), PduError> { let tstate = self .tstate .as_ref() .expect("transfer state unexpectedly empty"); let metadata_params = MetadataGenericParams::new( tstate.closure_requested, tstate.remote_cfg.default_crc_type, self.fparams.file_size, ); if self.fparams.metadata_only { let metadata_pdu = MetadataPduCreator::new( PduHeader::new_no_file_data(self.pdu_conf, 0), metadata_params, Lv::new_empty(), Lv::new_empty(), self.put_request_cacher.opts_slice(), ); return self.pdu_send_helper(&metadata_pdu); } let metadata_pdu = MetadataPduCreator::new( PduHeader::new_no_file_data(self.pdu_conf, 0), metadata_params, Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(), Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(), self.put_request_cacher.opts_slice(), ); self.pdu_send_helper(&metadata_pdu) } fn file_data_fsm(&mut self) -> Result, SourceError> { if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged { // TODO: Handle re-transmission } if !self.fparams.metadata_only && self.fparams.progress < self.fparams.file_size && self.send_progressing_file_data_pdu()? { return Ok(ControlFlow::Break(1)); } if self.fparams.empty_file || self.fparams.progress >= self.fparams.file_size { // EOF is still expected. self.state_helper.step = TransactionStep::SendingEof; self.tstate.as_mut().unwrap().cond_code_eof = Some(ConditionCode::NoError); } else if self.fparams.metadata_only { // Special case: Metadata Only, no EOF required. if self.tstate.as_ref().unwrap().closure_requested { self.state_helper.step = TransactionStep::WaitingForFinished; } else { self.state_helper.step = TransactionStep::NoticeOfCompletion; } } Ok(ControlFlow::Continue(())) } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) { let tstate = self.tstate.as_ref().unwrap(); if self.local_cfg.indication_cfg.transaction_finished { // The first case happens for unacknowledged file copy operation with no closure. let finished_params = if tstate.finished_params.is_none() { TransactionFinishedParams { id: tstate.transaction_id, condition_code: ConditionCode::NoError, delivery_code: DeliveryCode::Complete, file_status: FileStatus::Unreported, } } else { let finished_params = tstate.finished_params.as_ref().unwrap(); TransactionFinishedParams { id: tstate.transaction_id, condition_code: finished_params.condition_code, delivery_code: finished_params.delivery_code, file_status: finished_params.file_status, } }; cfdp_user.transaction_finished_indication(&finished_params); } } fn send_progressing_file_data_pdu(&mut self) -> Result { // Should never be called, but use defensive programming here. if self.fparams.progress >= self.fparams.file_size { return Ok(false); } let read_len = if self.fparams.file_size < self.fparams.segment_len { self.fparams.file_size } else if self.fparams.progress + self.fparams.segment_len > self.fparams.file_size { self.fparams.file_size - self.fparams.progress } else { self.fparams.segment_len }; let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata( PduHeader::new_for_file_data( self.pdu_conf, 0, SegmentMetadataFlag::NotPresent, SegmentationControl::NoRecordBoundaryPreservation, ), self.fparams.progress, read_len, ); let mut unwritten_pdu = pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?; self.vfs.read_data( self.put_request_cacher.source_file().unwrap(), self.fparams.progress, read_len, unwritten_pdu.file_data_field_mut(), )?; let written_len = unwritten_pdu.finish(); self.pdu_sender.send_pdu( PduType::FileData, None, &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; self.fparams.progress += read_len; /* """Generic function to prepare a file data PDU. This function can also be used to re-transmit file data PDUs of segments which were already sent.""" assert self._put_req is not None assert self._put_req.source_file is not None with open(self._put_req.source_file, "rb") as of: file_data = self.user.vfs.read_from_opened_file(of, offset, read_len) # TODO: Support for record continuation state not implemented yet. Segment metadata # flag is therefore always set to False. Segment metadata support also omitted # for now. Implementing those generically could be done in form of a callback, # e.g. abstractmethod of this handler as a first way, another one being # to expect the user to supply some helper class to split up a file fd_params = FileDataParams( file_data=file_data, offset=offset, segment_metadata=None ) file_data_pdu = FileDataPdu( pdu_conf=self._params.pdu_conf, params=fd_params ) self._add_packet_to_be_sent(file_data_pdu) */ /* """Prepare the next file data PDU, which also progresses the file copy operation. :return: True if a packet was prepared, False if PDU handling is done and the next steps in the Copy File procedure can be performed """ # This function should only be called if file segments still need to be sent. assert self._params.fp.progress < self._params.fp.file_size if self._params.fp.file_size < self._params.fp.segment_len: read_len = self._params.fp.file_size else: if ( self._params.fp.progress + self._params.fp.segment_len > self._params.fp.file_size ): read_len = self._params.fp.file_size - self._params.fp.progress else: read_len = self._params.fp.segment_len self._prepare_file_data_pdu(self._params.fp.progress, read_len) self._params.fp.progress += read_len */ Ok(true) } fn prepare_and_send_eof_pdu(&mut self, checksum: u32) -> Result<(), PduError> { let tstate = self .tstate .as_ref() .expect("transfer state unexpectedly empty"); let eof_pdu = EofPdu::new( PduHeader::new_no_file_data(self.pdu_conf, 0), tstate.cond_code_eof.unwrap_or(ConditionCode::NoError), checksum, self.fparams.file_size, None, ); self.pdu_send_helper(&eof_pdu)?; Ok(()) } fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), PduError> { let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut(); let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?; self.pdu_sender.send_pdu( pdu.pdu_type(), pdu.file_directive_type(), &pdu_buffer_mut[0..written_len], )?; Ok(()) } fn handle_finished_pdu(&mut self, packet_info: &PacketInfo) -> Result<(), SourceError> { // Ignore this packet when we are idle. if self.state_helper.state == State::Idle { return Ok(()); } if self.state_helper.step != TransactionStep::WaitingForFinished { return Err(SourceError::UnexpectedPdu { pdu_type: PduType::FileDirective, directive_type: Some(FileDirectiveType::FinishedPdu), }); } let finished_pdu = FinishedPduReader::new(packet_info.raw_packet())?; // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE // mode. self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams { condition_code: finished_pdu.condition_code(), delivery_code: finished_pdu.delivery_code(), file_status: finished_pdu.file_status(), }); if self.tstate.as_ref().unwrap().transmission_mode == TransmissionMode::Acknowledged { // TODO: Send ACK packet here immediately and continue. //self.state_helper.step = TransactionStep::SendingAckOfFinished; } self.state_helper.step = TransactionStep::NoticeOfCompletion; /* if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: self._prepare_finished_ack_packet(finished_pdu.condition_code) self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED else: self.states.step = TransactionStep.NOTICE_OF_COMPLETION */ Ok(()) } fn handle_nak_pdu(&mut self) {} fn handle_keep_alive_pdu(&mut self) {} /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable. pub fn step(&self) -> TransactionStep { self.state_helper.step } /// Get the step, which denotes whether the CFDP handler is active, and which CFDP class /// is used if it is active. pub fn state(&self) -> State { self.state_helper.state } pub fn local_cfg(&self) -> &LocalEntityConfig { &self.local_cfg } /// This function is public to allow completely resetting the handler, but it is explicitely /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. /// Resetting the handler might interfere with these mechanisms and lead to unexpected /// behaviour. pub fn reset(&mut self) { self.state_helper = Default::default(); self.tstate = None; self.fparams = Default::default(); } } #[cfg(test)] mod tests { use std::{path::PathBuf, string::ToString}; use alloc::string::String; use spacepackets::{ cfdp::{ pdu::{finished::FinishedPduCreator, metadata::MetadataPduReader}, ChecksumType, CrcFlag, }, util::UnsignedByteFieldU16, }; use tempfile::TempPath; use super::*; use crate::{ cfdp::{ filestore::NativeFilestore, request::PutRequestOwned, tests::{ basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler, }, FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, CRC_32, }, seq_count::SeqCountProviderSimple, }; const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); fn init_full_filepaths_textfile() -> (TempPath, PathBuf) { ( tempfile::NamedTempFile::new().unwrap().into_temp_path(), tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(), ) } type TestSourceHandler = SourceHandler< TestCfdpSender, TestFaultHandler, NativeFilestore, StdRemoteEntityConfigProvider, SeqCountProviderSimple, >; struct SourceHandlerTestbench { handler: TestSourceHandler, srcfile: Option, destfile: Option, } impl SourceHandlerTestbench { fn new( crc_on_transmission_by_default: bool, test_fault_handler: TestFaultHandler, test_packet_sender: TestCfdpSender, ) -> Self { let local_entity_cfg = LocalEntityConfig { id: LOCAL_ID.into(), indication_cfg: IndicationConfig::default(), fault_handler: FaultHandler::new(test_fault_handler), }; let static_put_request_cacher = StaticPutRequestCacher::new(2048); Self { handler: SourceHandler::new( local_entity_cfg, test_packet_sender, NativeFilestore::default(), static_put_request_cacher, 1024, basic_remote_cfg_table(REMOTE_ID, crc_on_transmission_by_default), SeqCountProviderSimple::default(), ), srcfile: None, destfile: None, } } fn put_request( &mut self, put_request: &impl ReadablePutRequest, ) -> Result<(), PutRequestError> { if self.srcfile.is_some() || self.destfile.is_some() { self.srcfile = Some(put_request.source_file().unwrap().to_string()); self.destfile = Some(put_request.dest_file().unwrap().to_string()); } self.handler.put_request(put_request) } fn all_fault_queues_empty(&self) -> bool { self.handler .local_cfg .user_fault_hook() .borrow() .all_queues_empty() } fn pdu_queue_empty(&self) -> bool { self.handler.pdu_sender.queue_empty() } fn get_next_sent_pdu(&self) -> Option { self.handler.pdu_sender.retrieve_next_pdu() } fn common_pdu_check_for_file_transfer(&self, pdu_header: &PduHeader, crc_flag: CrcFlag) { assert_eq!( pdu_header.seg_ctrl(), SegmentationControl::NoRecordBoundaryPreservation ); assert_eq!( pdu_header.seg_metadata_flag(), SegmentMetadataFlag::NotPresent ); assert_eq!(pdu_header.common_pdu_conf().source_id(), LOCAL_ID.into()); assert_eq!(pdu_header.common_pdu_conf().dest_id(), REMOTE_ID.into()); assert_eq!(pdu_header.common_pdu_conf().crc_flag, crc_flag); assert_eq!( pdu_header.common_pdu_conf().trans_mode, TransmissionMode::Unacknowledged ); assert_eq!( pdu_header.common_pdu_conf().direction, Direction::TowardsReceiver ); assert_eq!( pdu_header.common_pdu_conf().file_flag, LargeFileFlag::Normal ); assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2); } } #[test] fn test_basic() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); let tb = SourceHandlerTestbench::new(false, fault_handler, test_sender); assert!(tb.handler.transmission_mode().is_none()); assert!(tb.all_fault_queues_empty()); assert!(tb.pdu_queue_empty()); assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.step(), TransactionStep::Idle); } fn common_no_acked_file_transfer( tb: &mut SourceHandlerTestbench, cfdp_user: &mut TestCfdpUser, put_request: PutRequestOwned, srcfile_str: String, destfile_str: String, ) -> PduHeader { tb.handler .put_request(&put_request) .expect("put_request call failed"); assert_eq!(tb.handler.state(), State::Busy); assert_eq!(tb.handler.step(), TransactionStep::Idle); let sent_packets = tb .handler .state_machine(cfdp_user, None) .expect("source handler FSM failure"); assert_eq!(sent_packets, 2); assert!(!tb.pdu_queue_empty()); let next_pdu = tb.get_next_sent_pdu().unwrap(); assert!(!tb.pdu_queue_empty()); assert_eq!(next_pdu.pdu_type, PduType::FileDirective); assert_eq!( next_pdu.file_directive_type, Some(FileDirectiveType::MetadataPdu) ); let metadata_pdu = MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format"); let pdu_header = metadata_pdu.pdu_header(); tb.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc); assert_eq!( metadata_pdu .src_file_name() .value_as_str() .unwrap() .unwrap(), srcfile_str ); assert_eq!( metadata_pdu .dest_file_name() .value_as_str() .unwrap() .unwrap(), destfile_str ); assert_eq!(metadata_pdu.metadata_params().file_size, 0); assert_eq!( metadata_pdu.metadata_params().checksum_type, ChecksumType::Crc32 ); let closure_requested = if let Some(closure_requested) = put_request.closure_requested { assert_eq!( metadata_pdu.metadata_params().closure_requested, closure_requested ); closure_requested } else { assert!(metadata_pdu.metadata_params().closure_requested); metadata_pdu.metadata_params().closure_requested }; assert_eq!(metadata_pdu.options(), &[]); let next_pdu = tb.get_next_sent_pdu().unwrap(); assert_eq!(next_pdu.pdu_type, PduType::FileDirective); assert_eq!( next_pdu.file_directive_type, Some(FileDirectiveType::EofPdu) ); let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid metadata PDU format"); tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc); assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError); assert_eq!(eof_pdu.file_size(), 0); assert_eq!(eof_pdu.file_checksum(), CRC_32.digest().finalize()); assert_eq!( eof_pdu .pdu_header() .common_pdu_conf() .transaction_seq_num .value_const(), 0 ); if !closure_requested { assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.step(), TransactionStep::Idle); } else { assert_eq!(tb.handler.state(), State::Busy); assert_eq!(tb.handler.step(), TransactionStep::WaitingForFinished); } *pdu_header } #[test] fn test_empty_file_transfer_not_acked_no_closure() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender); let (srcfile, destfile) = init_full_filepaths_textfile(); let srcfile_str = String::from(srcfile.to_str().unwrap()); let destfile_str = String::from(destfile.to_str().unwrap()); let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &srcfile_str, &destfile_str, Some(TransmissionMode::Unacknowledged), Some(false), ) .expect("creating put request failed"); let mut cfdp_user = TestCfdpUser::new(0, srcfile_str.clone(), destfile_str.clone(), 0); common_no_acked_file_transfer( &mut tb, &mut cfdp_user, put_request, srcfile_str, destfile_str, ); } #[test] fn test_empty_file_transfer_not_acked_with_closure() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender); let (srcfile, destfile) = init_full_filepaths_textfile(); let srcfile_str = String::from(srcfile.to_str().unwrap()); let destfile_str = String::from(destfile.to_str().unwrap()); let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &srcfile_str, &destfile_str, Some(TransmissionMode::Unacknowledged), Some(true), ) .expect("creating put request failed"); let mut cfdp_user = TestCfdpUser::new(0, srcfile_str.clone(), destfile_str.clone(), 0); let pdu_header = common_no_acked_file_transfer( &mut tb, &mut cfdp_user, put_request, srcfile_str, destfile_str, ); let finished_pdu = FinishedPduCreator::new_default( pdu_header, DeliveryCode::Complete, FileStatus::Retained, ); let finished_pdu_vec = finished_pdu.to_vec().unwrap(); let packet_info = PacketInfo::new(&finished_pdu_vec).unwrap(); tb.handler .state_machine(&mut cfdp_user, Some(&packet_info)) .unwrap(); assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.step(), TransactionStep::Idle); } }