some more unittest cleanups
Some checks failed
Rust/cfdp/pipeline/head There was a failure building this commit

This commit is contained in:
Robin Müller 2024-08-23 14:00:17 +02:00
parent 480fbb69eb
commit 53078b5760
4 changed files with 348 additions and 329 deletions

View File

@ -552,8 +552,6 @@ impl<
|| self.tparams.tstate.metadata_only || self.tparams.tstate.metadata_only
{ {
file_delivery_complete = true; file_delivery_complete = true;
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
self.tparams.tstate.condition_code = ConditionCode::NoError;
} else { } else {
match self.vfs.checksum_verify( match self.vfs.checksum_verify(
self.tparams.file_properties.dest_path_buf.to_str().unwrap(), self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
@ -578,6 +576,10 @@ impl<
}, },
}; };
} }
if file_delivery_complete {
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
self.tparams.tstate.condition_code = ConditionCode::NoError;
}
file_delivery_complete file_delivery_complete
} }
@ -849,9 +851,9 @@ impl<
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use core::{cell::Cell, sync::atomic::AtomicBool}; use core::{cell::Cell, sync::atomic::AtomicBool};
use std::fs;
#[allow(unused_imports)] #[allow(unused_imports)]
use std::println; use std::println;
use std::{fs, string::String};
use alloc::{sync::Arc, vec::Vec}; use alloc::{sync::Arc, vec::Vec};
use rand::Rng; use rand::Rng;
@ -938,7 +940,7 @@ mod tests {
TestCheckTimer, TestCheckTimer,
>; >;
struct DestHandlerTester { struct DestHandlerTestbench {
check_timer_expired: Arc<AtomicBool>, check_timer_expired: Arc<AtomicBool>,
handler: TestDestHandler, handler: TestDestHandler,
src_path: PathBuf, src_path: PathBuf,
@ -952,7 +954,7 @@ mod tests {
buf: [u8; 512], buf: [u8; 512],
} }
impl DestHandlerTester { impl DestHandlerTestbench {
fn new(fault_handler: TestFaultHandler, closure_requested: bool) -> Self { fn new(fault_handler: TestFaultHandler, closure_requested: bool) -> Self {
let check_timer_expired = Arc::new(AtomicBool::new(false)); let check_timer_expired = Arc::new(AtomicBool::new(false));
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
@ -967,7 +969,7 @@ mod tests {
closure_requested, closure_requested,
dest_path, dest_path,
check_dest_file: false, check_dest_file: false,
check_handler_idle_at_drop: false, check_handler_idle_at_drop: true,
expected_file_size: 0, expected_file_size: 0,
pdu_header: create_pdu_header(UbfU16::new(0)), pdu_header: create_pdu_header(UbfU16::new(0)),
expected_full_data: Vec::new(), expected_full_data: Vec::new(),
@ -1018,6 +1020,8 @@ mod tests {
file_size: u64, file_size: u64,
) -> Result<TransactionId, DestError> { ) -> Result<TransactionId, DestError> {
self.expected_file_size = file_size; self.expected_file_size = file_size;
assert_eq!(user.transaction_indication_call_count, 0);
assert_eq!(user.metadata_recv_queue.len(), 0);
let metadata_pdu = create_metadata_pdu( let metadata_pdu = create_metadata_pdu(
&self.pdu_header, &self.pdu_header,
self.src_path.as_path(), self.src_path.as_path(),
@ -1032,6 +1036,21 @@ mod tests {
self.handler.transmission_mode().unwrap(), self.handler.transmission_mode().unwrap(),
TransmissionMode::Unacknowledged TransmissionMode::Unacknowledged
); );
assert_eq!(user.transaction_indication_call_count, 0);
assert_eq!(user.metadata_recv_queue.len(), 1);
let metadata_recvd = user.metadata_recv_queue.pop_front().unwrap();
assert_eq!(metadata_recvd.source_id, LOCAL_ID.into());
assert_eq!(
metadata_recvd.src_file_name,
String::from(self.src_path.to_str().unwrap())
);
assert_eq!(
metadata_recvd.dest_file_name,
String::from(self.dest_path().to_str().unwrap())
);
assert_eq!(metadata_recvd.id, self.handler.transaction_id().unwrap());
assert_eq!(metadata_recvd.file_size, file_size);
assert!(metadata_recvd.msgs_to_user.is_empty());
Ok(self.handler.transaction_id().unwrap()) Ok(self.handler.transaction_id().unwrap())
} }
@ -1065,6 +1084,7 @@ mod tests {
expected_full_data: Vec<u8>, expected_full_data: Vec<u8>,
) -> Result<u32, DestError> { ) -> Result<u32, DestError> {
self.expected_full_data = expected_full_data; self.expected_full_data = expected_full_data;
assert_eq!(user.finished_indic_queue.len(), 0);
let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header); let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header);
let packet_info = create_packet_info(&eof_pdu, &mut self.buf); let packet_info = create_packet_info(&eof_pdu, &mut self.buf);
self.check_handler_idle_at_drop = true; self.check_handler_idle_at_drop = true;
@ -1076,14 +1096,29 @@ mod tests {
result result
} }
fn check_completion_indication_success(&mut self, user: &mut TestCfdpUser) {
assert_eq!(user.finished_indic_queue.len(), 1);
let finished_indication = user.finished_indic_queue.pop_front().unwrap();
assert_eq!(
finished_indication.id,
self.handler.transaction_id().unwrap()
);
assert_eq!(finished_indication.file_status, FileStatus::Retained);
assert_eq!(finished_indication.delivery_code, DeliveryCode::Complete);
assert_eq!(finished_indication.condition_code, ConditionCode::NoError);
}
fn state_check(&self, state: State, step: TransactionStep) { fn state_check(&self, state: State, step: TransactionStep) {
assert_eq!(self.handler.state(), state); assert_eq!(self.handler.state(), state);
assert_eq!(self.handler.step(), step); assert_eq!(self.handler.step(), step);
} }
} }
impl Drop for DestHandlerTester { // Specifying some checks in the drop method avoids some boilerplate.
impl Drop for DestHandlerTestbench {
fn drop(&mut self) { fn drop(&mut self) {
assert!(self.all_fault_queues_empty());
assert!(self.handler.pdu_sender.queue_empty());
if self.check_handler_idle_at_drop { if self.check_handler_idle_at_drop {
self.state_check(State::Idle, TransactionStep::Idle); self.state_check(State::Idle, TransactionStep::Idle);
} }
@ -1197,18 +1232,14 @@ mod tests {
#[test] #[test]
fn test_empty_file_transfer_not_acked_no_closure() { fn test_empty_file_transfer_not_acked_no_closure() {
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler, false); let mut tb = DestHandlerTestbench::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(0); let mut test_user = tb.test_user_from_cached_paths(0);
testbench tb.generic_transfer_init(&mut test_user, 0)
.generic_transfer_init(&mut test_user, 0)
.expect("transfer init failed"); .expect("transfer init failed");
testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
testbench tb.generic_eof_no_error(&mut test_user, Vec::new())
.generic_eof_no_error(&mut test_user, Vec::new())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert!(testbench.all_fault_queues_empty()); tb.check_completion_indication_success(&mut test_user);
assert!(testbench.handler.pdu_sender.queue_empty());
testbench.state_check(State::Idle, TransactionStep::Idle);
} }
#[test] #[test]
@ -1218,21 +1249,16 @@ mod tests {
let file_size = file_data.len() as u64; let file_size = file_data.len() as u64;
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler, false); let mut tb = DestHandlerTestbench::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size); let mut test_user = tb.test_user_from_cached_paths(file_size);
testbench tb.generic_transfer_init(&mut test_user, file_size)
.generic_transfer_init(&mut test_user, file_size)
.expect("transfer init failed"); .expect("transfer init failed");
testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
testbench tb.generic_file_data_insert(&mut test_user, 0, file_data)
.generic_file_data_insert(&mut test_user, 0, file_data)
.expect("file data insertion failed"); .expect("file data insertion failed");
testbench tb.generic_eof_no_error(&mut test_user, file_data.to_vec())
.generic_eof_no_error(&mut test_user, file_data.to_vec())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert!(testbench.all_fault_queues_empty()); tb.check_completion_indication_success(&mut test_user);
assert!(testbench.handler.pdu_sender.queue_empty());
testbench.state_check(State::Idle, TransactionStep::Idle);
} }
#[test] #[test]
@ -1244,28 +1270,22 @@ mod tests {
let segment_len = 256; let segment_len = 256;
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler, false); let mut tb = DestHandlerTestbench::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size); let mut test_user = tb.test_user_from_cached_paths(file_size);
testbench tb.generic_transfer_init(&mut test_user, file_size)
.generic_transfer_init(&mut test_user, file_size)
.expect("transfer init failed"); .expect("transfer init failed");
testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
testbench tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
.expect("file data insertion failed"); .expect("file data insertion failed");
testbench tb.generic_file_data_insert(
.generic_file_data_insert( &mut test_user,
&mut test_user, segment_len as u64,
segment_len as u64, &random_data[segment_len..],
&random_data[segment_len..], )
) .expect("file data insertion failed");
.expect("file data insertion failed"); tb.generic_eof_no_error(&mut test_user, random_data.to_vec())
testbench
.generic_eof_no_error(&mut test_user, random_data.to_vec())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert!(testbench.all_fault_queues_empty()); tb.check_completion_indication_success(&mut test_user);
assert!(testbench.handler.pdu_sender.queue_empty());
testbench.state_check(State::Idle, TransactionStep::Idle);
} }
#[test] #[test]
@ -1277,44 +1297,41 @@ mod tests {
let segment_len = 256; let segment_len = 256;
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler, false); let mut tb = DestHandlerTestbench::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size); let mut test_user = tb.test_user_from_cached_paths(file_size);
let transaction_id = testbench let transaction_id = tb
.generic_transfer_init(&mut test_user, file_size) .generic_transfer_init(&mut test_user, file_size)
.expect("transfer init failed"); .expect("transfer init failed");
testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
testbench tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
.expect("file data insertion 0 failed"); .expect("file data insertion 0 failed");
testbench tb.generic_eof_no_error(&mut test_user, random_data.to_vec())
.generic_eof_no_error(&mut test_user, random_data.to_vec())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
testbench.state_check( tb.state_check(
State::Busy, State::Busy,
TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
); );
testbench tb.generic_file_data_insert(
.generic_file_data_insert( &mut test_user,
&mut test_user, segment_len as u64,
segment_len as u64, &random_data[segment_len..],
&random_data[segment_len..], )
) .expect("file data insertion 1 failed");
.expect("file data insertion 1 failed"); tb.set_check_timer_expired();
testbench.set_check_timer_expired(); tb.handler
testbench
.handler
.state_machine_no_packet(&mut test_user) .state_machine_no_packet(&mut test_user)
.expect("fsm failure"); .expect("fsm failure");
let fault_handler = testbench.handler.local_cfg.fault_handler.user_hook.borrow(); let mut fault_handler = tb.handler.local_cfg.fault_handler.user_hook.borrow_mut();
assert_eq!(fault_handler.ignored_queue.len(), 1); assert_eq!(fault_handler.ignored_queue.len(), 1);
let cancelled = fault_handler.ignored_queue.front().unwrap(); let cancelled = fault_handler.ignored_queue.pop_front().unwrap();
assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
assert_eq!(cancelled.2, segment_len as u64); assert_eq!(cancelled.2, segment_len as u64);
assert!(testbench.handler.pdu_sender.queue_empty()); drop(fault_handler);
testbench.state_check(State::Idle, TransactionStep::Idle);
tb.check_completion_indication_success(&mut test_user);
} }
#[test] #[test]
@ -1326,7 +1343,7 @@ mod tests {
let segment_len = 256; let segment_len = 256;
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler, false); let mut testbench = DestHandlerTestbench::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size); let mut test_user = testbench.test_user_from_cached_paths(file_size);
let transaction_id = testbench let transaction_id = testbench
.generic_transfer_init(&mut test_user, file_size) .generic_transfer_init(&mut test_user, file_size)
@ -1359,20 +1376,18 @@ mod tests {
.expect("fsm error"); .expect("fsm error");
testbench.state_check(State::Idle, TransactionStep::Idle); testbench.state_check(State::Idle, TransactionStep::Idle);
let fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow(); let mut fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow_mut();
assert!(fault_hook.notice_of_suspension_queue.is_empty()); assert!(fault_hook.notice_of_suspension_queue.is_empty());
let ignored_queue = &fault_hook.ignored_queue; let ignored_queue = &mut fault_hook.ignored_queue;
assert_eq!(ignored_queue.len(), 1); assert_eq!(ignored_queue.len(), 1);
let cancelled = ignored_queue.front().unwrap(); let cancelled = ignored_queue.pop_front().unwrap();
assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
assert_eq!(cancelled.2, segment_len as u64); assert_eq!(cancelled.2, segment_len as u64);
let fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow(); let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue;
let cancelled_queue = &fault_hook.notice_of_cancellation_queue;
assert_eq!(cancelled_queue.len(), 1); assert_eq!(cancelled_queue.len(), 1);
let cancelled = *cancelled_queue.front().unwrap(); let cancelled = cancelled_queue.pop_front().unwrap();
assert_eq!(cancelled.0, transaction_id); assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); assert_eq!(cancelled.1, ConditionCode::CheckLimitReached);
assert_eq!(cancelled.2, segment_len as u64); assert_eq!(cancelled.2, segment_len as u64);
@ -1407,22 +1422,22 @@ mod tests {
#[test] #[test]
fn test_file_transfer_with_closure() { fn test_file_transfer_with_closure() {
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler, true); let mut tb = DestHandlerTestbench::new(fault_handler, true);
let mut test_user = testbench.test_user_from_cached_paths(0); let mut test_user = tb.test_user_from_cached_paths(0);
testbench tb.generic_transfer_init(&mut test_user, 0)
.generic_transfer_init(&mut test_user, 0)
.expect("transfer init failed"); .expect("transfer init failed");
testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
let sent_packets = testbench let sent_packets = tb
.generic_eof_no_error(&mut test_user, Vec::new()) .generic_eof_no_error(&mut test_user, Vec::new())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
assert!(testbench.all_fault_queues_empty()); assert!(tb.all_fault_queues_empty());
// The Finished PDU was sent, so the state machine is done. // The Finished PDU was sent, so the state machine is done.
testbench.state_check(State::Idle, TransactionStep::Idle); tb.state_check(State::Idle, TransactionStep::Idle);
assert!(!testbench.handler.pdu_sender.queue_empty()); assert!(!tb.handler.pdu_sender.queue_empty());
let sent_pdu = testbench.handler.pdu_sender.retrieve_next_pdu().unwrap(); let sent_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap();
check_finished_pdu_success(&sent_pdu); check_finished_pdu_success(&sent_pdu);
tb.check_completion_indication_success(&mut test_user);
} }
#[test] #[test]

