continue dest handler

This commit is contained in:
Robin Mueller
2025-09-15 14:47:23 +02:00
parent d239af7179
commit 73cbcfdd2a
3 changed files with 275 additions and 166 deletions

View File

@@ -31,7 +31,7 @@
//! 5. A Finished PDU ACK was received.
use crate::{DummyPduProvider, GenericSendError, PduProvider, user::TransactionFinishedParams};
use core::{
cell::Cell,
cell::{Cell, RefCell},
str::{Utf8Error, from_utf8, from_utf8_unchecked},
};
@@ -45,9 +45,11 @@ use super::{
use smallvec::SmallVec;
use spacepackets::{
cfdp::{
ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode,
ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransactionStatus,
TransmissionMode,
pdu::{
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader,
ack::AckPdu,
eof::EofPdu,
file_data::FileDataPdu,
finished::{DeliveryCode, FileStatus, FinishedPduCreator},
@@ -68,6 +70,21 @@ struct FileProperties {
dest_file_path_len: usize,
}
#[derive(Debug, Default, Clone, Copy)]
pub struct AnomalyTracker {
invalid_ack_directive_code: u8,
}
impl AnomalyTracker {
pub fn invalid_ack_directive_code(&mut self) -> u8 {
self.invalid_ack_directive_code
}
pub fn reset(&mut self) {
self.invalid_ack_directive_code = 0;
}
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum CompletionDisposition {
Completed = 0,
@@ -79,13 +96,22 @@ enum CompletionDisposition {
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TransactionStep {
Idle = 0,
TransactionStart = 1,
ReceivingFileDataPdus = 2,
ReceivingFileDataPdusWithCheckLimitHandling = 3,
SendingAckPdu = 4,
TransferCompletion = 5,
SendingFinishedPdu = 6,
Idle,
TransactionStart,
/// Special state which is only used for acknowledged mode. The CFDP entity is still waiting
/// for a missing metadata PDU to be re-sent. Until then, all arriving file data PDUs will only
/// update the internal lost segment tracker. When the EOF PDU arrives, the state will be left.
/// Please note that deferred lost segment handling might also be active when this state is set.
WaitingForMetadata,
ReceivingFileDataPdus,
/// This is the check timer step as specified in chapter 4.6.3.3 b) of the standard.
/// The destination handler will still check for file data PDUs which might lead to a full
/// file transfer completion.
ReceivingFileDataPdusWithCheckLimitHandling,
WaitingForMissingData,
//SendingAckPdu,
TransferCompletion,
WaitingForFinishedAck,
}
// This contains transfer state parameters for destination transaction.
@@ -103,6 +129,7 @@ struct TransferState<Countdown: CountdownProvider> {
file_status: FileStatus,
completion_disposition: Cell<CompletionDisposition>,
checksum: u32,
anomaly_tracker: AnomalyTracker,
current_check_count: u32,
current_check_timer: Option<Countdown>,
}
@@ -122,6 +149,7 @@ impl<CheckTimer: CountdownProvider> Default for TransferState<CheckTimer> {
completion_disposition: Cell::new(CompletionDisposition::Completed),
checksum: 0,
current_check_count: 0,
anomaly_tracker: AnomalyTracker::default(),
current_check_timer: None,
}
}
@@ -130,10 +158,9 @@ impl<CheckTimer: CountdownProvider> Default for TransferState<CheckTimer> {
// This contains parameters for destination transaction.
#[derive(Debug)]
struct TransactionParams<Countdown: CountdownProvider> {
tstate: TransferState<Countdown>,
transfer_state: TransferState<Countdown>,
pdu_conf: CommonPduConfig,
file_properties: FileProperties,
cksum_buf: [u8; 1024],
msgs_to_user_size: usize,
// TODO: Should we make this configurable?
msgs_to_user_buf: [u8; 1024],
@@ -161,11 +188,11 @@ impl Default for FileProperties {
impl<CheckTimer: CountdownProvider> TransactionParams<CheckTimer> {
fn file_size(&self) -> u64 {
self.tstate.metadata_params.file_size
self.transfer_state.metadata_params.file_size
}
fn metadata_params(&self) -> &MetadataGenericParams {
&self.tstate.metadata_params
&self.transfer_state.metadata_params
}
}
@@ -173,10 +200,9 @@ impl<CheckTimer: CountdownProvider> Default for TransactionParams<CheckTimer> {
fn default() -> Self {
Self {
pdu_conf: Default::default(),
cksum_buf: [0; 1024],
msgs_to_user_size: 0,
msgs_to_user_buf: [0; 1024],
tstate: Default::default(),
transfer_state: Default::default(),
file_properties: Default::default(),
remote_cfg: None,
}
@@ -185,13 +211,19 @@ impl<CheckTimer: CountdownProvider> Default for TransactionParams<CheckTimer> {
impl<CheckTimer: CountdownProvider> TransactionParams<CheckTimer> {
fn reset(&mut self) {
self.tstate.condition_code.set(ConditionCode::NoError);
self.tstate.delivery_code.set(DeliveryCode::Incomplete);
self.tstate.file_status = FileStatus::Unreported;
self.transfer_state
.condition_code
.set(ConditionCode::NoError);
self.transfer_state
.delivery_code
.set(DeliveryCode::Incomplete);
self.transfer_state.file_status = FileStatus::Unreported;
self.transfer_state.anomaly_tracker.reset();
}
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum DestError {
/// File directive expected, but none specified
#[error("expected file directive")]
@@ -268,8 +300,8 @@ pub struct DestinationHandler<
local_cfg: LocalEntityConfig<UserFaultHook>,
step: core::cell::Cell<TransactionStep>,
state: State,
tparams: TransactionParams<CheckTimerProvider>,
packet_buf: alloc::vec::Vec<u8>,
transaction_params: TransactionParams<CheckTimerProvider>,
pdu_and_cksum_buffer: RefCell<alloc::vec::Vec<u8>>,
pub pdu_sender: PduSender,
pub vfs: Vfs,
pub remote_cfg_table: RemoteCfgTable,
@@ -300,11 +332,12 @@ impl<
/// # Arguments
///
/// * `local_cfg` - The local CFDP entity configuration.
/// * `max_packet_len` - The maximum expected generated packet size in bytes. Each time a
/// packet is sent, it will be buffered inside an internal buffer. The length of this buffer
/// will be determined by this parameter. This parameter can either be a known upper bound,
/// or it can specifically be determined by the largest packet size parameter of all remote
/// entity configurations in the passed `remote_cfg_table`.
///
/// * `pdu_and_cksum_buf_size` - The handler requires a buffer to generate PDUs and perform
/// checksum calculations. This parameter can either be a known upper bound for the packet
/// size, for example 2048 or 4096 bytes.
/// It can also specifically be determined by the largest packet size parameter of all
/// remote entity configurations in the passed `remote_cfg_table`.
/// * `pdu_sender` - [PduSendProvider] used to send generated PDU packets.
/// * `vfs` - [VirtualFilestore] implementation used by the handler, which decouples the CFDP
/// implementation from the underlying filestore/filesystem. This allows to use this handler
@@ -316,7 +349,7 @@ impl<
/// where the standard time APIs might not be available.
pub fn new(
local_cfg: LocalEntityConfig<UserFaultHook>,
max_packet_len: usize,
pdu_and_cksum_buf_size: usize,
pdu_sender: PduSender,
vfs: Vfs,
remote_cfg_table: RemoteCfgTable,
@@ -326,8 +359,8 @@ impl<
local_cfg,
step: Cell::new(TransactionStep::Idle),
state: State::Idle,
tparams: Default::default(),
packet_buf: alloc::vec![0; max_packet_len],
transaction_params: Default::default(),
pdu_and_cksum_buffer: core::cell::RefCell::new(alloc::vec![0; pdu_and_cksum_buf_size]),
pdu_sender,
vfs,
remote_cfg_table,
@@ -403,7 +436,7 @@ impl<
if self.state == State::Idle {
return None;
}
Some(self.tparams.transmission_mode())
Some(self.transaction_params.transmission_mode())
}
pub fn transaction_id(&self) -> Option<TransactionId> {
@@ -414,7 +447,8 @@ impl<
&mut self,
cfdp_user: &mut impl CfdpUser,
packet_to_insert: &impl PduProvider,
) -> Result<(), DestError> {
) -> Result<u32, DestError> {
let mut sent_packets = 0;
if packet_to_insert.packet_target()? != PacketTarget::DestEntity {
// Unwrap is okay here, a PacketInfo for a file data PDU should always have the
// destination as the target.
@@ -428,14 +462,17 @@ impl<
if packet_to_insert.file_directive_type().is_none() {
return Err(DestError::DirectiveFieldEmpty);
}
self.handle_file_directive(
sent_packets += self.handle_file_directive(
cfdp_user,
packet_to_insert.file_directive_type().unwrap(),
packet_to_insert.pdu(),
)
)?;
}
PduType::FileData => {
self.handle_file_data(cfdp_user, packet_to_insert.pdu())?;
}
PduType::FileData => self.handle_file_data(cfdp_user, packet_to_insert.pdu()),
}
Ok(sent_packets)
}
fn handle_file_directive(
@@ -443,9 +480,12 @@ impl<
cfdp_user: &mut impl CfdpUser,
pdu_directive: FileDirectiveType,
raw_packet: &[u8],
) -> Result<(), DestError> {
) -> Result<u32, DestError> {
let mut sent_packets = 0;
match pdu_directive {
FileDirectiveType::EofPdu => self.handle_eof_pdu(cfdp_user, raw_packet)?,
FileDirectiveType::EofPdu => {
sent_packets += self.handle_eof_pdu(cfdp_user, raw_packet)?
}
FileDirectiveType::FinishedPdu
| FileDirectiveType::NakPdu
| FileDirectiveType::KeepAlivePdu => {
@@ -455,61 +495,81 @@ impl<
});
}
FileDirectiveType::AckPdu => {
return Err(DestError::NotImplemented);
let ack_pdu = AckPdu::from_bytes(raw_packet)?;
self.handle_ack_pdu(ack_pdu)?;
}
FileDirectiveType::MetadataPdu => {
let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
self.handle_metadata_pdu(metadata_pdu)?
}
FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?,
FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?,
};
Ok(sent_packets)
}
fn handle_ack_pdu(&mut self, ack_pdu: AckPdu) -> Result<(), DestError> {
if ack_pdu.directive_code_of_acked_pdu() != FileDirectiveType::FinishedPdu {
self.transaction_params
.transfer_state
.anomaly_tracker
.invalid_ack_directive_code = self
.transaction_params
.transfer_state
.anomaly_tracker
.invalid_ack_directive_code
.wrapping_add(1);
}
// We are done.
self.reset();
Ok(())
}
fn handle_metadata_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
fn handle_metadata_pdu(&mut self, metadata_pdu: MetadataPduReader) -> Result<(), DestError> {
if self.state != State::Idle {
return Err(DestError::RecvdMetadataButIsBusy);
}
let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
self.tparams.reset();
self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params();
self.transaction_params.reset();
self.transaction_params.transfer_state.metadata_params = *metadata_pdu.metadata_params();
let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value());
if remote_cfg.is_none() {
return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id()));
}
self.tparams.remote_cfg = Some(*remote_cfg.unwrap());
self.transaction_params.remote_cfg = Some(*remote_cfg.unwrap());
// TODO: Support for metadata only PDUs.
let src_name = metadata_pdu.src_file_name();
let dest_name = metadata_pdu.dest_file_name();
if src_name.is_empty() && dest_name.is_empty() {
self.tparams.tstate.metadata_only = true;
self.transaction_params.transfer_state.metadata_only = true;
}
if !self.tparams.tstate.metadata_only && src_name.is_empty() {
if !self.transaction_params.transfer_state.metadata_only && src_name.is_empty() {
return Err(DestError::EmptySrcFileField);
}
if !self.tparams.tstate.metadata_only && dest_name.is_empty() {
if !self.transaction_params.transfer_state.metadata_only && dest_name.is_empty() {
return Err(DestError::EmptyDestFileField);
}
if !self.tparams.tstate.metadata_only {
self.tparams.file_properties.src_file_name[..src_name.len_value()]
if !self.transaction_params.transfer_state.metadata_only {
self.transaction_params.file_properties.src_file_name[..src_name.len_value()]
.copy_from_slice(src_name.value());
self.tparams.file_properties.src_file_name_len = src_name.len_value();
self.transaction_params.file_properties.src_file_name_len = src_name.len_value();
if dest_name.is_empty() {
return Err(DestError::EmptyDestFileField);
}
self.tparams.file_properties.dest_file_name[..dest_name.len_value()]
self.transaction_params.file_properties.dest_file_name[..dest_name.len_value()]
.copy_from_slice(dest_name.value());
self.tparams.file_properties.dest_file_name_len = dest_name.len_value();
self.tparams.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf();
self.tparams.msgs_to_user_size = 0;
self.transaction_params.file_properties.dest_file_name_len = dest_name.len_value();
self.transaction_params.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf();
self.transaction_params.msgs_to_user_size = 0;
}
if !metadata_pdu.options().is_empty() {
for option_tlv in metadata_pdu.options_iter().unwrap() {
if option_tlv.is_standard_tlv()
&& option_tlv.tlv_type().unwrap() == TlvType::MsgToUser
{
self.tparams
self.transaction_params
.msgs_to_user_buf
.copy_from_slice(option_tlv.raw_data().unwrap());
self.tparams.msgs_to_user_size += option_tlv.len_full();
self.transaction_params.msgs_to_user_size += option_tlv.len_full();
}
}
}
@@ -544,8 +604,8 @@ impl<
unsafe {
from_utf8_unchecked(
//from_utf8(
&self.tparams.file_properties.dest_path_buf
[0..self.tparams.file_properties.dest_file_path_len],
&self.transaction_params.file_properties.dest_path_buf
[0..self.transaction_params.file_properties.dest_file_path_len],
)
},
fd_pdu.offset(),
@@ -566,14 +626,21 @@ impl<
&mut self,
cfdp_user: &mut impl CfdpUser,
raw_packet: &[u8],
) -> Result<(), DestError> {
) -> Result<u32, DestError> {
let mut sent_packets = 0;
if self.state == State::Idle || self.step() != TransactionStep::ReceivingFileDataPdus {
return Err(DestError::WrongStateForEof);
}
let eof_pdu = EofPdu::from_bytes(raw_packet)?;
if self.local_cfg.indication_cfg.eof_recv {
// Unwrap is okay here, application logic ensures that transaction ID is valid here.
cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap());
cfdp_user.eof_recvd_indication(
self.transaction_params
.transfer_state
.transaction_id
.as_ref()
.unwrap(),
);
}
let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError {
self.handle_no_error_eof_pdu(&eof_pdu)?
@@ -582,17 +649,17 @@ impl<
// 4.6.6 of the standard.
self.trigger_notice_of_completion_cancelled(
eof_pdu.condition_code(),
EntityIdTlv::new(self.tparams.remote_cfg.unwrap().entity_id),
EntityIdTlv::new(self.transaction_params.remote_cfg.unwrap().entity_id),
);
self.tparams.tstate.progress = eof_pdu.file_size();
self.transaction_params.transfer_state.progress = eof_pdu.file_size();
if eof_pdu.file_size() > 0 {
self.tparams
.tstate
self.transaction_params
.transfer_state
.delivery_code
.set(DeliveryCode::Incomplete);
} else {
self.tparams
.tstate
self.transaction_params
.transfer_state
.delivery_code
.set(DeliveryCode::Complete);
}
@@ -603,9 +670,9 @@ impl<
true
};
if regular_transfer_finish {
self.file_transfer_complete_transition();
sent_packets += self.file_transfer_complete_transition()?;
}
Ok(())
Ok(sent_packets)
}
fn trigger_notice_of_completion_cancelled(
@@ -613,20 +680,23 @@ impl<
cond_code: ConditionCode,
fault_location: EntityIdTlv,
) {
self.tparams
.tstate
self.transaction_params
.transfer_state
.completion_disposition
.set(CompletionDisposition::Cancelled);
self.tparams.tstate.condition_code.set(cond_code);
self.tparams
.tstate
self.transaction_params
.transfer_state
.condition_code
.set(cond_code);
self.transaction_params
.transfer_state
.fault_location_finished
.set(Some(fault_location));
// For anything larger than 0, we'd have to do a checksum check to verify whether
// the delivery is really complete, and we need the EOF checksum for that..
if self.tparams.tstate.progress == 0 {
self.tparams
.tstate
if self.transaction_params.transfer_state.progress == 0 {
self.transaction_params
.transfer_state
.delivery_code
.set(DeliveryCode::Complete);
}
@@ -635,7 +705,7 @@ impl<
/// Returns whether the transfer can be completed regularly.
fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
// CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
if self.tparams.tstate.progress > eof_pdu.file_size() {
if self.transaction_params.transfer_state.progress > eof_pdu.file_size() {
match self.declare_fault(ConditionCode::FileSizeError) {
FaultHandlerCode::IgnoreError => (),
FaultHandlerCode::AbandonTransaction => {
@@ -646,8 +716,8 @@ impl<
return Ok(false);
}
}
} else if (self.tparams.tstate.progress < eof_pdu.file_size())
&& self.tparams.transmission_mode() == TransmissionMode::Acknowledged
} else if (self.transaction_params.transfer_state.progress < eof_pdu.file_size())
&& self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged
{
// CFDP 4.6.4.3.1: The end offset of the last received file segment and the file
// size as stated in the EOF PDU is not the same, so we need to add that segment to
@@ -658,9 +728,9 @@ impl<
// )
}
self.tparams.tstate.checksum = eof_pdu.file_checksum();
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged
&& !self.checksum_verify(self.tparams.tstate.checksum)
self.transaction_params.transfer_state.checksum = eof_pdu.file_checksum();
if self.transaction_params.transmission_mode() == TransmissionMode::Unacknowledged
&& !self.checksum_verify(self.transaction_params.transfer_state.checksum)
{
if self.declare_fault(ConditionCode::FileChecksumFailure)
!= FaultHandlerCode::IgnoreError
@@ -673,19 +743,32 @@ impl<
Ok(true)
}
fn file_transfer_complete_transition(&mut self) {
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged {
self.set_step(TransactionStep::TransferCompletion);
} else {
// TODO: Prepare ACK PDU somehow.
self.set_step(TransactionStep::SendingAckPdu);
fn file_transfer_complete_transition(&mut self) -> Result<u32, DestError> {
let mut sent_packets = 0;
if self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged {
let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0);
let ack_pdu = AckPdu::new(
pdu_header,
FileDirectiveType::EofPdu,
self.transaction_params.transfer_state.condition_code.get(),
TransactionStatus::Active,
)
.unwrap();
let written_len = ack_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?;
self.pdu_sender.send_file_directive_pdu(
FileDirectiveType::AckPdu,
&self.pdu_and_cksum_buffer.borrow()[0..written_len],
)?;
sent_packets += 1;
}
self.set_step(TransactionStep::TransferCompletion);
Ok(sent_packets)
}
fn checksum_verify(&mut self, checksum: u32) -> bool {
let mut file_delivery_complete = false;
if self.tparams.metadata_params().checksum_type == ChecksumType::NullChecksum
|| self.tparams.tstate.metadata_only
if self.transaction_params.metadata_params().checksum_type == ChecksumType::NullChecksum
|| self.transaction_params.transfer_state.metadata_only
{
file_delivery_complete = true;
} else {
@@ -694,23 +777,23 @@ impl<
// Safety: It was already verified that the path is valid during the transaction start.
unsafe {
from_utf8_unchecked(
&self.tparams.file_properties.dest_path_buf
[0..self.tparams.file_properties.dest_file_path_len],
&self.transaction_params.file_properties.dest_path_buf
[0..self.transaction_params.file_properties.dest_file_path_len],
)
},
self.tparams.metadata_params().checksum_type,
self.tparams.tstate.progress,
&mut self.tparams.cksum_buf,
self.transaction_params.metadata_params().checksum_type,
self.transaction_params.transfer_state.progress,
self.pdu_and_cksum_buffer.get_mut(),
) {
Ok(checksum_success) => {
file_delivery_complete = checksum_success;
if !checksum_success {
self.tparams
.tstate
self.transaction_params
.transfer_state
.delivery_code
.set(DeliveryCode::Incomplete);
self.tparams
.tstate
self.transaction_params
.transfer_state
.condition_code
.set(ConditionCode::FileChecksumFailure);
}
@@ -739,12 +822,12 @@ impl<
};
}
if file_delivery_complete {
self.tparams
.tstate
self.transaction_params
.transfer_state
.delivery_code
.set(DeliveryCode::Complete);
self.tparams
.tstate
self.transaction_params
.transfer_state
.condition_code
.set(ConditionCode::NoError);
}
@@ -753,28 +836,40 @@ impl<
fn start_check_limit_handling(&mut self) {
self.set_step(TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling);
self.tparams.tstate.current_check_timer = Some(self.check_timer_creator.create_countdown(
TimerContext::CheckLimit {
local_id: self.local_cfg.id,
remote_id: self.tparams.remote_cfg.unwrap().entity_id,
entity_type: EntityType::Receiving,
},
));
self.tparams.tstate.current_check_count = 0;
self.transaction_params.transfer_state.current_check_timer = Some(
self.check_timer_creator
.create_countdown(TimerContext::CheckLimit {
local_id: self.local_cfg.id,
remote_id: self.transaction_params.remote_cfg.unwrap().entity_id,
entity_type: EntityType::Receiving,
}),
);
self.transaction_params.transfer_state.current_check_count = 0;
}
fn check_limit_handling(&mut self) {
if self.tparams.tstate.current_check_timer.is_none() {
return;
fn check_limit_handling(&mut self) -> Result<u32, DestError> {
if self
.transaction_params
.transfer_state
.current_check_timer
.is_none()
{
return Ok(0);
}
let check_timer = self.tparams.tstate.current_check_timer.as_ref().unwrap();
let mut sent_packets = 0;
let check_timer = self
.transaction_params
.transfer_state
.current_check_timer
.as_ref()
.unwrap();
if check_timer.has_expired() {
if self.checksum_verify(self.tparams.tstate.checksum) {
self.file_transfer_complete_transition();
return;
if self.checksum_verify(self.transaction_params.transfer_state.checksum) {
sent_packets += self.file_transfer_complete_transition()?;
return Ok(sent_packets);
}
if self.tparams.tstate.current_check_count + 1
>= self.tparams.remote_cfg.unwrap().check_limit
if self.transaction_params.transfer_state.current_check_count + 1
>= self.transaction_params.remote_cfg.unwrap().check_limit
{
if self.declare_fault(ConditionCode::CheckLimitReached)
== FaultHandlerCode::AbandonTransaction
@@ -782,15 +877,16 @@ impl<
self.abandon_transaction();
}
} else {
self.tparams.tstate.current_check_count += 1;
self.tparams
.tstate
self.transaction_params.transfer_state.current_check_count += 1;
self.transaction_params
.transfer_state
.current_check_timer
.as_mut()
.unwrap()
.reset();
}
}
Ok(sent_packets)
}
pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> {
@@ -810,16 +906,13 @@ impl<
}
}
if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
self.check_limit_handling();
sent_packets += self.check_limit_handling()?;
}
if self.step() == TransactionStep::TransferCompletion {
sent_packets += self.transfer_completion(cfdp_user)?;
}
if self.step() == TransactionStep::SendingAckPdu {
return Err(DestError::NotImplemented);
}
if self.step() == TransactionStep::SendingFinishedPdu {
self.reset();
if self.step() == TransactionStep::WaitingForFinishedAck {
// TODO: Positive ACK handling procedures.
}
Ok(sent_packets)
}
@@ -839,26 +932,29 @@ impl<
fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
let dest_name = from_utf8(
&self.tparams.file_properties.dest_file_name
[..self.tparams.file_properties.dest_file_name_len],
&self.transaction_params.file_properties.dest_file_name
[..self.transaction_params.file_properties.dest_file_name_len],
)?;
self.tparams.file_properties.dest_path_buf[0..dest_name.len()]
self.transaction_params.file_properties.dest_path_buf[0..dest_name.len()]
.copy_from_slice(dest_name.as_bytes());
self.tparams.file_properties.dest_file_path_len = dest_name.len();
let source_id = self.tparams.pdu_conf.source_id();
let id = TransactionId::new(source_id, self.tparams.pdu_conf.transaction_seq_num);
self.transaction_params.file_properties.dest_file_path_len = dest_name.len();
let source_id = self.transaction_params.pdu_conf.source_id();
let id = TransactionId::new(
source_id,
self.transaction_params.pdu_conf.transaction_seq_num,
);
let src_name = from_utf8(
&self.tparams.file_properties.src_file_name
[0..self.tparams.file_properties.src_file_name_len],
&self.transaction_params.file_properties.src_file_name
[0..self.transaction_params.file_properties.src_file_name_len],
)?;
let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new();
let mut num_msgs_to_user = 0;
if self.tparams.msgs_to_user_size > 0 {
if self.transaction_params.msgs_to_user_size > 0 {
let mut index = 0;
while index < self.tparams.msgs_to_user_size {
while index < self.transaction_params.msgs_to_user_size {
// This should never panic as the validity of the options was checked beforehand.
let msgs_to_user_tlv =
MsgToUserTlv::from_bytes(&self.tparams.msgs_to_user_buf[index..])
MsgToUserTlv::from_bytes(&self.transaction_params.msgs_to_user_buf[index..])
.expect("message to user creation failed unexpectedly");
msgs_to_user.push(msgs_to_user_tlv);
index += msgs_to_user_tlv.len_full();
@@ -868,12 +964,12 @@ impl<
let metadata_recvd_params = MetadataReceivedParams {
id,
source_id,
file_size: self.tparams.file_size(),
file_size: self.transaction_params.file_size(),
src_file_name: src_name,
dest_file_name: dest_name,
msgs_to_user: &msgs_to_user[..num_msgs_to_user],
};
self.tparams.tstate.transaction_id = Some(id);
self.transaction_params.transfer_state.transaction_id = Some(id);
cfdp_user.metadata_recvd_indication(&metadata_recvd_params);
if self.vfs.exists(dest_name)? && self.vfs.is_dir(dest_name)? {
@@ -887,22 +983,22 @@ impl<
return Err(DestError::PathConcat);
}
let source_name = source_file_name.unwrap();
self.tparams.file_properties.dest_path_buf[dest_name.len()] = b'/';
self.tparams.file_properties.dest_path_buf
self.transaction_params.file_properties.dest_path_buf[dest_name.len()] = b'/';
self.transaction_params.file_properties.dest_path_buf
[dest_name.len() + 1..dest_name.len() + 1 + source_name.len()]
.copy_from_slice(source_name.as_bytes());
self.tparams.file_properties.dest_file_path_len += 1 + source_name.len();
self.transaction_params.file_properties.dest_file_path_len += 1 + source_name.len();
}
let dest_path_str = from_utf8(
&self.tparams.file_properties.dest_path_buf
[0..self.tparams.file_properties.dest_file_path_len],
&self.transaction_params.file_properties.dest_path_buf
[0..self.transaction_params.file_properties.dest_file_path_len],
)?;
if self.vfs.exists(dest_path_str)? {
self.vfs.truncate_file(dest_path_str)?;
} else {
self.vfs.create_file(dest_path_str)?;
}
self.tparams.tstate.file_status = FileStatus::Retained;
self.transaction_params.transfer_state.file_status = FileStatus::Retained;
drop(msgs_to_user);
self.set_step(TransactionStep::ReceivingFileDataPdus);
Ok(())
@@ -911,12 +1007,18 @@ impl<
fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
let mut sent_packets = 0;
self.notice_of_completion(cfdp_user)?;
if self.tparams.transmission_mode() == TransmissionMode::Acknowledged
|| self.tparams.metadata_params().closure_requested
if self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged
|| self.transaction_params.metadata_params().closure_requested
{
sent_packets += self.send_finished_pdu()?;
if self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged {
self.set_step(TransactionStep::WaitingForFinishedAck);
} else {
self.reset();
}
} else {
self.reset();
}
self.set_step(TransactionStep::SendingFinishedPdu);
Ok(sent_packets)
}
@@ -924,7 +1026,7 @@ impl<
if self.tstate().completion_disposition.get() == CompletionDisposition::Completed {
// TODO: Execute any filestore requests
} else if self
.tparams
.transaction_params
.remote_cfg
.as_ref()
.unwrap()
@@ -934,8 +1036,8 @@ impl<
// Safety: We already verified that the path is valid during the transaction start.
let dest_path = unsafe {
from_utf8_unchecked(
&self.tparams.file_properties.dest_path_buf
[0..self.tparams.file_properties.dest_file_path_len],
&self.transaction_params.file_properties.dest_path_buf
[0..self.transaction_params.file_properties.dest_file_path_len],
)
};
if self.vfs.exists(dest_path)? && self.vfs.is_file(dest_path)? {
@@ -1012,14 +1114,13 @@ impl<
pub fn reset(&mut self) {
self.set_step(TransactionStep::Idle);
self.state = State::Idle;
// self.packets_to_send_ctx.packet_available = false;
self.tparams.reset();
self.transaction_params.reset();
}
fn send_finished_pdu(&mut self) -> Result<u32, DestError> {
let tstate = self.tstate();
let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0);
let finished_pdu = if tstate.condition_code.get() == ConditionCode::NoError
|| tstate.condition_code.get() == ConditionCode::UnsupportedChecksumType
{
@@ -1038,11 +1139,10 @@ impl<
tstate.fault_location_finished.get(),
)
};
finished_pdu.write_to_bytes(&mut self.packet_buf)?;
self.pdu_sender.send_pdu(
finished_pdu.pdu_type(),
finished_pdu.file_directive_type(),
&self.packet_buf[0..finished_pdu.len_written()],
finished_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?;
self.pdu_sender.send_file_directive_pdu(
FileDirectiveType::FinishedPdu,
&self.pdu_and_cksum_buffer.borrow()[0..finished_pdu.len_written()],
)?;
Ok(1)
}
@@ -1054,12 +1154,12 @@ impl<
#[inline]
fn tstate(&self) -> &TransferState<Countdown> {
&self.tparams.tstate
&self.transaction_params.transfer_state
}
#[inline]
fn tstate_mut(&mut self) -> &mut TransferState<Countdown> {
&mut self.tparams.tstate
&mut self.transaction_params.transfer_state
}
}

View File

@@ -639,6 +639,14 @@ pub trait PduSendProvider {
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), GenericSendError>;
fn send_file_directive_pdu(
&self,
file_directive_type: FileDirectiveType,
raw_pdu: &[u8],
) -> Result<(), GenericSendError> {
self.send_pdu(PduType::FileDirective, Some(file_directive_type), raw_pdu)
}
}
#[cfg(feature = "std")]

View File

@@ -27,7 +27,7 @@
//! 4. A Finished PDU will be awaited, for example one generated using
//! [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
//!
//! ### Acknowledged transfer
//! ### Acknowledged transfer (*not implemented yet*)
//!
//! 4. A EOF ACK packet will be awaited, for example one generated using
//! [spacepackets::cfdp::pdu::ack::AckPdu].
@@ -844,7 +844,8 @@ impl<
FileDirectiveType::FinishedPdu,
condition_code,
transaction_status,
)?;
)
.map_err(PduError::from)?;
self.pdu_send_helper(&ack_pdu)?;
Ok(())
}
@@ -1387,7 +1388,7 @@ mod tests {
transfer_info: &TransferInfo,
seg_reqs: &[(u32, u32)],
) {
let nak_pdu = NakPduCreator::new(
let nak_pdu = NakPduCreator::new_normal_file_size(
transfer_info.pdu_header,
0,
transfer_info.file_size as u32,
@@ -2339,7 +2340,7 @@ mod tests {
);
// NAK to cause re-transmission of metadata PDU.
let nak_pdu = NakPduCreator::new(
let nak_pdu = NakPduCreator::new_normal_file_size(
transfer_info.pdu_header,
0,
transfer_info.file_size as u32,