more tests and refactoring

This commit is contained in:
Robin Mueller
2025-09-24 17:49:16 +02:00
parent 824ac1ca54
commit de3e5c52cb
3 changed files with 454 additions and 226 deletions

View File

@@ -189,7 +189,8 @@ struct TransactionParams<CountdownInstance: Countdown> {
acked_params: Option<AcknowledgedModeParams>,
deferred_procedure_timer: Option<CountdownInstance>,
finished_params: FinishedParams,
positive_ack_params: Option<PositiveAckParams<CountdownInstance>>,
positive_ack_params: Option<PositiveAckParams>,
ack_timer: Option<CountdownInstance>,
metadata_only: bool,
completion_disposition: Cell<CompletionDisposition>,
checksum: u32,
@@ -243,6 +244,7 @@ impl<CheckTimer: Countdown> Default for TransactionParams<CheckTimer> {
acked_params: None,
metadata_only: false,
positive_ack_params: None,
ack_timer: None,
finished_params: FinishedParams::default(),
completion_disposition: Cell::new(CompletionDisposition::Completed),
checksum: 0,
@@ -1600,6 +1602,11 @@ impl<
}
if self.step() == TransactionStep::WaitingForFinishedAck {
sent_packets += self.handle_positive_ack_procedures()?;
// Little hack because of the state machine handling order to ensure the transfer
// is completed in one go.
if self.step() == TransactionStep::TransferCompletion {
sent_packets += self.transfer_completion(cfdp_user)?;
}
}
Ok(sent_packets)
}
@@ -1692,9 +1699,19 @@ impl<
}
fn start_positive_ack_procedure(&mut self) {
self.transaction_params.positive_ack_params = Some(PositiveAckParams {
ack_timer: self
.check_timer_creator
match &mut self.transaction_params.positive_ack_params {
Some(current) => {
current.ack_counter = 0;
}
None => {
self.transaction_params.positive_ack_params = Some(PositiveAckParams {
ack_counter: 0,
positive_ack_of_cancellation: false,
})
}
}
self.transaction_params.ack_timer = Some(
self.check_timer_creator
.create_countdown(TimerContext::PositiveAck {
expiry_time: self
.transaction_params
@@ -1703,19 +1720,24 @@ impl<
.unwrap()
.positive_ack_timer_interval,
}),
ack_counter: 0,
})
);
}
fn handle_positive_ack_procedures(&mut self) -> Result<u32, DestError> {
// Do we have positive-ack params?
let params = match self.transaction_params.positive_ack_params.as_mut() {
Some(p) => p,
None => return Ok(0),
};
if self.transaction_params.positive_ack_params.is_none() {
return Ok(0);
}
let params = self.transaction_params.positive_ack_params.unwrap();
// Has the timer expired?
if !params.ack_timer.has_expired() {
if !self
.transaction_params
.ack_timer
.as_mut()
.unwrap()
.has_expired()
{
return Ok(0);
}
@@ -1731,12 +1753,23 @@ impl<
== FaultHandlerCode::AbandonTransaction
{
self.abandon_transaction();
return Ok(0);
}
self.transaction_params
.positive_ack_params
.as_mut()
.unwrap()
.positive_ack_of_cancellation = true;
return Ok(0);
}
params.ack_timer.reset();
params.ack_counter += 1;
let params_mut = self
.transaction_params
.positive_ack_params
.as_mut()
.unwrap();
params_mut.ack_counter += 1;
self.transaction_params.ack_timer.as_mut().unwrap().reset();
self.send_finished_pdu()
}
@@ -1782,10 +1815,17 @@ impl<
// Cache those, because they might be reset when abandoning the transaction.
let transaction_id = self.transaction_id().unwrap();
let progress = self.transaction_params.progress;
let fh_code = self
let mut fh_code = self
.local_cfg
.fault_handler
.get_fault_handler(condition_code);
// CFDP 4.11.2.3.2: Any fault declared in the course of transferring the Finished (cancel)
// PDU must result in abadonment of the transaction.
if let Some(positive_ack) = &self.transaction_params.positive_ack_params {
if positive_ack.positive_ack_of_cancellation {
fh_code = FaultHandlerCode::AbandonTransaction;
}
}
match fh_code {
FaultHandlerCode::NoticeOfCancellation => {
self.notice_of_cancellation(condition_code, EntityIdTlv::new(self.local_cfg().id));
@@ -1794,11 +1834,10 @@ impl<
FaultHandlerCode::IgnoreError => (),
FaultHandlerCode::AbandonTransaction => (),
}
self.local_cfg.fault_handler.report_fault(FaultInfo::new(
transaction_id,
condition_code,
progress,
))
self.local_cfg.fault_handler.report_fault(
fh_code,
FaultInfo::new(transaction_id, condition_code, progress),
)
}
fn notice_of_cancellation(&self, condition_code: ConditionCode, fault_location: EntityIdTlv) {
@@ -2039,6 +2078,10 @@ mod tests {
self.expiry_control.set_nak_activity_expired();
}
fn set_positive_ack_expired(&mut self) {
self.expiry_control.set_positive_ack_expired();
}
fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser {
TestCfdpUser::new(
0,
@@ -2151,6 +2194,7 @@ mod tests {
user: &mut TestCfdpUser,
cond_code: ConditionCode,
file_status: FileStatus,
delivery_code: DeliveryCode,
) {
assert_eq!(user.finished_indic_queue.len(), 1);
let finished_indication = user.finished_indic_queue.pop_front().unwrap();
@@ -2159,7 +2203,7 @@ mod tests {
self.handler.transaction_id().unwrap()
);
assert_eq!(finished_indication.file_status, file_status);
assert_eq!(finished_indication.delivery_code, DeliveryCode::Incomplete);
assert_eq!(finished_indication.delivery_code, delivery_code);
assert_eq!(finished_indication.condition_code, cond_code);
}
@@ -2195,6 +2239,7 @@ mod tests {
&mut self,
cond_code: ConditionCode,
file_status: FileStatus,
delivery_code: DeliveryCode,
) {
let pdu = self.get_next_pdu().unwrap();
assert_eq!(pdu.pdu_type, PduType::FileDirective);
@@ -2203,10 +2248,9 @@ mod tests {
FileDirectiveType::FinishedPdu
);
let finished_pdu = FinishedPduReader::from_bytes(&pdu.raw_pdu).unwrap();
assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete);
assert_eq!(finished_pdu.delivery_code(), delivery_code);
assert_eq!(finished_pdu.file_status(), file_status);
assert_eq!(finished_pdu.condition_code(), cond_code);
//assert!(finished_pdu.fault_location().is_none());
}
fn acknowledge_finished_pdu(
@@ -2236,7 +2280,7 @@ mod tests {
if !self.all_fault_queues_empty() {
let fh_queues = self.handler.local_cfg.user_fault_hook().borrow();
println!(
"fault queues not empyt. cancellation {}, suspension {}, ignored {}, abandon {}",
"fault queues not empty. cancellation {}, suspension {}, ignored {}, abandon {}",
fh_queues.notice_of_cancellation_queue.len(),
fh_queues.notice_of_suspension_queue.len(),
fh_queues.ignored_queue.len(),
@@ -3567,8 +3611,13 @@ mod tests {
&mut user,
ConditionCode::NakLimitReached,
FileStatus::Retained,
DeliveryCode::Incomplete,
);
tb.check_finished_pdu_failure(
ConditionCode::NakLimitReached,
FileStatus::Retained,
DeliveryCode::Incomplete,
);
tb.check_finished_pdu_failure(ConditionCode::NakLimitReached, FileStatus::Retained);
{
let mut fault_hook = tb.fault_handler().user_hook.borrow_mut();
@@ -3587,6 +3636,131 @@ mod tests {
#[test]
fn test_positive_ack_procedure() {
// TODO.
let fault_handler = TestFaultHandler::default();
let mut tb = DestHandlerTestbench::new_with_fixed_paths(
fault_handler,
TransmissionMode::Acknowledged,
false,
);
let mut user = tb.test_user_from_cached_paths(0);
let transfer_info = tb
.generic_transfer_init(&mut user, 0)
.expect("transfer init failed");
tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
tb.generic_eof_no_error(&mut user, Vec::new())
.expect("EOF no error insertion failed");
tb.check_completion_indication_success(&mut user);
assert_eq!(tb.pdu_queue_len(), 2);
tb.check_eof_ack_pdu(ConditionCode::NoError);
tb.check_finished_pdu_success();
tb.set_positive_ack_expired();
// This should cause the PDU to be sent again.
assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1);
tb.check_finished_pdu_success();
tb.acknowledge_finished_pdu(&mut user, &transfer_info);
}
fn generic_positive_ack_test(
tb: &mut DestHandlerTestbench,
user: &mut TestCfdpUser,
) -> TransferInfo {
let transfer_info = tb
.generic_transfer_init(user, 0)
.expect("transfer init failed");
tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
tb.generic_eof_no_error(user, Vec::new())
.expect("EOF no error insertion failed");
tb.check_completion_indication_success(user);
assert_eq!(tb.pdu_queue_len(), 2);
tb.check_eof_ack_pdu(ConditionCode::NoError);
tb.check_finished_pdu_success();
// This should cause the PDU to be sent again.
tb.set_positive_ack_expired();
assert_eq!(tb.handler.state_machine_no_packet(user).unwrap(), 1);
tb.check_finished_pdu_success();
// Positive ACK limit reached.
tb.set_positive_ack_expired();
assert_eq!(tb.handler.state_machine_no_packet(user).unwrap(), 1);
tb.check_finished_pdu_failure(
ConditionCode::PositiveAckLimitReached,
FileStatus::Retained,
DeliveryCode::Complete,
);
tb.check_completion_indication_failure(
user,
ConditionCode::PositiveAckLimitReached,
FileStatus::Retained,
DeliveryCode::Complete,
);
{
let mut fault_handler = tb.fault_handler().user_hook.borrow_mut();
assert!(!fault_handler.cancellation_queue_empty());
let cancellation = fault_handler
.notice_of_cancellation_queue
.pop_front()
.unwrap();
assert_eq!(cancellation.transaction_id(), transfer_info.id);
assert_eq!(
cancellation.condition_code(),
ConditionCode::PositiveAckLimitReached
);
assert_eq!(cancellation.progress(), 0);
}
transfer_info
}
#[test]
fn test_positive_ack_limit_reached() {
let fault_handler = TestFaultHandler::default();
let mut tb = DestHandlerTestbench::new_with_fixed_paths(
fault_handler,
TransmissionMode::Acknowledged,
false,
);
let mut user = tb.test_user_from_cached_paths(0);
let transfer_info = generic_positive_ack_test(&mut tb, &mut user);
// Chances are that this one won't work either leading to transfer abandonment, but we
// acknowledge it here
tb.acknowledge_finished_pdu(&mut user, &transfer_info);
}
#[test]
fn test_positive_ack_limit_reached_with_subsequent_abandonment() {
let fault_handler = TestFaultHandler::default();
let mut tb = DestHandlerTestbench::new_with_fixed_paths(
fault_handler,
TransmissionMode::Acknowledged,
false,
);
let mut user = tb.test_user_from_cached_paths(0);
let transfer_info = generic_positive_ack_test(&mut tb, &mut user);
// This should cause the PDU to be sent again.
tb.set_positive_ack_expired();
assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1);
tb.check_finished_pdu_failure(
ConditionCode::PositiveAckLimitReached,
FileStatus::Retained,
DeliveryCode::Complete,
);
// Postive ACK limit reached which leads to abandonment.
tb.set_positive_ack_expired();
assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 0);
{
let mut fault_handler = tb.fault_handler().user_hook.borrow_mut();
assert!(!fault_handler.abandoned_queue_empty());
let cancellation = fault_handler.abandoned_queue.pop_front().unwrap();
assert_eq!(cancellation.transaction_id(), transfer_info.id);
assert_eq!(
cancellation.condition_code(),
ConditionCode::PositiveAckLimitReached
);
assert_eq!(cancellation.progress(), 0);
}
}
}

View File

@@ -603,14 +603,9 @@ impl<UserHandler: UserFaultHook> FaultHandler<UserHandler> {
self.handler_array[array_idx.unwrap()]
}
pub fn report_fault(&self, fault_info: FaultInfo) -> FaultHandlerCode {
let array_idx = Self::condition_code_to_array_index(fault_info.condition_code());
if array_idx.is_none() {
return FaultHandlerCode::IgnoreError;
}
let fh_code = self.handler_array[array_idx.unwrap()];
pub fn report_fault(&self, code: FaultHandlerCode, fault_info: FaultInfo) -> FaultHandlerCode {
let mut handler_mut = self.user_hook.borrow_mut();
match fh_code {
match code {
FaultHandlerCode::NoticeOfCancellation => {
handler_mut.notice_of_cancellation_cb(fault_info);
}
@@ -624,7 +619,7 @@ impl<UserHandler: UserFaultHook> FaultHandler<UserHandler> {
handler_mut.abandoned_cb(fault_info);
}
}
fh_code
code
}
}
@@ -1072,10 +1067,10 @@ pub mod alloc_mod {
}
}
#[derive(Debug)]
struct PositiveAckParams<CountdownInstance: Countdown> {
ack_timer: CountdownInstance,
#[derive(Debug, Clone, Copy)]
struct PositiveAckParams {
ack_counter: u32,
positive_ack_of_cancellation: bool,
}
#[cfg(test)]

View File

@@ -45,8 +45,8 @@ use core::{
use spacepackets::{
ByteConversionError,
cfdp::{
ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl,
TransactionStatus, TransmissionMode,
ConditionCode, Direction, FaultHandlerCode, LargeFileFlag, PduType, SegmentMetadataFlag,
SegmentationControl, TransactionStatus, TransmissionMode,
lv::Lv,
pdu::{
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
@@ -96,7 +96,7 @@ pub enum TransactionStep {
NoticeOfCompletion = 10,
}
#[derive(Default, Copy, Clone)]
#[derive(Default, Debug, Copy, Clone)]
pub struct FileParams {
pub progress: u64,
pub segment_len: u64,
@@ -144,16 +144,6 @@ pub struct FinishedParams {
file_status: FileStatus,
}
#[derive(Debug, derive_new::new)]
pub struct TransferState {
transaction_id: Cell<TransactionId>,
remote_cfg: RefCell<RemoteEntityConfig>,
transmission_mode: Cell<super::TransmissionMode>,
closure_requested: Cell<bool>,
cond_code_eof: Cell<Option<ConditionCode>>,
finished_params: Cell<Option<FinishedParams>>,
}
#[derive(Debug, thiserror::Error)]
pub enum SourceError {
#[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
@@ -214,6 +204,49 @@ pub enum FsmContext {
ResetWhenPossible,
}
#[derive(Debug)]
pub struct TransactionParams<CountdownInstance: Countdown> {
transaction_id: Option<TransactionId>,
remote_cfg: Option<RemoteEntityConfig>,
transmission_mode: Option<super::TransmissionMode>,
closure_requested: bool,
cond_code_eof: Cell<Option<ConditionCode>>,
finished_params: Option<FinishedParams>,
// File specific transfer fields
file_params: FileParams,
// PDU configuration is cached so it can be re-used for all PDUs generated for file transfers.
pdu_conf: CommonPduConfig,
check_timer: Option<CountdownInstance>,
positive_ack_params: Cell<Option<PositiveAckParams>>,
ack_timer: RefCell<Option<CountdownInstance>>,
}
impl<CountdownInstance: Countdown> Default for TransactionParams<CountdownInstance> {
fn default() -> Self {
Self {
transaction_id: Default::default(),
remote_cfg: Default::default(),
transmission_mode: Default::default(),
closure_requested: Default::default(),
cond_code_eof: Default::default(),
finished_params: Default::default(),
file_params: Default::default(),
pdu_conf: Default::default(),
check_timer: Default::default(),
positive_ack_params: Default::default(),
ack_timer: Default::default(),
}
}
}
impl<CountdownInstance: Countdown> TransactionParams<CountdownInstance> {
#[inline]
fn reset(&mut self) {
self.transaction_id = None;
self.transmission_mode = None;
}
}
/// This is the primary CFDP source handler. It models the CFDP source entity, which is
/// primarily responsible for handling put requests to send files to another CFDP destination
/// entity.
@@ -257,14 +290,7 @@ pub struct SourceHandler<
remote_cfg_table: RemoteConfigStoreInstance,
vfs: Vfs,
state_helper: StateHelper,
// Transfer related state information
transfer_state: Option<TransferState>,
// File specific transfer fields
file_params: FileParams,
// PDU configuration is cached so it can be re-used for all PDUs generated for file transfers.
pdu_conf: CommonPduConfig,
check_timer: RefCell<Option<CountdownInstance>>,
positive_ack_params: RefCell<Option<PositiveAckParams<CountdownInstance>>>,
transaction_params: TransactionParams<CountdownInstance>,
timer_creator: TimerCreatorInstance,
seq_count_provider: SequenceCounterInstance,
anomalies: AnomalyTracker,
@@ -330,12 +356,8 @@ impl<
vfs,
put_request_cacher,
state_helper: Default::default(),
transfer_state: Default::default(),
file_params: Default::default(),
pdu_conf: Default::default(),
transaction_params: Default::default(),
anomalies: Default::default(),
check_timer: RefCell::new(None),
positive_ack_params: RefCell::new(None),
timer_creator,
seq_count_provider,
}
@@ -384,15 +406,13 @@ impl<
#[inline]
pub fn transaction_id(&self) -> Option<TransactionId> {
self.transfer_state.as_ref().map(|v| v.transaction_id.get())
self.transaction_params.transaction_id
}
/// Returns the [TransmissionMode] for the active file operation.
#[inline]
pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
self.transfer_state
.as_ref()
.map(|v| v.transmission_mode.get())
self.transaction_params.transmission_mode
}
/// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when
@@ -487,28 +507,29 @@ impl<
};
// Set PDU configuration fields which are important for generating PDUs.
self.pdu_conf
self.transaction_params
.pdu_conf
.set_source_and_dest_id(
create_id(&self.local_cfg.id),
create_id(&self.put_request_cacher.static_fields.destination_id),
)
.unwrap();
// Set up other PDU configuration fields.
self.pdu_conf.direction = Direction::TowardsReceiver;
self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into();
self.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
self.pdu_conf.trans_mode = transmission_mode;
self.file_params.segment_len = self.calculate_max_file_seg_len(remote_cfg);
self.transaction_params.pdu_conf.direction = Direction::TowardsReceiver;
self.transaction_params.pdu_conf.crc_flag =
remote_cfg.crc_on_transmission_by_default.into();
self.transaction_params.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
self.transaction_params.pdu_conf.trans_mode = transmission_mode;
self.transaction_params.file_params.segment_len =
self.calculate_max_file_seg_len(remote_cfg);
self.transaction_params.transaction_id = Some(transaction_id);
self.transaction_params.remote_cfg = Some(*remote_cfg);
self.transaction_params.transmission_mode = Some(transmission_mode);
self.transaction_params.closure_requested = closure_requested;
self.transaction_params.cond_code_eof.set(None);
self.transaction_params.finished_params = None;
// Set up the transfer context structure.
self.transfer_state = Some(TransferState {
transaction_id: Cell::new(transaction_id),
remote_cfg: RefCell::new(*remote_cfg),
transmission_mode: Cell::new(transmission_mode),
closure_requested: Cell::new(closure_requested),
cond_code_eof: Cell::new(None),
finished_params: Cell::new(None),
});
self.state_helper.state.set(super::State::Busy);
Ok(())
}
@@ -602,9 +623,7 @@ impl<
/// behaviour.
pub fn reset(&mut self) {
self.state_helper = Default::default();
self.transfer_state = None;
self.file_params = Default::default();
*self.check_timer.borrow_mut() = None;
self.transaction_params.reset();
}
#[inline]
@@ -668,36 +687,54 @@ impl<
user: &mut impl CfdpUser,
) -> Result<u32, SourceError> {
let mut sent_packets = 0;
let mut positive_ack_limit_reached = false;
if let Some(positive_ack_params) = self.positive_ack_params.borrow_mut().as_mut() {
if positive_ack_params.ack_timer.has_expired() {
let current_params = self.transaction_params.positive_ack_params.get();
if let Some(mut positive_ack_params) = current_params {
if self
.transaction_params
.ack_timer
.borrow_mut()
.as_ref()
.unwrap()
.has_expired()
{
let ack_timer_exp_limit = self
.transfer_state
.transaction_params
.remote_cfg
.as_ref()
.unwrap()
.remote_cfg
.borrow()
.positive_ack_timer_expiration_limit;
if positive_ack_params.ack_counter + 1 >= ack_timer_exp_limit {
positive_ack_limit_reached = true;
let (fault_packets_sent, ctx) =
self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?;
sent_packets += fault_packets_sent;
if ctx == FsmContext::ResetWhenPossible {
self.reset();
} else {
positive_ack_params.ack_counter = 0;
positive_ack_params.positive_ack_of_cancellation = true;
}
} else {
positive_ack_params.ack_timer.reset();
self.transaction_params
.ack_timer
.borrow_mut()
.as_mut()
.unwrap()
.reset();
positive_ack_params.ack_counter += 1;
self.prepare_and_send_eof_pdu(
user,
self.file_params.checksum_completed_file.unwrap(),
self.transaction_params
.file_params
.checksum_completed_file
.unwrap(),
)?;
sent_packets += 1;
}
}
}
if positive_ack_limit_reached {
let (fault_packets_sent, ctx) =
self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?;
if ctx == FsmContext::ResetWhenPossible {
self.reset();
}
sent_packets += fault_packets_sent;
self.transaction_params
.positive_ack_params
.set(Some(positive_ack_params));
}
Ok(sent_packets)
}
@@ -712,14 +749,18 @@ impl<
sent_packets += 1;
continue;
} else {
if (segment_req.1 < segment_req.0) || (segment_req.0 > self.file_params.progress) {
if (segment_req.1 < segment_req.0)
|| (segment_req.0 > self.transaction_params.file_params.progress)
{
return Err(SourceError::InvalidNakPdu);
}
let mut missing_chunk_len = segment_req.1 - segment_req.0;
let current_offset = segment_req.0;
while missing_chunk_len > 0 {
let chunk_size =
core::cmp::min(missing_chunk_len, self.file_params.segment_len);
let chunk_size = core::cmp::min(
missing_chunk_len,
self.transaction_params.file_params.segment_len,
);
self.prepare_and_send_file_data_pdu(current_offset, chunk_size)?;
sent_packets += 1;
missing_chunk_len -= missing_chunk_len;
@@ -736,7 +777,12 @@ impl<
// If we reach this state, countdown definitely is set.
#[allow(clippy::collapsible_if)]
if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged
&& self.check_timer.borrow().as_ref().unwrap().has_expired()
&& self
.transaction_params
.check_timer
.as_ref()
.unwrap()
.has_expired()
{
let (sent_packets, ctx) = self.declare_fault(user, ConditionCode::CheckLimitReached)?;
if ctx == FsmContext::ResetWhenPossible {
@@ -750,21 +796,31 @@ impl<
fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> {
let checksum = self.vfs.calculate_checksum(
self.put_request_cacher.source_file().unwrap(),
self.tstate_ref().remote_cfg.borrow().default_crc_type,
self.file_params.file_size,
self.transaction_params
.remote_cfg
.as_ref()
.unwrap()
.default_crc_type,
self.transaction_params.file_params.file_size,
&mut self.pdu_and_cksum_buffer.borrow_mut(),
)?;
self.file_params.checksum_completed_file = Some(checksum);
self.transaction_params.file_params.checksum_completed_file = Some(checksum);
self.prepare_and_send_eof_pdu(user, checksum)?;
if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
if self.tstate_ref().closure_requested.get() {
*self.check_timer.borrow_mut() = Some(self.timer_creator.create_countdown(
crate::TimerContext::CheckLimit {
local_id: self.local_cfg.id,
remote_id: self.tstate_ref().remote_cfg.borrow().entity_id,
entity_type: EntityType::Sending,
},
));
if self.transaction_params.closure_requested {
self.transaction_params.check_timer = Some(
self.timer_creator
.create_countdown(crate::TimerContext::CheckLimit {
local_id: self.local_cfg.id,
remote_id: self
.transaction_params
.remote_cfg
.as_ref()
.unwrap()
.entity_id,
entity_type: EntityType::Sending,
}),
);
self.set_step(TransactionStep::WaitingForFinished);
} else {
self.set_step(TransactionStep::NoticeOfCompletion);
@@ -777,18 +833,33 @@ impl<
fn start_positive_ack_procedure(&self) {
self.set_step_internal(TransactionStep::WaitingForEofAck);
*self.positive_ack_params.borrow_mut() = Some(PositiveAckParams {
ack_timer: self
.timer_creator
match self.transaction_params.positive_ack_params.get() {
Some(mut current) => {
current.ack_counter = 0;
self.transaction_params
.positive_ack_params
.set(Some(current));
}
None => self
.transaction_params
.positive_ack_params
.set(Some(PositiveAckParams {
ack_counter: 0,
positive_ack_of_cancellation: false,
})),
}
*self.transaction_params.ack_timer.borrow_mut() = Some(
self.timer_creator
.create_countdown(crate::TimerContext::PositiveAck {
expiry_time: self
.tstate_ref()
.transaction_params
.remote_cfg
.borrow()
.as_ref()
.unwrap()
.positive_ack_timer_interval,
}),
ack_counter: 0,
})
);
}
fn handle_transaction_start(
@@ -796,7 +867,7 @@ impl<
cfdp_user: &mut impl CfdpUser,
) -> Result<(), SourceError> {
if !self.put_request_cacher.has_source_file() {
self.file_params.metadata_only = true;
self.transaction_params.file_params.metadata_only = true;
} else {
let source_file = self
.put_request_cacher
@@ -811,14 +882,14 @@ impl<
self.put_request_cacher
.dest_file()
.map_err(SourceError::DestFileNotValidUtf8)?;
self.file_params.file_size = self.vfs.file_size(source_file)?;
if self.file_params.file_size > u32::MAX as u64 {
self.pdu_conf.file_flag = LargeFileFlag::Large
self.transaction_params.file_params.file_size = self.vfs.file_size(source_file)?;
if self.transaction_params.file_params.file_size > u32::MAX as u64 {
self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Large
} else {
if self.file_params.file_size == 0 {
self.file_params.empty_file = true;
if self.transaction_params.file_params.file_size == 0 {
self.transaction_params.file_params.empty_file = true;
}
self.pdu_conf.file_flag = LargeFileFlag::Normal
self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Normal
}
}
cfdp_user.transaction_indication(&self.transaction_id().unwrap());
@@ -831,7 +902,7 @@ impl<
transaction_status: TransactionStatus,
) -> Result<(), SourceError> {
let ack_pdu = AckPdu::new(
PduHeader::new_for_file_directive(self.pdu_conf, 0),
PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
FileDirectiveType::FinishedPdu,
condition_code,
transaction_status,
@@ -842,15 +913,18 @@ impl<
}
fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> {
let tstate = self.tstate_ref();
let metadata_params = MetadataGenericParams::new(
tstate.closure_requested.get(),
tstate.remote_cfg.borrow().default_crc_type,
self.file_params.file_size,
self.transaction_params.closure_requested,
self.transaction_params
.remote_cfg
.as_ref()
.unwrap()
.default_crc_type,
self.transaction_params.file_params.file_size,
);
if self.file_params.metadata_only {
if self.transaction_params.file_params.metadata_only {
let metadata_pdu = MetadataPduCreator::new(
PduHeader::new_for_file_directive(self.pdu_conf, 0),
PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
metadata_params,
Lv::new_empty(),
Lv::new_empty(),
@@ -859,7 +933,7 @@ impl<
return self.pdu_send_helper(&metadata_pdu);
}
let metadata_pdu = MetadataPduCreator::new(
PduHeader::new_for_file_directive(self.pdu_conf, 0),
PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
metadata_params,
Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(),
Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(),
@@ -869,24 +943,25 @@ impl<
}
fn file_data_fsm(&mut self) -> Result<ControlFlow<u32>, SourceError> {
//if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged {
// TODO: Handle re-transmission
//}
if !self.file_params.metadata_only
&& self.file_params.progress < self.file_params.file_size
if !self.transaction_params.file_params.metadata_only
&& self.transaction_params.file_params.progress
< self.transaction_params.file_params.file_size
&& self.send_progressing_file_data_pdu()?
{
return Ok(ControlFlow::Break(1));
}
if self.file_params.empty_file || self.file_params.progress >= self.file_params.file_size {
if self.transaction_params.file_params.empty_file
|| self.transaction_params.file_params.progress
>= self.transaction_params.file_params.file_size
{
// EOF is still expected.
self.set_step(TransactionStep::SendingEof);
self.tstate_ref()
self.transaction_params
.cond_code_eof
.set(Some(ConditionCode::NoError));
} else if self.file_params.metadata_only {
} else if self.transaction_params.file_params.metadata_only {
// Special case: Metadata Only, no EOF required.
if self.tstate_ref().closure_requested.get() {
if self.transaction_params.closure_requested {
self.set_step(TransactionStep::WaitingForFinished);
} else {
self.set_step(TransactionStep::NoticeOfCompletion);
@@ -898,7 +973,7 @@ impl<
fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
if self.local_cfg.indication_cfg.transaction_finished {
// The first case happens for unacknowledged file copy operation with no closure.
let finished_params = match self.tstate_ref().finished_params.get() {
let finished_params = match self.transaction_params.finished_params {
Some(finished_params) => TransactionFinishedParams {
id: self.transaction_id().unwrap(),
condition_code: finished_params.condition_code,
@@ -918,7 +993,7 @@ impl<
fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&PduHeader::new_for_file_directive(self.pdu_conf, 0),
&PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
remote_cfg.max_packet_len,
None,
);
@@ -933,14 +1008,19 @@ impl<
fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
// Should never be called, but use defensive programming here.
if self.file_params.progress >= self.file_params.file_size {
if self.transaction_params.file_params.progress
>= self.transaction_params.file_params.file_size
{
return Ok(false);
}
let read_len = self
.file_params
.segment_len
.min(self.file_params.file_size - self.file_params.progress);
self.prepare_and_send_file_data_pdu(self.file_params.progress, read_len)?;
let read_len = self.transaction_params.file_params.segment_len.min(
self.transaction_params.file_params.file_size
- self.transaction_params.file_params.progress,
);
self.prepare_and_send_file_data_pdu(
self.transaction_params.file_params.progress,
read_len,
)?;
Ok(true)
}
@@ -951,7 +1031,7 @@ impl<
) -> Result<(), SourceError> {
let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata(
PduHeader::new_for_file_data(
self.pdu_conf,
self.transaction_params.pdu_conf,
0,
SegmentMetadataFlag::NotPresent,
SegmentationControl::NoRecordBoundaryPreservation,
@@ -973,7 +1053,7 @@ impl<
None,
&self.pdu_and_cksum_buffer.borrow()[0..written_len],
)?;
self.file_params.progress += size;
self.transaction_params.file_params.progress += size;
Ok(())
}
@@ -983,13 +1063,13 @@ impl<
checksum: u32,
) -> Result<(), SourceError> {
let eof_pdu = EofPdu::new(
PduHeader::new_for_file_directive(self.pdu_conf, 0),
self.tstate_ref()
PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
self.transaction_params
.cond_code_eof
.get()
.unwrap_or(ConditionCode::NoError),
checksum,
self.file_params.progress,
self.transaction_params.file_params.progress,
None,
);
self.pdu_send_helper(&eof_pdu)?;
@@ -1021,15 +1101,14 @@ impl<
directive_type: Some(FileDirectiveType::FinishedPdu),
});
}
let tstate_ref = self.tstate_ref();
// Unwrapping should be fine here, the transfer state is valid when we are not in IDLE
// mode.
tstate_ref.finished_params.set(Some(FinishedParams {
self.transaction_params.finished_params = Some(FinishedParams {
condition_code: finished_pdu.condition_code(),
delivery_code: finished_pdu.delivery_code(),
file_status: finished_pdu.file_status(),
}));
if tstate_ref.transmission_mode.get() == TransmissionMode::Acknowledged {
});
if let Some(TransmissionMode::Acknowledged) = self.transmission_mode() {
self.prepare_and_send_ack_pdu(
finished_pdu.condition_code(),
TransactionStatus::Active,
@@ -1066,12 +1145,9 @@ impl<
condition_code: ConditionCode,
) -> Result<u32, SourceError> {
let mut sent_packets = 0;
match self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)? {
ControlFlow::Continue(ctx) | ControlFlow::Break(ctx) => {
if ctx == FsmContext::ResetWhenPossible {
self.reset();
}
}
let ctx = self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)?;
if ctx == FsmContext::ResetWhenPossible {
self.reset();
}
Ok(sent_packets)
}
@@ -1081,43 +1157,30 @@ impl<
user: &mut impl CfdpUser,
condition_code: ConditionCode,
sent_packets: &mut u32,
) -> Result<ControlFlow<FsmContext, FsmContext>, SourceError> {
let transaction_id = self.transaction_id().unwrap();
// CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
// the EOF (cancel) PDU must result in abandonment of the transaction.
if let Some(cond_code_eof) = self.tstate_ref().cond_code_eof.get() {
if cond_code_eof != ConditionCode::NoError {
// Still call the abandonment callback to ensure the fault is logged.
self.local_cfg
.fault_handler
.user_hook
.borrow_mut()
.abandoned_cb(FaultInfo::new(
transaction_id,
cond_code_eof,
self.file_params.progress,
));
return Ok(ControlFlow::Break(FsmContext::ResetWhenPossible));
}
}
self.tstate_ref().cond_code_eof.set(Some(condition_code));
) -> Result<FsmContext, SourceError> {
self.transaction_params
.cond_code_eof
.set(Some(condition_code));
// As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply
// the checksum for the file copy progress sent so far.
let checksum = self.vfs.calculate_checksum(
self.put_request_cacher.source_file().unwrap(),
self.tstate_ref().remote_cfg.borrow().default_crc_type,
self.file_params.progress,
self.transaction_params
.remote_cfg
.as_ref()
.unwrap()
.default_crc_type,
self.transaction_params.file_params.progress,
&mut self.pdu_and_cksum_buffer.borrow_mut(),
)?;
self.prepare_and_send_eof_pdu(user, checksum)?;
*sent_packets += 1;
if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
// We are done.
Ok(ControlFlow::Continue(FsmContext::ResetWhenPossible))
Ok(FsmContext::ResetWhenPossible)
} else {
self.start_positive_ack_procedure();
Ok(ControlFlow::Continue(FsmContext::default()))
Ok(FsmContext::default())
}
}
@@ -1140,42 +1203,38 @@ impl<
cond: ConditionCode,
) -> Result<(u32, FsmContext), SourceError> {
let mut sent_packets = 0;
let fh = self.local_cfg.fault_handler.get_fault_handler(cond);
let mut ctx = FsmContext::default();
match fh {
spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => {
match self.notice_of_cancellation_internal(user, cond, &mut sent_packets)? {
ControlFlow::Continue(ctx_cancellation) => {
ctx = ctx_cancellation;
}
ControlFlow::Break(ctx_cancellation) => {
return Ok((sent_packets, ctx_cancellation));
}
}
}
spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => {
self.notice_of_suspension_internal();
}
spacepackets::cfdp::FaultHandlerCode::IgnoreError => (),
spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => {
return Ok((sent_packets, FsmContext::ResetWhenPossible));
let mut fh = self.local_cfg.fault_handler.get_fault_handler(cond);
// CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
// the EOF (cancel) PDU must result in abandonment of the transaction.
if let Some(positive_ack_params) = self.transaction_params.positive_ack_params.get() {
if positive_ack_params.positive_ack_of_cancellation {
fh = FaultHandlerCode::AbandonTransaction;
}
}
self.local_cfg.fault_handler.report_fault(FaultInfo::new(
self.transaction_id().unwrap(),
cond,
self.file_params.progress,
));
let mut ctx = FsmContext::default();
match fh {
FaultHandlerCode::NoticeOfCancellation => {
ctx = self.notice_of_cancellation_internal(user, cond, &mut sent_packets)?;
}
FaultHandlerCode::NoticeOfSuspension => {
self.notice_of_suspension_internal();
}
FaultHandlerCode::IgnoreError => (),
FaultHandlerCode::AbandonTransaction => {
ctx = FsmContext::ResetWhenPossible;
}
}
self.local_cfg.fault_handler.report_fault(
fh,
FaultInfo::new(
self.transaction_id().unwrap(),
cond,
self.transaction_params.file_params.progress,
),
);
Ok((sent_packets, ctx))
}
// Internal helper function.
fn tstate_ref(&self) -> &TransferState {
self.transfer_state
.as_ref()
.expect("transfer state should be set in busy state")
}
fn handle_keep_alive_pdu(&mut self) {}
}