View File

@ -1027,7 +1027,7 @@ pub(crate) mod tests {
} }
} }
#[derive(Default)] #[derive(Default, Debug)]
pub(crate) struct TestFaultHandler { pub(crate) struct TestFaultHandler {
pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>, pub notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>,

View File

@ -410,25 +410,6 @@ impl<
if self.state_helper.step == TransactionStep::NoticeOfCompletion { if self.state_helper.step == TransactionStep::NoticeOfCompletion {
self.notice_of_completion(cfdp_user); self.notice_of_completion(cfdp_user);
self.reset(); self.reset();
/*
def _notice_of_completion(self):
if self.cfg.indication_cfg.transaction_finished_indication_required:
assert self._params.transaction_id is not None
# This happens for unacknowledged file copy operation with no closure.
if self._params.finished_params is None:
self._params.finished_params = FinishedParams(
condition_code=ConditionCode.NO_ERROR,
delivery_code=DeliveryCode.DATA_COMPLETE,
file_status=FileStatus.FILE_STATUS_UNREPORTED,
)
indication_params = TransactionFinishedParams(
transaction_id=self._params.transaction_id,
finished_params=self._params.finished_params,
)
self.user.transaction_finished_indication(indication_params)
# Transaction finished
self.reset()
*/
} }
Ok(sent_packets) Ok(sent_packets)
} }
@ -598,6 +579,25 @@ impl<
} }
fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) { fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
/*
def _notice_of_completion(self):
if self.cfg.indication_cfg.transaction_finished_indication_required:
assert self._params.transaction_id is not None
# This happens for unacknowledged file copy operation with no closure.
if self._params.finished_params is None:
self._params.finished_params = FinishedParams(
condition_code=ConditionCode.NO_ERROR,
delivery_code=DeliveryCode.DATA_COMPLETE,
file_status=FileStatus.FILE_STATUS_UNREPORTED,
)
indication_params = TransactionFinishedParams(
transaction_id=self._params.transaction_id,
finished_params=self._params.finished_params,
)
self.user.transaction_finished_indication(indication_params)
# Transaction finished
self.reset()
*/
let tstate = self.tstate.as_ref().unwrap(); let tstate = self.tstate.as_ref().unwrap();
if self.local_cfg.indication_cfg.transaction_finished { if self.local_cfg.indication_cfg.transaction_finished {
// The first case happens for unacknowledged file copy operation with no closure. // The first case happens for unacknowledged file copy operation with no closure.
@ -944,6 +944,206 @@ mod tests {
); );
assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2); assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2);
} }
fn generic_file_transfer(
&mut self,
cfdp_user: &mut TestCfdpUser,
with_closure: bool,
file_data: Vec<u8>,
) -> (PduHeader, u32) {
let mut digest = CRC_32.digest();
digest.update(&file_data);
let checksum = digest.finalize();
cfdp_user.expected_full_src_name = self.srcfile.clone();
cfdp_user.expected_full_dest_name = self.destfile.clone();
cfdp_user.expected_file_size = file_data.len() as u64;
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
&self.srcfile,
&self.destfile,
Some(TransmissionMode::Unacknowledged),
Some(with_closure),
)
.expect("creating put request failed");
let (closure_requested, pdu_header) = self.common_no_acked_file_transfer(
cfdp_user,
put_request,
cfdp_user.expected_file_size,
);
let mut current_offset = 0;
let chunks = file_data.chunks(
calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&pdu_header,
self.max_packet_len,
None,
),
);
let mut fd_pdus = 0;
for segment in chunks {
self.check_next_file_pdu(current_offset, segment);
self.handler.state_machine_no_packet(cfdp_user).unwrap();
fd_pdus += 1;
current_offset += segment.len() as u64;
}
self.common_eof_pdu_check(
cfdp_user,
closure_requested,
cfdp_user.expected_file_size,
checksum,
);
(pdu_header, fd_pdus)
}
// Returns a tuple. First parameter: Closure requested. Second parameter: PDU header of
// metadata PDU.
fn common_no_acked_file_transfer(
&mut self,
cfdp_user: &mut TestCfdpUser,
put_request: PutRequestOwned,
filesize: u64,
) -> (bool, PduHeader) {
assert_eq!(cfdp_user.transaction_indication_call_count, 0);
assert_eq!(cfdp_user.eof_sent_call_count, 0);
self.put_request(&put_request)
.expect("put_request call failed");
assert_eq!(self.handler.state(), State::Busy);
assert_eq!(self.handler.step(), TransactionStep::Idle);
let sent_packets = self
.handler
.state_machine_no_packet(cfdp_user)
.expect("source handler FSM failure");
assert_eq!(sent_packets, 2);
assert!(!self.pdu_queue_empty());
let next_pdu = self.get_next_sent_pdu().unwrap();
assert!(!self.pdu_queue_empty());
assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
assert_eq!(
next_pdu.file_directive_type,
Some(FileDirectiveType::MetadataPdu)
);
let metadata_pdu =
MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
let pdu_header = metadata_pdu.pdu_header();
self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc);
assert_eq!(
metadata_pdu
.src_file_name()
.value_as_str()
.unwrap()
.unwrap(),
self.srcfile
);
assert_eq!(
metadata_pdu
.dest_file_name()
.value_as_str()
.unwrap()
.unwrap(),
self.destfile
);
assert_eq!(metadata_pdu.metadata_params().file_size, filesize);
assert_eq!(
metadata_pdu.metadata_params().checksum_type,
ChecksumType::Crc32
);
let closure_requested = if let Some(closure_requested) = put_request.closure_requested {
assert_eq!(
metadata_pdu.metadata_params().closure_requested,
closure_requested
);
closure_requested
} else {
assert!(metadata_pdu.metadata_params().closure_requested);
metadata_pdu.metadata_params().closure_requested
};
assert_eq!(metadata_pdu.options(), &[]);
(closure_requested, *pdu_header)
}
fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) {
let next_pdu = self.get_next_sent_pdu().unwrap();
assert_eq!(next_pdu.pdu_type, PduType::FileData);
assert!(next_pdu.file_directive_type.is_none());
let fd_pdu =
FileDataPdu::from_bytes(&next_pdu.raw_pdu).expect("reading file data PDU failed");
assert_eq!(fd_pdu.offset(), expected_offset);
assert_eq!(fd_pdu.file_data(), expected_data);
assert!(fd_pdu.segment_metadata().is_none());
}
fn common_eof_pdu_check(
&mut self,
cfdp_user: &mut TestCfdpUser,
closure_requested: bool,
filesize: u64,
checksum: u32,
) {
let next_pdu = self.get_next_sent_pdu().unwrap();
assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
assert_eq!(
next_pdu.file_directive_type,
Some(FileDirectiveType::EofPdu)
);
let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError);
assert_eq!(eof_pdu.file_size(), filesize);
assert_eq!(eof_pdu.file_checksum(), checksum);
assert_eq!(
eof_pdu
.pdu_header()
.common_pdu_conf()
.transaction_seq_num
.value_const(),
0
);
if !closure_requested {
assert_eq!(self.handler.state(), State::Idle);
assert_eq!(self.handler.step(), TransactionStep::Idle);
} else {
assert_eq!(self.handler.state(), State::Busy);
assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished);
}
assert_eq!(cfdp_user.transaction_indication_call_count, 1);
assert_eq!(cfdp_user.eof_sent_call_count, 1);
self.all_fault_queues_empty();
}
fn common_tiny_file_transfer(
&mut self,
cfdp_user: &mut TestCfdpUser,
with_closure: bool,
) -> PduHeader {
let mut file = OpenOptions::new()
.write(true)
.open(&self.srcfile)
.expect("opening file failed");
let content_str = "Hello World!";
file.write_all(content_str.as_bytes())
.expect("writing file content failed");
drop(file);
let (pdu_header, fd_pdus) = self.generic_file_transfer(
cfdp_user,
with_closure,
content_str.as_bytes().to_vec(),
);
assert_eq!(fd_pdus, 1);
pdu_header
}
fn finish_handling(&mut self, user: &mut TestCfdpUser, pdu_header: PduHeader) {
let finished_pdu = FinishedPduCreator::new_default(
pdu_header,
DeliveryCode::Complete,
FileStatus::Retained,
);
let finished_pdu_vec = finished_pdu.to_vec().unwrap();
let packet_info = PacketInfo::new(&finished_pdu_vec).unwrap();
self.handler
.state_machine(user, Some(&packet_info))
.unwrap();
}
} }
impl Drop for SourceHandlerTestbench { impl Drop for SourceHandlerTestbench {
@ -965,209 +1165,6 @@ mod tests {
assert!(tb.pdu_queue_empty()); assert!(tb.pdu_queue_empty());
} }
// Returns a tuple. First parameter: Closure requested. Second parameter: PDU header of
// metadata PDU.
fn common_no_acked_file_transfer(
tb: &mut SourceHandlerTestbench,
cfdp_user: &mut TestCfdpUser,
put_request: PutRequestOwned,
filesize: u64,
) -> (bool, PduHeader) {
assert_eq!(cfdp_user.transaction_indication_call_count, 0);
assert_eq!(cfdp_user.eof_sent_call_count, 0);
tb.put_request(&put_request)
.expect("put_request call failed");
assert_eq!(tb.handler.state(), State::Busy);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
let sent_packets = tb
.handler
.state_machine_no_packet(cfdp_user)
.expect("source handler FSM failure");
assert_eq!(sent_packets, 2);
assert!(!tb.pdu_queue_empty());
let next_pdu = tb.get_next_sent_pdu().unwrap();
assert!(!tb.pdu_queue_empty());
assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
assert_eq!(
next_pdu.file_directive_type,
Some(FileDirectiveType::MetadataPdu)
);
let metadata_pdu =
MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
let pdu_header = metadata_pdu.pdu_header();
tb.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc);
assert_eq!(
metadata_pdu
.src_file_name()
.value_as_str()
.unwrap()
.unwrap(),
tb.srcfile
);
assert_eq!(
metadata_pdu
.dest_file_name()
.value_as_str()
.unwrap()
.unwrap(),
tb.destfile
);
assert_eq!(metadata_pdu.metadata_params().file_size, filesize);
assert_eq!(
metadata_pdu.metadata_params().checksum_type,
ChecksumType::Crc32
);
let closure_requested = if let Some(closure_requested) = put_request.closure_requested {
assert_eq!(
metadata_pdu.metadata_params().closure_requested,
closure_requested
);
closure_requested
} else {
assert!(metadata_pdu.metadata_params().closure_requested);
metadata_pdu.metadata_params().closure_requested
};
assert_eq!(metadata_pdu.options(), &[]);
(closure_requested, *pdu_header)
}
fn common_eof_pdu_check(
tb: &mut SourceHandlerTestbench,
cfdp_user: &mut TestCfdpUser,
closure_requested: bool,
filesize: u64,
checksum: u32,
) {
let next_pdu = tb.get_next_sent_pdu().unwrap();
assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
assert_eq!(
next_pdu.file_directive_type,
Some(FileDirectiveType::EofPdu)
);
let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError);
assert_eq!(eof_pdu.file_size(), filesize);
assert_eq!(eof_pdu.file_checksum(), checksum);
assert_eq!(
eof_pdu
.pdu_header()
.common_pdu_conf()
.transaction_seq_num
.value_const(),
0
);
if !closure_requested {
assert_eq!(tb.handler.state(), State::Idle);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
} else {
assert_eq!(tb.handler.state(), State::Busy);
assert_eq!(tb.handler.step(), TransactionStep::WaitingForFinished);
}
assert_eq!(cfdp_user.transaction_indication_call_count, 1);
assert_eq!(cfdp_user.eof_sent_call_count, 1);
tb.all_fault_queues_empty();
}
fn check_next_file_pdu(
tb: &mut SourceHandlerTestbench,
expected_offset: u64,
expected_data: &[u8],
) {
let next_pdu = tb.get_next_sent_pdu().unwrap();
assert_eq!(next_pdu.pdu_type, PduType::FileData);
assert!(next_pdu.file_directive_type.is_none());
let fd_pdu =
FileDataPdu::from_bytes(&next_pdu.raw_pdu).expect("reading file data PDU failed");
assert_eq!(fd_pdu.offset(), expected_offset);
assert_eq!(fd_pdu.file_data(), expected_data);
assert!(fd_pdu.segment_metadata().is_none());
}
fn common_file_transfer(
tb: &mut SourceHandlerTestbench,
cfdp_user: &mut TestCfdpUser,
with_closure: bool,
file_data: Vec<u8>,
) -> (PduHeader, u32) {
let mut digest = CRC_32.digest();
digest.update(&file_data);
let checksum = digest.finalize();
cfdp_user.expected_full_src_name = tb.srcfile.clone();
cfdp_user.expected_full_dest_name = tb.destfile.clone();
cfdp_user.expected_file_size = file_data.len() as u64;
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
&tb.srcfile,
&tb.destfile,
Some(TransmissionMode::Unacknowledged),
Some(with_closure),
)
.expect("creating put request failed");
let (closure_requested, pdu_header) =
common_no_acked_file_transfer(tb, cfdp_user, put_request, cfdp_user.expected_file_size);
let mut current_offset = 0;
let chunks = file_data.chunks(
calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&pdu_header,
tb.max_packet_len,
None,
),
);
let mut fd_pdus = 0;
for segment in chunks {
check_next_file_pdu(tb, current_offset, segment);
tb.handler.state_machine_no_packet(cfdp_user).unwrap();
fd_pdus += 1;
current_offset += segment.len() as u64;
}
common_eof_pdu_check(
tb,
cfdp_user,
closure_requested,
cfdp_user.expected_file_size,
checksum,
);
(pdu_header, fd_pdus)
}
fn common_tiny_file_transfer(
tb: &mut SourceHandlerTestbench,
cfdp_user: &mut TestCfdpUser,
with_closure: bool,
) -> PduHeader {
let mut file = OpenOptions::new()
.write(true)
.open(&tb.srcfile)
.expect("opening file failed");
let content_str = "Hello World!";
file.write_all(content_str.as_bytes())
.expect("writing file content failed");
drop(file);
let (pdu_header, fd_pdus) =
common_file_transfer(tb, cfdp_user, with_closure, content_str.as_bytes().to_vec());
assert_eq!(fd_pdus, 1);
pdu_header
}
fn common_finish_handling(
tb: &mut SourceHandlerTestbench,
cfdp_user: &mut TestCfdpUser,
pdu_header: PduHeader,
) {
let finished_pdu = FinishedPduCreator::new_default(
pdu_header,
DeliveryCode::Complete,
FileStatus::Retained,
);
let finished_pdu_vec = finished_pdu.to_vec().unwrap();
let packet_info = PacketInfo::new(&finished_pdu_vec).unwrap();
tb.handler
.state_machine(cfdp_user, Some(&packet_info))
.unwrap();
}
#[test] #[test]
fn test_empty_file_transfer_not_acked_no_closure() { fn test_empty_file_transfer_not_acked_no_closure() {
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
@ -1184,9 +1181,8 @@ mod tests {
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize); let mut cfdp_user = tb.create_user(0, filesize);
let (closure_requested, _) = let (closure_requested, _) =
common_no_acked_file_transfer(&mut tb, &mut cfdp_user, put_request, filesize); tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
common_eof_pdu_check( tb.common_eof_pdu_check(
&mut tb,
&mut cfdp_user, &mut cfdp_user,
closure_requested, closure_requested,
filesize, filesize,
@ -1200,7 +1196,7 @@ mod tests {
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let mut cfdp_user = TestCfdpUser::default(); let mut cfdp_user = TestCfdpUser::default();
let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
common_tiny_file_transfer(&mut tb, &mut cfdp_user, false); tb.common_tiny_file_transfer(&mut cfdp_user, false);
} }
#[test] #[test]
@ -1209,8 +1205,8 @@ mod tests {
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
let mut cfdp_user = TestCfdpUser::default(); let mut cfdp_user = TestCfdpUser::default();
let pdu_header = common_tiny_file_transfer(&mut tb, &mut cfdp_user, true); let pdu_header = tb.common_tiny_file_transfer(&mut cfdp_user, true);
common_finish_handling(&mut tb, &mut cfdp_user, pdu_header) tb.finish_handling(&mut cfdp_user, pdu_header)
} }
#[test] #[test]
@ -1228,7 +1224,7 @@ mod tests {
file.write_all(&rand_data) file.write_all(&rand_data)
.expect("writing file content failed"); .expect("writing file content failed");
drop(file); drop(file);
let (_, fd_pdus) = common_file_transfer(&mut tb, &mut cfdp_user, false, rand_data.to_vec()); let (_, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec());
assert_eq!(fd_pdus, 2); assert_eq!(fd_pdus, 2);
} }
@ -1248,9 +1244,9 @@ mod tests {
.expect("writing file content failed"); .expect("writing file content failed");
drop(file); drop(file);
let (pdu_header, fd_pdus) = let (pdu_header, fd_pdus) =
common_file_transfer(&mut tb, &mut cfdp_user, false, rand_data.to_vec()); tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec());
assert_eq!(fd_pdus, 2); assert_eq!(fd_pdus, 2);
common_finish_handling(&mut tb, &mut cfdp_user, pdu_header) tb.finish_handling(&mut cfdp_user, pdu_header)
} }
#[test] #[test]
@ -1269,15 +1265,14 @@ mod tests {
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize); let mut cfdp_user = tb.create_user(0, filesize);
let (closure_requested, pdu_header) = let (closure_requested, pdu_header) =
common_no_acked_file_transfer(&mut tb, &mut cfdp_user, put_request, filesize); tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
common_eof_pdu_check( tb.common_eof_pdu_check(
&mut tb,
&mut cfdp_user, &mut cfdp_user,
closure_requested, closure_requested,
filesize, filesize,
CRC_32.digest().finalize(), CRC_32.digest().finalize(),
); );
common_finish_handling(&mut tb, &mut cfdp_user, pdu_header) tb.finish_handling(&mut cfdp_user, pdu_header)
} }
#[test] #[test]

