CFDP extracted to library #201

Closed
muellerr wants to merge 18 commits from continue-cfsp-source-handler into main
3 changed files with 381 additions and 177 deletions
Showing only changes of commit ed4b3ee858 - Show all commits

View File

@ -776,6 +776,7 @@ impl<
fn notice_of_suspension(&mut self) { fn notice_of_suspension(&mut self) {
// TODO: Implement suspension handling. // TODO: Implement suspension handling.
} }
fn abandon_transaction(&mut self) { fn abandon_transaction(&mut self) {
self.reset(); self.reset();
} }
@ -815,6 +816,10 @@ impl<
Ok(1) Ok(1)
} }
pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHook> {
&self.local_cfg
}
fn tstate(&self) -> &TransferState<CheckTimerProvider> { fn tstate(&self) -> &TransferState<CheckTimerProvider> {
&self.tparams.tstate &self.tparams.tstate
} }
@ -831,7 +836,7 @@ mod tests {
#[allow(unused_imports)] #[allow(unused_imports)]
use std::println; use std::println;
use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec}; use alloc::{sync::Arc, vec::Vec};
use rand::Rng; use rand::Rng;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
@ -844,148 +849,18 @@ mod tests {
use crate::cfdp::{ use crate::cfdp::{
filestore::NativeFilestore, filestore::NativeFilestore,
tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestFaultHandler}, tests::{
user::OwnedMetadataRecvdParams, basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler,
LOCAL_ID,
},
CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig, CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig,
StdRemoteEntityConfigProvider, CRC_32, StdRemoteEntityConfigProvider, CRC_32,
}; };
use super::*; use super::*;
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
pub struct FileSegmentRecvdParamsNoSegMetadata {
pub id: TransactionId,
pub offset: u64,
pub length: usize,
}
#[derive(Default)]
struct TestCfdpUser {
next_expected_seq_num: u64,
expected_full_src_name: String,
expected_full_dest_name: String,
expected_file_size: u64,
transaction_indication_call_count: u32,
eof_recvd_call_count: u32,
finished_indic_queue: VecDeque<TransactionFinishedParams>,
metadata_recv_queue: VecDeque<OwnedMetadataRecvdParams>,
file_seg_recvd_queue: VecDeque<FileSegmentRecvdParamsNoSegMetadata>,
}
impl TestCfdpUser {
fn new(
next_expected_seq_num: u64,
expected_full_src_name: String,
expected_full_dest_name: String,
expected_file_size: u64,
) -> Self {
Self {
next_expected_seq_num,
expected_full_src_name,
expected_full_dest_name,
expected_file_size,
transaction_indication_call_count: 0,
eof_recvd_call_count: 0,
finished_indic_queue: VecDeque::new(),
metadata_recv_queue: VecDeque::new(),
file_seg_recvd_queue: VecDeque::new(),
}
}
fn generic_id_check(&self, id: &crate::cfdp::TransactionId) {
assert_eq!(id.source_id, LOCAL_ID.into());
assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
}
}
impl CfdpUser for TestCfdpUser {
fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
self.transaction_indication_call_count += 1;
}
fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn transaction_finished_indication(
&mut self,
finished_params: &crate::cfdp::user::TransactionFinishedParams,
) {
self.generic_id_check(&finished_params.id);
self.finished_indic_queue.push_back(*finished_params);
}
fn metadata_recvd_indication(
&mut self,
md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
) {
self.generic_id_check(&md_recvd_params.id);
assert_eq!(
String::from(md_recvd_params.src_file_name),
self.expected_full_src_name
);
assert_eq!(
String::from(md_recvd_params.dest_file_name),
self.expected_full_dest_name
);
assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
assert_eq!(md_recvd_params.file_size, self.expected_file_size);
self.metadata_recv_queue.push_back(md_recvd_params.into());
}
fn file_segment_recvd_indication(
&mut self,
segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
) {
self.generic_id_check(&segment_recvd_params.id);
self.file_seg_recvd_queue
.push_back(FileSegmentRecvdParamsNoSegMetadata {
id: segment_recvd_params.id,
offset: segment_recvd_params.offset,
length: segment_recvd_params.length,
})
}
fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
fn suspended_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
self.eof_recvd_call_count += 1;
}
}
#[derive(Debug)] #[derive(Debug)]
struct TestCheckTimer { struct TestCheckTimer {
counter: Cell<u32>, counter: Cell<u32>,
@ -1066,7 +941,7 @@ mod tests {
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let dest_handler = let dest_handler =
default_dest_handler(fault_handler, test_sender, check_timer_expired.clone()); default_dest_handler(fault_handler, test_sender, check_timer_expired.clone());
let (src_path, dest_path) = init_full_filenames(); let (src_path, dest_path) = init_full_filepaths_textfile();
assert!(!Path::exists(&dest_path)); assert!(!Path::exists(&dest_path));
let handler = Self { let handler = Self {
check_timer_expired, check_timer_expired,
@ -1205,7 +1080,7 @@ mod tests {
} }
} }
fn init_full_filenames() -> (PathBuf, PathBuf) { fn init_full_filepaths_textfile() -> (PathBuf, PathBuf) {
( (
tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(), tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
tempfile::NamedTempFile::new() tempfile::NamedTempFile::new()
@ -1230,7 +1105,7 @@ mod tests {
2048, 2048,
test_packet_sender, test_packet_sender,
NativeFilestore::default(), NativeFilestore::default(),
basic_remote_cfg_table(), basic_remote_cfg_table(LOCAL_ID, true),
TestCheckTimerCreator::new(check_timer_expired), TestCheckTimerCreator::new(check_timer_expired),
) )
} }
@ -1297,6 +1172,9 @@ mod tests {
.user_hook .user_hook
.borrow() .borrow()
.all_queues_empty()); .all_queues_empty());
assert!(dest_handler.pdu_sender.queue_empty());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);
} }
#[test] #[test]

