NAK sequence handling

This commit is contained in:
Robin Mueller
2025-09-18 18:31:15 +02:00
parent 8c373551dc
commit 096f3a5a38
2 changed files with 181 additions and 44 deletions

View File

@@ -30,36 +30,36 @@
//! 4. A Finished PDU has been sent back to the remote side. //! 4. A Finished PDU has been sent back to the remote side.
//! 5. A Finished PDU ACK was received. //! 5. A Finished PDU ACK was received.
use crate::{ use crate::{
DummyPduProvider, GenericSendError, IndicationConfig, PduProvider, PositiveAckParams,
lost_segments::{LostSegmentError, LostSegmentStore}, lost_segments::{LostSegmentError, LostSegmentStore},
user::TransactionFinishedParams, user::TransactionFinishedParams,
DummyPduProvider, GenericSendError, IndicationConfig, PduProvider, PositiveAckParams,
}; };
use core::{ use core::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
str::{from_utf8, from_utf8_unchecked, Utf8Error}, str::{Utf8Error, from_utf8, from_utf8_unchecked},
}; };
use super::{ use super::{
filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
Countdown, EntityType, LocalEntityConfig, PacketTarget, PduSender, RemoteConfigStore, Countdown, EntityType, LocalEntityConfig, PacketTarget, PduSender, RemoteConfigStore,
RemoteEntityConfig, State, TimerContext, TimerCreator, TransactionId, UserFaultHook, RemoteEntityConfig, State, TimerContext, TimerCreator, TransactionId, UserFaultHook,
filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
}; };
use smallvec::SmallVec; use smallvec::SmallVec;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
ChecksumType, ConditionCode, FaultHandlerCode, LargeFileFlag, PduType, TransactionStatus,
TransmissionMode,
pdu::{ pdu::{
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader,
ack::AckPdu, ack::AckPdu,
eof::EofPdu, eof::EofPdu,
file_data::FileDataPdu, file_data::FileDataPdu,
finished::{DeliveryCode, FileStatus, FinishedPduCreator}, finished::{DeliveryCode, FileStatus, FinishedPduCreator},
metadata::{MetadataGenericParams, MetadataPduReader}, metadata::{MetadataGenericParams, MetadataPduReader},
nak::NakPduCreatorWithReservedSeqReqsBuf, nak::NakPduCreatorWithReservedSeqReqsBuf,
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader,
}, },
tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, GenericTlv, ReadableTlv, TlvType}, tlv::{EntityIdTlv, GenericTlv, ReadableTlv, TlvType, msg_to_user::MsgToUserTlv},
ChecksumType, ConditionCode, FaultHandlerCode, LargeFileFlag, PduType, TransactionStatus,
TransmissionMode,
}, },
util::{UnsignedByteField, UnsignedEnum}, util::{UnsignedByteField, UnsignedEnum},
}; };
@@ -163,7 +163,7 @@ impl FinishedParams {
} }
} }
#[derive(Debug)] #[derive(Debug, Copy, Clone)]
pub struct AcknowledgedModeParams { pub struct AcknowledgedModeParams {
last_start_offset: u64, last_start_offset: u64,
last_end_offset: u64, last_end_offset: u64,
@@ -305,6 +305,8 @@ pub enum DestError {
NoRemoteConfigFound(UnsignedByteField), NoRemoteConfigFound(UnsignedByteField),
#[error("issue sending PDU: {0}")] #[error("issue sending PDU: {0}")]
SendError(#[from] GenericSendError), SendError(#[from] GenericSendError),
#[error("invalid remote entity configuration: {0:?}")]
InvalidRemoteConfig(RemoteEntityConfig),
#[error("cfdp feature not implemented")] #[error("cfdp feature not implemented")]
NotImplemented, NotImplemented,
} }
@@ -388,14 +390,14 @@ impl<PduSenderInstance: PduSender, UserFaultHookInstance: UserFaultHook>
} }
impl< impl<
PduSenderInstance: PduSender, PduSenderInstance: PduSender,
UserFaultHookInstance: UserFaultHook, UserFaultHookInstance: UserFaultHook,
VirtualFilestoreInstance: VirtualFilestore, VirtualFilestoreInstance: VirtualFilestore,
RemoteConfigStoreInstance: RemoteConfigStore, RemoteConfigStoreInstance: RemoteConfigStore,
TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>, TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
CountdownInstance: Countdown, CountdownInstance: Countdown,
LostSegmentTracker: LostSegmentStore, LostSegmentTracker: LostSegmentStore,
> >
DestinationHandler< DestinationHandler<
PduSenderInstance, PduSenderInstance,
UserFaultHookInstance, UserFaultHookInstance,
@@ -989,12 +991,12 @@ impl<
} }
} }
fn deferred_lost_segment_handling(&mut self) { fn deferred_lost_segment_handling(&mut self) -> Result<(), DestError> {
assert!( assert!(
self.transaction_params.acked_params.is_some(), self.transaction_params.acked_params.is_some(),
"acknowledged parameters unexpectedly None" "acknowledged parameters unexpectedly None"
); );
let acked_params = self.transaction_params.acked_params.as_mut().unwrap(); let acked_params = self.transaction_params.acked_params.unwrap();
if self.lost_segment_tracker.is_empty() && !acked_params.metadata_missing { if self.lost_segment_tracker.is_empty() && !acked_params.metadata_missing {
// We are done and have received everything. // We are done and have received everything.
match self.checksum_verify( match self.checksum_verify(
@@ -1022,12 +1024,16 @@ impl<
.set(DeliveryCode::Incomplete); .set(DeliveryCode::Incomplete);
} }
} }
self.transaction_params
.acked_params
.as_mut()
.unwrap()
.deferred_procedure_active = false;
self.set_step(TransactionStep::TransferCompletion); self.set_step(TransactionStep::TransferCompletion);
acked_params.deferred_procedure_active = false; return Ok(());
return;
} }
let mut first_nak_issuance = self.transaction_params.deferred_procedure_timer.is_none(); let first_nak_issuance = self.transaction_params.deferred_procedure_timer.is_none();
if first_nak_issuance { if first_nak_issuance {
self.transaction_params.deferred_procedure_timer = Some( self.transaction_params.deferred_procedure_timer = Some(
self.check_timer_creator self.check_timer_creator
@@ -1047,7 +1053,7 @@ impl<
.unwrap() .unwrap()
.has_expired() .has_expired()
{ {
return; return Ok(());
} }
if !first_nak_issuance if !first_nak_issuance
&& acked_params.nak_activity_counter + 1 && acked_params.nak_activity_counter + 1
@@ -1058,13 +1064,16 @@ impl<
.unwrap() .unwrap()
.nak_timer_expiration_limit .nak_timer_expiration_limit
{ {
self.transaction_params.finished_params.delivery_code = DeliveryCode::Incomplete; self.transaction_params
.finished_params
.delivery_code
.set(DeliveryCode::Incomplete);
if self.declare_fault(ConditionCode::NakLimitReached) if self.declare_fault(ConditionCode::NakLimitReached)
== FaultHandlerCode::AbandonTransaction == FaultHandlerCode::AbandonTransaction
{ {
self.abandon_transaction(); self.abandon_transaction();
} }
return; return Ok(());
} }
let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0); let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0);
let max_segment_reqs = NakPduCreatorWithReservedSeqReqsBuf::calculate_max_segment_requests( let max_segment_reqs = NakPduCreatorWithReservedSeqReqsBuf::calculate_max_segment_requests(
@@ -1074,24 +1083,119 @@ impl<
.unwrap() .unwrap()
.max_packet_len, .max_packet_len,
&pdu_header, &pdu_header,
); )
.map_err(|_| DestError::InvalidRemoteConfig(self.transaction_params.remote_cfg.unwrap()))?;
let mut segments_to_send = self.lost_segment_tracker.number_of_segments(); let mut segments_to_send = self.lost_segment_tracker.number_of_segments();
if acked_params.metadata_missing { if acked_params.metadata_missing {
segments_to_send += 1; segments_to_send += 1;
} }
if segments_to_send <= max_segment_reqs { if segments_to_send <= max_segment_reqs {
// Should never fail because we calculcated the allowed maximum number of segment
// requests for the buffer.
let mut nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new(
self.pdu_and_cksum_buffer.get_mut(),
pdu_header,
segments_to_send,
)
.unwrap();
// All error conditions were checked so this should never fail.
//
// 1. Number of segments was calculated.
// 2. Buffer size was calculated based on PDU header and maximum packet size.
// 3. Large file flag is based on PDU header, so there should never be a missmatch.
self.lost_segment_tracker
.write_to_nak_segment_list(&mut nak_pdu_creator, acked_params.metadata_missing)
.expect("unexpected lost segment write error");
let written_len = nak_pdu_creator.finish();
self.pdu_sender.send_file_directive_pdu(
FileDirectiveType::NakPdu,
&self.pdu_and_cksum_buffer.borrow()[0..written_len],
)?;
} else {
self.write_multi_packet_nak_sequence(max_segment_reqs, acked_params.metadata_missing)?;
} }
// TODO.
Ok(())
}
#[cold]
fn write_multi_packet_nak_sequence(
&mut self,
max_segments: usize,
first_segment_metadata: bool,
) -> Result<(), DestError> {
let pdu_header = PduHeader::new_no_file_data(self.transaction_params.pdu_conf, 0);
let mut nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new( let mut nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new(
self.pdu_and_cksum_buffer.get_mut(), self.pdu_and_cksum_buffer.get_mut(),
pdu_header, pdu_header,
core::cmp::min(segments_to_send as usize, max_segment_reqs), max_segments,
) )
for index in 0..segments_to_send { .unwrap();
let mut segment_index = 0;
let mut buf_index = 0;
let mut remaining = max_segments;
let mut seg_buf = nak_pdu_creator.segment_request_buffer_mut();
// First segment re-requests metadata PDU.
if first_segment_metadata {
if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large {
seg_buf[0..16].fill(0);
buf_index = 16;
} else {
seg_buf[0..8].fill(0);
buf_index = 8;
}
segment_index = 1; // account for the header gap entry
} }
// TODO.
for (start, end) in self.lost_segment_tracker.iter() {
// flush full PDU *before* writing the new entry
if segment_index == max_segments {
let written_len = nak_pdu_creator.finish();
self.pdu_sender.send_file_directive_pdu(
FileDirectiveType::NakPdu,
&self.pdu_and_cksum_buffer.borrow()[..written_len],
)?;
remaining = remaining.saturating_sub(max_segments);
// new PDU with the same capacity
nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new(
self.pdu_and_cksum_buffer.get_mut(),
pdu_header,
core::cmp::min(remaining, max_segments),
)
.unwrap();
seg_buf = nak_pdu_creator.segment_request_buffer_mut();
segment_index = 0;
buf_index = 0;
}
// write entry
if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large {
seg_buf[buf_index..buf_index + 8].copy_from_slice(&start.to_be_bytes());
buf_index += 8;
seg_buf[buf_index..buf_index + 8].copy_from_slice(&end.to_be_bytes());
buf_index += 8;
} else {
seg_buf[buf_index..buf_index + 4].copy_from_slice(&(start as u32).to_be_bytes());
buf_index += 4;
seg_buf[buf_index..buf_index + 4].copy_from_slice(&(end as u32).to_be_bytes());
buf_index += 4;
}
segment_index += 1;
}
// send trailing PDU if anything was written
if segment_index > 0 {
let written_len = nak_pdu_creator.finish();
self.pdu_sender.send_file_directive_pdu(
FileDirectiveType::NakPdu,
&self.pdu_and_cksum_buffer.borrow()[..written_len],
)?;
}
Ok(())
} }
fn trigger_notice_of_completion_cancelled( fn trigger_notice_of_completion_cancelled(
@@ -1315,6 +1419,15 @@ impl<
return Err(e); return Err(e);
} }
} }
if self.step() == TransactionStep::WaitingForMetadata
|| self.step() == TransactionStep::ReceivingFileDataPdus
{
if let Some(ack_params) = &mut self.transaction_params.acked_params {
if ack_params.deferred_procedure_active {
self.deferred_lost_segment_handling()?;
}
}
}
if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling { if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
self.check_limit_handling()?; self.check_limit_handling()?;
} }
@@ -1601,21 +1714,21 @@ mod tests {
use rand::Rng; use rand::Rng;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
lv::Lv,
pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket},
ChecksumType, TransmissionMode, ChecksumType, TransmissionMode,
lv::Lv,
pdu::{WritablePduPacket, finished::FinishedPduReader, metadata::MetadataPduCreator},
}, },
util::{UbfU16, UnsignedByteFieldU8}, util::{UbfU16, UnsignedByteFieldU8},
}; };
use crate::{ use crate::{
CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd,
filestore::NativeFilestore, filestore::NativeFilestore,
lost_segments::LostSegmentsList, lost_segments::LostSegmentsList,
tests::{ tests::{
basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, LOCAL_ID, REMOTE_ID, SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer,
TestCheckTimerCreator, TestFaultHandler, TimerExpiryControl, LOCAL_ID, REMOTE_ID, TestCheckTimerCreator, TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table,
}, },
FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd, CRC_32,
}; };
use super::*; use super::*;
@@ -1932,12 +2045,14 @@ mod tests {
let dest_handler = let dest_handler =
default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default()); default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default());
assert!(dest_handler.transmission_mode().is_none()); assert!(dest_handler.transmission_mode().is_none());
assert!(dest_handler assert!(
.local_cfg dest_handler
.fault_handler .local_cfg
.user_hook .fault_handler
.borrow() .user_hook
.all_queues_empty()); .borrow()
.all_queues_empty()
);
assert!(dest_handler.pdu_sender.queue_empty()); assert!(dest_handler.pdu_sender.queue_empty());
assert_eq!(dest_handler.state(), State::Idle); assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle); assert_eq!(dest_handler.step(), TransactionStep::Idle);