View File

@ -155,8 +155,7 @@ impl CfdpUser for ExampleCfdpUser {
} }
} }
#[test] fn end_to_end_test(with_closure: bool) {
fn end_to_end_test() {
// Simplified event handling using atomic signals. // Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false)); let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone(); let stop_signal_dest = stop_signal_source.clone();
@ -189,7 +188,7 @@ fn end_to_end_test() {
let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values( let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(), REMOTE_ID.into(),
1024, 1024,
true, with_closure,
false, false,
spacepackets::cfdp::TransmissionMode::Unacknowledged, spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32, ChecksumType::Crc32,
@ -234,7 +233,7 @@ fn end_to_end_test() {
srcfile.to_str().expect("invaid path string"), srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"), destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged), Some(TransmissionMode::Unacknowledged),
Some(true), Some(with_closure),
) )
.expect("put request creation failed"); .expect("put request creation failed");
@ -340,3 +339,13 @@ fn end_to_end_test() {
jh_source.join().unwrap(); jh_source.join().unwrap();
jh_dest.join().unwrap(); jh_dest.join().unwrap();
} }
#[test]
fn end_to_end_test_no_closure() {
end_to_end_test(false);
}
#[test]
fn end_to_end_test_with_closure() {
end_to_end_test(true);
}