View File

@ -667,7 +667,7 @@ impl<'raw> PacketInfo<'raw> {
pub(crate) mod tests { pub(crate) mod tests {
use core::cell::RefCell; use core::cell::RefCell;
use alloc::{collections::VecDeque, vec::Vec}; use alloc::{collections::VecDeque, string::String, vec::Vec};
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
lv::Lv, lv::Lv,
@ -679,16 +679,150 @@ pub(crate) mod tests {
}, },
ChecksumType, ConditionCode, PduType, TransmissionMode, ChecksumType, ConditionCode, PduType, TransmissionMode,
}, },
util::UnsignedByteFieldU16, util::{UnsignedByteField, UnsignedByteFieldU16, UnsignedEnum},
}; };
use crate::cfdp::PacketTarget; use crate::cfdp::PacketTarget;
use super::{ use super::{
user::{CfdpUser, OwnedMetadataRecvdParams, TransactionFinishedParams},
PacketInfo, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, PacketInfo, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider,
StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider,
}; };
pub const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
pub struct FileSegmentRecvdParamsNoSegMetadata {
pub id: TransactionId,
pub offset: u64,
pub length: usize,
}
#[derive(Default)]
pub struct TestCfdpUser {
pub next_expected_seq_num: u64,
pub expected_full_src_name: String,
pub expected_full_dest_name: String,
pub expected_file_size: u64,
pub transaction_indication_call_count: u32,
pub eof_recvd_call_count: u32,
pub finished_indic_queue: VecDeque<TransactionFinishedParams>,
pub metadata_recv_queue: VecDeque<OwnedMetadataRecvdParams>,
pub file_seg_recvd_queue: VecDeque<FileSegmentRecvdParamsNoSegMetadata>,
}
impl TestCfdpUser {
pub fn new(
next_expected_seq_num: u64,
expected_full_src_name: String,
expected_full_dest_name: String,
expected_file_size: u64,
) -> Self {
Self {
next_expected_seq_num,
expected_full_src_name,
expected_full_dest_name,
expected_file_size,
transaction_indication_call_count: 0,
eof_recvd_call_count: 0,
finished_indic_queue: VecDeque::new(),
metadata_recv_queue: VecDeque::new(),
file_seg_recvd_queue: VecDeque::new(),
}
}
pub fn generic_id_check(&self, id: &crate::cfdp::TransactionId) {
assert_eq!(id.source_id, LOCAL_ID.into());
assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
}
}
impl CfdpUser for TestCfdpUser {
fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
self.transaction_indication_call_count += 1;
}
fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn transaction_finished_indication(
&mut self,
finished_params: &crate::cfdp::user::TransactionFinishedParams,
) {
self.generic_id_check(&finished_params.id);
self.finished_indic_queue.push_back(*finished_params);
}
fn metadata_recvd_indication(
&mut self,
md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
) {
self.generic_id_check(&md_recvd_params.id);
assert_eq!(
String::from(md_recvd_params.src_file_name),
self.expected_full_src_name
);
assert_eq!(
String::from(md_recvd_params.dest_file_name),
self.expected_full_dest_name
);
assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
assert_eq!(md_recvd_params.file_size, self.expected_file_size);
self.metadata_recv_queue.push_back(md_recvd_params.into());
}
fn file_segment_recvd_indication(
&mut self,
segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
) {
self.generic_id_check(&segment_recvd_params.id);
self.file_seg_recvd_queue
.push_back(FileSegmentRecvdParamsNoSegMetadata {
id: segment_recvd_params.id,
offset: segment_recvd_params.offset,
length: segment_recvd_params.length,
})
}
fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
fn suspended_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
self.eof_recvd_call_count += 1;
}
}
#[derive(Default)] #[derive(Default)]
pub(crate) struct TestFaultHandler { pub(crate) struct TestFaultHandler {
pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
@ -791,14 +925,17 @@ pub(crate) mod tests {
} }
} }
pub fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider { pub fn basic_remote_cfg_table(
dest_id: impl Into<UnsignedByteField>,
crc_on_transmission_by_default: bool,
) -> StdRemoteEntityConfigProvider {
let mut table = StdRemoteEntityConfigProvider::default(); let mut table = StdRemoteEntityConfigProvider::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
UnsignedByteFieldU16::new(1).into(), dest_id.into(),
1024, 1024,
1024, 1024,
true, true,
true, crc_on_transmission_by_default,
TransmissionMode::Unacknowledged, TransmissionMode::Unacknowledged,
ChecksumType::Crc32, ChecksumType::Crc32,
); );