View File

@@ -154,15 +154,37 @@ pub trait LostSegmentStore {
fn write_to_nak_segment_list( fn write_to_nak_segment_list(
&self, &self,
nak_builder: &mut NakPduCreatorWithReservedSeqReqsBuf, nak_builder: &mut NakPduCreatorWithReservedSeqReqsBuf,
first_segment_request_for_metadata: bool,
) -> Result<usize, LostSegmentWriteError> { ) -> Result<usize, LostSegmentWriteError> {
let file_flag = nak_builder.pdu_header().common_pdu_conf().file_flag; let file_flag = nak_builder.pdu_header().common_pdu_conf().file_flag;
if nak_builder.num_segment_reqs() != self.number_of_segments() { let mut relevant_size = self.number_of_segments();
if first_segment_request_for_metadata {
relevant_size += 1;
}
if nak_builder.num_segment_reqs() != relevant_size {
return Err(LostSegmentWriteError::NumberOfSegmentsMismatch { return Err(LostSegmentWriteError::NumberOfSegmentsMismatch {
expected: self.number_of_segments(), expected: self.number_of_segments(),
actual: nak_builder.num_segment_reqs(), actual: nak_builder.num_segment_reqs(),
}); });
} }
self.write_segments_to_bytes(nak_builder.segment_request_buffer_mut(), file_flag) let mut buf = nak_builder.segment_request_buffer_mut();
let mut written_len = 0;
if first_segment_request_for_metadata {
match file_flag {
LargeFileFlag::Normal => {
buf[0..8].fill(0);
buf = &mut buf[8..];
written_len += 8;
}
LargeFileFlag::Large => {
buf[0..16].fill(0);
buf = &mut buf[16..];
written_len += 16;
}
}
}
written_len += self.write_segments_to_bytes(buf, file_flag)?;
Ok(written_len)
} }
} }