dest handler cancel handling
Some checks failed
Rust/cfdp/pipeline/head There was a failure building this commit

This commit is contained in:
2024-09-09 21:19:52 +02:00
parent df97b38cc5
commit cd03e5d18a
2 changed files with 238 additions and 61 deletions

View File

@@ -92,9 +92,11 @@ struct TransferState<Countdown: CountdownProvider> {
transaction_id: Option<TransactionId>,
metadata_params: MetadataGenericParams,
progress: u64,
file_size_eof: u64,
metadata_only: bool,
condition_code: ConditionCode,
delivery_code: DeliveryCode,
fault_location_finished: Option<EntityIdTlv>,
file_status: FileStatus,
completion_disposition: CompletionDisposition,
checksum: u32,
@@ -108,9 +110,11 @@ impl<CheckTimer: CountdownProvider> Default for TransferState<CheckTimer> {
transaction_id: None,
metadata_params: Default::default(),
progress: Default::default(),
file_size_eof: Default::default(),
metadata_only: false,
condition_code: ConditionCode::NoError,
delivery_code: DeliveryCode::Incomplete,
fault_location_finished: None,
file_status: FileStatus::Unreported,
completion_disposition: CompletionDisposition::Completed,
checksum: 0,
@@ -241,9 +245,14 @@ pub enum DestError {
/// allocation is prohibited. Furthermore, it uses the [VirtualFilestore] abstraction to allow
/// usage on systems without a [std] filesystem.
///
/// This handler does not support concurrency out of the box. Instead, if concurrent handling
/// is required, it is recommended to create a new handler and run all active handlers inside a
/// thread pool, or move the newly created handler to a new thread.
/// This handler is able to deal with file copy operations to directories, similarly to how the
/// UNIX tool `cp` works. If the destination path is a directory instead of a regular full path,
/// the source path base file name will be appended to the destination path to form the resulting
/// new full path.
///
// This handler also does not support concurrency out of the box but is flexible enough to be used
/// in different concurrent contexts. For example, you can dynamically create new handlers and
/// run them inside a thread pool, or move the newly created handler to a new thread."""
pub struct DestinationHandler<
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
@@ -358,6 +367,10 @@ impl<
}
}
pub fn cancel_request(&mut self, transaction_id: &TransactionId) {
// TODO: Implement.
}
/// Returns [None] if the state machine is IDLE, and the transmission mode of the current
/// request otherwise.
pub fn transmission_mode(&self) -> Option<TransmissionMode> {
@@ -534,7 +547,14 @@ impl<
let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError {
self.handle_no_error_eof_pdu(&eof_pdu)?
} else {
return Err(DestError::NotImplemented);
// This is an EOF (Cancel), perform Cancel Response Procedures according to chapter
// 4.6.6 of the standard.
self.trigger_notice_of_completion_cancelled(eof_pdu.condition_code());
self.tparams.tstate.progress = eof_pdu.file_size();
self.tparams.tstate.delivery_code = DeliveryCode::Incomplete;
self.tparams.tstate.fault_location_finished =
Some(EntityIdTlv::new(self.tparams.remote_cfg.unwrap().entity_id));
true
};
if regular_transfer_finish {
self.file_transfer_complete_transition();
@@ -542,6 +562,11 @@ impl<
Ok(())
}
fn trigger_notice_of_completion_cancelled(&mut self, cond_code: ConditionCode) {
self.tparams.tstate.completion_disposition = CompletionDisposition::Cancelled;
self.tparams.tstate.condition_code = cond_code;
}
/// Returns whether the transfer can be completed regularly.
fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
// CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
@@ -886,14 +911,13 @@ impl<
{
FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status)
} else {
// TODO: Are there cases where this ID is actually the source entity ID?
let entity_id = EntityIdTlv::new(self.local_cfg.id);
FinishedPduCreator::new_with_error(
FinishedPduCreator::new_generic(
pdu_header,
tstate.condition_code,
tstate.delivery_code,
tstate.file_status,
entity_id,
&[],
tstate.fault_location_finished,
)
};
finished_pdu.write_to_bytes(&mut self.packet_buf)?;
@@ -1690,4 +1714,7 @@ mod tests {
.expect("EOF no error insertion failed");
tb.check_completion_indication_success(&mut test_user);
}
#[test]
fn test_tranfer_cancellation() {}
}

View File

@@ -433,20 +433,52 @@ impl<
{
return Err(PutRequestError::FileDoesNotExist);
}
self.tstate = Some(TransferState::new(
TransactionId::new(
self.local_cfg().id,
UnsignedByteField::new(
SeqCountProvider::MAX_BIT_WIDTH / 8,
self.seq_count_provider.get_and_increment().into(),
),
let transaction_id = TransactionId::new(
self.local_cfg().id,
UnsignedByteField::new(
SeqCountProvider::MAX_BIT_WIDTH / 8,
self.seq_count_provider.get_and_increment().into(),
),
*remote_cfg,
);
// Both the source entity and destination entity ID field must have the same size.
// We use the larger of either the Put Request destination ID or the local entity ID
// as the size for the new entity IDs.
let larger_entity_width = core::cmp::max(
self.local_cfg.id.size(),
self.put_request_cacher.static_fields.destination_id.size(),
);
let create_id = |cached_id: &UnsignedByteField| {
if larger_entity_width != cached_id.size() {
UnsignedByteField::new(larger_entity_width, cached_id.value_const())
} else {
*cached_id
}
};
// Set PDU configuration fields which are important for generating PDUs.
self.pdu_conf
.set_source_and_dest_id(
create_id(&self.local_cfg.id),
create_id(&self.put_request_cacher.static_fields.destination_id),
)
.unwrap();
// Set up other PDU configuration fields.
self.pdu_conf.direction = Direction::TowardsReceiver;
self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into();
self.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
self.pdu_conf.trans_mode = transmission_mode;
self.fparams.segment_len = self.calculate_max_file_seg_len(remote_cfg);
// Set up the transfer context structure.
self.tstate = Some(TransferState {
transaction_id,
remote_cfg: *remote_cfg,
transmission_mode,
closure_requested,
None,
None,
));
cond_code_eof: None,
finished_params: None,
});
self.state_helper.state = super::State::Busy;
Ok(())
}
@@ -655,32 +687,6 @@ impl<
self.pdu_conf.file_flag = LargeFileFlag::Normal
}
}
// Both the source entity and destination entity ID field must have the same size.
// We use the larger of either the Put Request destination ID or the local entity ID
// as the size for the new entity IDs.
let larger_entity_width = core::cmp::max(
self.local_cfg.id.size(),
self.put_request_cacher.static_fields.destination_id.size(),
);
let create_id = |cached_id: &UnsignedByteField| {
if larger_entity_width != cached_id.size() {
UnsignedByteField::new(larger_entity_width, cached_id.value_const())
} else {
*cached_id
}
};
self.pdu_conf
.set_source_and_dest_id(
create_id(&self.local_cfg.id),
create_id(&self.put_request_cacher.static_fields.destination_id),
)
.unwrap();
// Set up other PDU configuration fields.
self.pdu_conf.direction = Direction::TowardsReceiver;
self.pdu_conf.crc_flag = tstate.remote_cfg.crc_on_transmission_by_default.into();
self.pdu_conf.transaction_seq_num = *tstate.transaction_id.seq_num();
self.pdu_conf.trans_mode = tstate.transmission_mode;
self.fparams.segment_len = self.calculate_max_file_seg_len(&tstate.remote_cfg);
cfdp_user.transaction_indication(&tstate.transaction_id);
Ok(())
}
@@ -892,7 +898,7 @@ impl<
PduHeader::new_no_file_data(self.pdu_conf, 0),
tstate.cond_code_eof.unwrap_or(ConditionCode::NoError),
checksum,
self.fparams.file_size,
self.fparams.progress,
None,
);
self.pdu_send_helper(&eof_pdu)?;
@@ -1112,6 +1118,7 @@ mod tests {
use crate::{
filestore::NativeFilestore,
request::PutRequestOwned,
source::TransactionStep,
tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler},
FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown,
StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32,
@@ -1149,6 +1156,13 @@ mod tests {
check_idle_on_drop: bool,
}
#[allow(dead_code)]
struct TransferInfo {
id: TransactionId,
closure_requested: bool,
pdu_header: PduHeader,
}
impl SourceHandlerTestbench {
fn new(
crc_on_transmission_by_default: bool,
@@ -1215,6 +1229,15 @@ mod tests {
.all_queues_empty()
}
#[allow(dead_code)]
fn test_fault_handler(&self) -> &RefCell<TestFaultHandler> {
self.handler.local_cfg.user_fault_hook()
}
fn test_fault_handler_mut(&mut self) -> &mut RefCell<TestFaultHandler> {
self.handler.local_cfg.user_fault_hook_mut()
}
fn pdu_queue_empty(&self) -> bool {
self.handler.pdu_sender.queue_empty()
}
@@ -1270,7 +1293,7 @@ mod tests {
Some(with_closure),
)
.expect("creating put request failed");
let (closure_requested, pdu_header) = self.common_no_acked_file_transfer(
let transaction_info = self.common_no_acked_file_transfer(
cfdp_user,
put_request,
cfdp_user.expected_file_size,
@@ -1278,7 +1301,7 @@ mod tests {
let mut current_offset = 0;
let chunks = file_data.chunks(
calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&pdu_header,
&transaction_info.pdu_header,
self.max_packet_len,
None,
),
@@ -1292,21 +1315,19 @@ mod tests {
}
self.common_eof_pdu_check(
cfdp_user,
closure_requested,
transaction_info.closure_requested,
cfdp_user.expected_file_size,
checksum,
);
(pdu_header, fd_pdus)
(transaction_info.pdu_header, fd_pdus)
}
// Returns a tuple. First parameter: Closure requested. Second parameter: PDU header of
// metadata PDU.
fn common_no_acked_file_transfer(
&mut self,
cfdp_user: &mut TestCfdpUser,
put_request: PutRequestOwned,
filesize: u64,
) -> (bool, PduHeader) {
) -> TransferInfo {
assert_eq!(cfdp_user.transaction_indication_call_count, 0);
assert_eq!(cfdp_user.eof_sent_call_count, 0);
@@ -1314,6 +1335,7 @@ mod tests {
.expect("put_request call failed");
assert_eq!(self.handler.state(), State::Busy);
assert_eq!(self.handler.step(), TransactionStep::Idle);
let id = self.handler.transaction_id().unwrap();
let sent_packets = self
.handler
.state_machine_no_packet(cfdp_user)
@@ -1363,7 +1385,11 @@ mod tests {
metadata_pdu.metadata_params().closure_requested
};
assert_eq!(metadata_pdu.options(), &[]);
(closure_requested, *pdu_header)
TransferInfo {
pdu_header: *pdu_header,
closure_requested,
id,
}
}
fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) {
@@ -1485,11 +1511,11 @@ mod tests {
)
.expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize);
let (closure_requested, _) =
let transaction_info =
tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
tb.common_eof_pdu_check(
&mut cfdp_user,
closure_requested,
transaction_info.closure_requested,
filesize,
CRC_32.digest().finalize(),
)
@@ -1569,15 +1595,15 @@ mod tests {
)
.expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize);
let (closure_requested, pdu_header) =
let transaction_info =
tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
tb.common_eof_pdu_check(
&mut cfdp_user,
closure_requested,
transaction_info.closure_requested,
filesize,
CRC_32.digest().finalize(),
);
tb.finish_handling(&mut cfdp_user, pdu_header)
tb.finish_handling(&mut cfdp_user, transaction_info.pdu_header)
}
#[test]
@@ -1647,11 +1673,12 @@ mod tests {
)
.expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize);
let (closure_requested, _pdu_header) =
let transaction_info =
tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
let expected_id = tb.handler.transaction_id().unwrap();
tb.common_eof_pdu_check(
&mut cfdp_user,
closure_requested,
transaction_info.closure_requested,
filesize,
CRC_32.digest().finalize(),
);
@@ -1669,5 +1696,128 @@ mod tests {
assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached);
assert_eq!(eof_pdu.file_size(), 0);
assert_eq!(eof_pdu.file_checksum(), 0);
// Cancellation fault should have been triggered.
let fault_handler = tb.test_fault_handler_mut();
let fh_ref_mut = fault_handler.get_mut();
assert!(!fh_ref_mut.cancellation_queue_empty());
assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1);
let (id, cond_code, progress) = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap();
assert_eq!(id, expected_id);
assert_eq!(cond_code, ConditionCode::CheckLimitReached);
assert_eq!(progress, 0);
fh_ref_mut.all_queues_empty();
}
#[test]
fn test_cancelled_transfer_empty_file() {
let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default();
let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
let filesize = 0;
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
&tb.srcfile,
&tb.destfile,
Some(TransmissionMode::Unacknowledged),
Some(false),
)
.expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize);
assert_eq!(cfdp_user.transaction_indication_call_count, 0);
assert_eq!(cfdp_user.eof_sent_call_count, 0);
tb.put_request(&put_request)
.expect("put_request call failed");
assert_eq!(tb.handler.state(), State::Busy);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
assert!(tb.get_next_sent_pdu().is_none());
let id = tb.handler.transaction_id().unwrap();
tb.handler
.cancel_request(&mut cfdp_user, &id)
.expect("transaction cancellation failed");
assert_eq!(tb.handler.state(), State::Idle);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
// EOF (Cancel) PDU will be generated
let eof_pdu = tb
.get_next_sent_pdu()
.expect("no EOF PDU generated like expected");
assert_eq!(
eof_pdu.file_directive_type.unwrap(),
FileDirectiveType::EofPdu
);
let eof_pdu = EofPdu::from_bytes(&eof_pdu.raw_pdu).unwrap();
assert_eq!(
eof_pdu.condition_code(),
ConditionCode::CancelRequestReceived
);
assert_eq!(eof_pdu.file_checksum(), 0);
assert_eq!(eof_pdu.file_size(), 0);
tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
}
#[test]
fn test_cancelled_transfer_mid_transfer() {
let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default();
let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128);
let mut file = OpenOptions::new()
.write(true)
.open(&tb.srcfile)
.expect("opening file failed");
let mut rand_data = [0u8; 140];
rand::thread_rng().fill(&mut rand_data[..]);
file.write_all(&rand_data)
.expect("writing file content failed");
drop(file);
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
&tb.srcfile,
&tb.destfile,
Some(TransmissionMode::Unacknowledged),
Some(false),
)
.expect("creating put request failed");
let file_size = rand_data.len() as u64;
let mut cfdp_user = tb.create_user(0, file_size);
let transaction_info =
tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, file_size);
let mut chunks = rand_data.chunks(
calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&transaction_info.pdu_header,
tb.max_packet_len,
None,
),
);
let mut digest = CRC_32.digest();
let first_chunk = chunks.next().expect("no chunk found");
digest.update(first_chunk);
let checksum = digest.finalize();
let next_packet = tb.get_next_sent_pdu().unwrap();
assert_eq!(next_packet.pdu_type, PduType::FileData);
let fd_pdu = FileDataPdu::from_bytes(&next_packet.raw_pdu).unwrap();
assert_eq!(fd_pdu.file_data(), &rand_data[0..first_chunk.len()]);
let expected_id = tb.handler.transaction_id().unwrap();
assert!(tb
.handler
.cancel_request(&mut cfdp_user, &expected_id)
.expect("cancellation failed"));
assert_eq!(tb.handler.state(), State::Idle);
assert_eq!(tb.handler.step(), TransactionStep::Idle);
let next_packet = tb.get_next_sent_pdu().unwrap();
assert_eq!(next_packet.pdu_type, PduType::FileDirective);
assert_eq!(
next_packet.file_directive_type.unwrap(),
FileDirectiveType::EofPdu
);
// As specified in 4.11.2.2 of the standard, the file size will be the progress of the
// file copy operation so far, and the checksum is calculated for that progress.
let eof_pdu = EofPdu::from_bytes(&next_packet.raw_pdu).expect("EOF PDU creation failed");
assert_eq!(eof_pdu.file_size(), first_chunk.len() as u64);
assert_eq!(eof_pdu.file_checksum(), checksum);
assert_eq!(
eof_pdu.condition_code(),
ConditionCode::CancelRequestReceived
);
}
}