View File

@ -1,4 +1,5 @@
use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error}; use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error};
use std::println;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
@ -118,6 +119,8 @@ pub enum PutRequestError {
Storage(#[from] ByteConversionError), Storage(#[from] ByteConversionError),
#[error("already busy with put request")] #[error("already busy with put request")]
AlreadyBusy, AlreadyBusy,
#[error("no remote entity configuration found for {0:?}")]
NoRemoteCfgFound(UnsignedByteField),
} }
pub struct SourceHandler< pub struct SourceHandler<
@ -258,6 +261,9 @@ impl<
); );
if remote_cfg.is_none() { if remote_cfg.is_none() {
// TODO: Specific error. // TODO: Specific error.
return Err(PutRequestError::NoRemoteCfgFound(
self.put_request_cacher.static_fields.destination_id,
));
} }
let remote_cfg = remote_cfg.unwrap(); let remote_cfg = remote_cfg.unwrap();
self.state_helper.num_packets_ready = 0; self.state_helper.num_packets_ready = 0;
@ -281,7 +287,7 @@ impl<
}; };
self.tstate = Some(TransferState::new( self.tstate = Some(TransferState::new(
TransactionId::new( TransactionId::new(
self.put_request_cacher.static_fields.destination_id, self.local_cfg().id,
UnsignedByteField::new( UnsignedByteField::new(
SeqCountProvider::MAX_BIT_WIDTH / 8, SeqCountProvider::MAX_BIT_WIDTH / 8,
self.seq_count_provider.get_and_increment().into(), self.seq_count_provider.get_and_increment().into(),
@ -302,6 +308,7 @@ impl<
} }
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> { fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> {
let mut sent_packets = 0;
if self.state_helper.step == TransactionStep::Idle { if self.state_helper.step == TransactionStep::Idle {
self.state_helper.step = TransactionStep::TransactionStart; self.state_helper.step = TransactionStep::TransactionStart;
} }
@ -312,7 +319,8 @@ impl<
if self.state_helper.step == TransactionStep::SendingMetadata { if self.state_helper.step == TransactionStep::SendingMetadata {
self.prepare_and_send_metadata_pdu()?; self.prepare_and_send_metadata_pdu()?;
self.state_helper.step = TransactionStep::SendingFileData; self.state_helper.step = TransactionStep::SendingFileData;
return Ok(1); sent_packets += 1;
// return Ok(1);
} }
if self.state_helper.step == TransactionStep::SendingFileData { if self.state_helper.step == TransactionStep::SendingFileData {
if let ControlFlow::Break(result) = self.file_data_fsm()? { if let ControlFlow::Break(result) = self.file_data_fsm()? {
@ -321,7 +329,8 @@ impl<
} }
if self.state_helper.step == TransactionStep::SendingEof { if self.state_helper.step == TransactionStep::SendingEof {
self.eof_fsm(cfdp_user)?; self.eof_fsm(cfdp_user)?;
return Ok(1); sent_packets += 1;
// return Ok(1);
} }
if self.state_helper.step == TransactionStep::WaitingForFinished { if self.state_helper.step == TransactionStep::WaitingForFinished {
/* /*
@ -373,7 +382,7 @@ impl<
self.reset() self.reset()
*/ */
} }
Ok(0) Ok(sent_packets)
} }
fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> { fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> {
@ -431,7 +440,6 @@ impl<
.expect("transfer state unexpectedly empty"); .expect("transfer state unexpectedly empty");
if !self.put_request_cacher.has_source_file() { if !self.put_request_cacher.has_source_file() {
self.fparams.metadata_only = true; self.fparams.metadata_only = true;
self.fparams.empty_file = true;
} else { } else {
let source_file = self let source_file = self
.put_request_cacher .put_request_cacher
@ -446,9 +454,13 @@ impl<
self.put_request_cacher self.put_request_cacher
.dest_file() .dest_file()
.map_err(SourceError::DestFileNotValidUtf8)?; .map_err(SourceError::DestFileNotValidUtf8)?;
if self.vfs.file_size(source_file)? > u32::MAX as u64 { let file_size = self.vfs.file_size(source_file)?;
if file_size > u32::MAX as u64 {
self.pdu_conf.file_flag = LargeFileFlag::Large self.pdu_conf.file_flag = LargeFileFlag::Large
} else { } else {
if file_size == 0 {
self.fparams.empty_file = true;
}
self.pdu_conf.file_flag = LargeFileFlag::Normal self.pdu_conf.file_flag = LargeFileFlag::Normal
} }
} }
@ -463,7 +475,7 @@ impl<
if larger_entity_width != cached_id.size() { if larger_entity_width != cached_id.size() {
UnsignedByteField::new(larger_entity_width, cached_id.value_const()) UnsignedByteField::new(larger_entity_width, cached_id.value_const())
} else { } else {
self.local_cfg.id *cached_id
} }
}; };
self.pdu_conf self.pdu_conf
@ -662,8 +674,8 @@ impl<
let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut(); let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut();
let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?; let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?;
self.pdu_sender.send_pdu( self.pdu_sender.send_pdu(
PduType::FileDirective, pdu.pdu_type(),
Some(FileDirectiveType::MetadataPdu), pdu.file_directive_type(),
&pdu_buffer_mut[0..written_len], &pdu_buffer_mut[0..written_len],
)?; )?;
Ok(()) Ok(())
@ -708,6 +720,21 @@ impl<
fn handle_keep_alive_pdu(&mut self) {} fn handle_keep_alive_pdu(&mut self) {}
/// Get the step, which denotes the exact step of a pending CFDP transaction when applicable.
pub fn step(&self) -> TransactionStep {
self.state_helper.step
}
/// Get the step, which denotes whether the CFDP handler is active, and which CFDP class
/// is used if it is active.
pub fn state(&self) -> State {
self.state_helper.state
}
pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHook> {
&self.local_cfg
}
/// This function is public to allow completely resetting the handler, but it is explicitely /// This function is public to allow completely resetting the handler, but it is explicitely
/// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself.
/// Resetting the handler might interfere with these mechanisms and lead to unexpected /// Resetting the handler might interfere with these mechanisms and lead to unexpected
@ -721,14 +748,24 @@ impl<
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use spacepackets::util::UnsignedByteFieldU16; use std::path::PathBuf;
use alloc::string::String;
use spacepackets::{
cfdp::{pdu::metadata::MetadataPduReader, ChecksumType, CrcFlag},
util::UnsignedByteFieldU16,
};
use tempfile::TempPath;
use super::*; use super::*;
use crate::{ use crate::{
cfdp::{ cfdp::{
filestore::NativeFilestore, filestore::NativeFilestore,
tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler}, request::PutRequestOwned,
FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, tests::{
basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler,
},
FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, CRC_32,
}, },
seq_count::SeqCountProviderSimple, seq_count::SeqCountProviderSimple,
}; };
@ -736,6 +773,13 @@ mod tests {
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
fn init_full_filepaths_textfile() -> (TempPath, PathBuf) {
(
tempfile::NamedTempFile::new().unwrap().into_temp_path(),
tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
)
}
type TestSourceHandler = SourceHandler< type TestSourceHandler = SourceHandler<
TestCfdpSender, TestCfdpSender,
TestFaultHandler, TestFaultHandler,
@ -744,33 +788,178 @@ mod tests {
SeqCountProviderSimple<u16>, SeqCountProviderSimple<u16>,
>; >;
fn default_source_handler( struct SourceHandlerTestbench {
handler: TestSourceHandler,
// src_path: PathBuf,
// dest_path: PathBuf,
}
impl SourceHandlerTestbench {
fn new(
crc_on_transmission_by_default: bool,
test_fault_handler: TestFaultHandler, test_fault_handler: TestFaultHandler,
test_packet_sender: TestCfdpSender, test_packet_sender: TestCfdpSender,
) -> TestSourceHandler { ) -> Self {
let local_entity_cfg = LocalEntityConfig { let local_entity_cfg = LocalEntityConfig {
id: REMOTE_ID.into(), id: LOCAL_ID.into(),
indication_cfg: IndicationConfig::default(), indication_cfg: IndicationConfig::default(),
fault_handler: FaultHandler::new(test_fault_handler), fault_handler: FaultHandler::new(test_fault_handler),
}; };
let static_put_request_cacher = StaticPutRequestCacher::new(1024); let static_put_request_cacher = StaticPutRequestCacher::new(2048);
SourceHandler::new( Self {
handler: SourceHandler::new(
local_entity_cfg, local_entity_cfg,
test_packet_sender, test_packet_sender,
NativeFilestore::default(), NativeFilestore::default(),
static_put_request_cacher, static_put_request_cacher,
1024, 1024,
basic_remote_cfg_table(), basic_remote_cfg_table(REMOTE_ID, crc_on_transmission_by_default),
SeqCountProviderSimple::default(), SeqCountProviderSimple::default(),
) ),
}
}
fn all_fault_queues_empty(&self) -> bool {
self.handler
.local_cfg
.user_fault_hook()
.borrow()
.all_queues_empty()
}
fn pdu_queue_empty(&self) -> bool {
self.handler.pdu_sender.queue_empty()
}
fn get_next_sent_pdu(&self) -> Option<SentPdu> {
self.handler.pdu_sender.retrieve_next_pdu()
}
} }
#[test] #[test]
fn test_basic() { fn test_basic() {
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let source_handler = default_source_handler(fault_handler, test_sender); let tb = SourceHandlerTestbench::new(false, fault_handler, test_sender);
assert!(source_handler.transmission_mode().is_none()); assert!(tb.handler.transmission_mode().is_none());
// assert!(fault_handler.all_queues_empty()); assert!(tb.all_fault_queues_empty());
assert!(tb.pdu_queue_empty());
assert_eq!(tb.handler.state(), State::Idle);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
}
#[test]
fn test_empty_file_transfer_not_acked_no_closure() {
let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default();
let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender);
let (srcfile, destfile) = init_full_filepaths_textfile();
let srcfile_str = String::from(srcfile.to_str().unwrap());
let destfile_str = String::from(destfile.to_str().unwrap());
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
&srcfile_str,
&destfile_str,
Some(TransmissionMode::Unacknowledged),
Some(false),
)
.expect("creating put request failed");
tb.handler
.put_request(&put_request)
.expect("put_request call failed");
assert_eq!(tb.handler.state(), State::Busy);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
let mut cfdp_user = TestCfdpUser::new(0, srcfile_str.clone(), destfile_str.clone(), 0);
let sent_packets = tb
.handler
.state_machine(&mut cfdp_user, None)
.expect("source handler FSM failure");
assert_eq!(sent_packets, 2);
assert!(!tb.pdu_queue_empty());
let next_pdu = tb.get_next_sent_pdu().unwrap();
assert!(!tb.pdu_queue_empty());
assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
assert_eq!(
next_pdu.file_directive_type,
Some(FileDirectiveType::MetadataPdu)
);
let metadata_pdu =
MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
assert_eq!(
metadata_pdu
.src_file_name()
.value_as_str()
.unwrap()
.unwrap(),
srcfile_str
);
assert_eq!(
metadata_pdu
.dest_file_name()
.value_as_str()
.unwrap()
.unwrap(),
destfile_str
);
assert_eq!(metadata_pdu.metadata_params().file_size, 0);
assert_eq!(
metadata_pdu.metadata_params().checksum_type,
ChecksumType::Crc32
);
assert!(!metadata_pdu.metadata_params().closure_requested);
assert_eq!(metadata_pdu.options(), &[]);
let next_pdu = tb.get_next_sent_pdu().unwrap();
assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
assert_eq!(
next_pdu.file_directive_type,
Some(FileDirectiveType::EofPdu)
);
let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError);
assert_eq!(eof_pdu.file_size(), 0);
assert_eq!(eof_pdu.file_checksum(), CRC_32.digest().finalize());
assert_eq!(
eof_pdu.pdu_header().common_pdu_conf().source_id(),
LOCAL_ID.into()
);
assert_eq!(
eof_pdu.pdu_header().common_pdu_conf().dest_id(),
REMOTE_ID.into()
);
assert_eq!(
eof_pdu.pdu_header().common_pdu_conf().crc_flag,
CrcFlag::NoCrc
);
assert_eq!(
eof_pdu.pdu_header().common_pdu_conf().direction,
Direction::TowardsReceiver
);
assert_eq!(
eof_pdu.pdu_header().common_pdu_conf().file_flag,
LargeFileFlag::Normal
);
assert_eq!(
eof_pdu.pdu_header().common_pdu_conf().trans_mode,
TransmissionMode::Unacknowledged
);
assert_eq!(
eof_pdu
.pdu_header()
.common_pdu_conf()
.transaction_seq_num
.value_const(),
0
);
assert_eq!(
eof_pdu
.pdu_header()
.common_pdu_conf()
.transaction_seq_num
.size(),
2
);
assert_eq!(tb.handler.state(), State::Idle);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
} }
} }