added lost segment store abstraction

This commit is contained in:
Robin Mueller
2025-09-12 15:56:17 +02:00
parent 8e755fd7b2
commit d239af7179
5 changed files with 903 additions and 72 deletions

View File

@@ -22,6 +22,7 @@ derive-new = ">=0.6, <=0.7"
hashbrown = { version = ">=0.14, <=0.15", optional = true }
spacepackets = { git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git", version = "0.16", default-features = false }
thiserror = { version = "2", default-features = false }
heapless = "0.9"
serde = { version = "1", optional = true }
defmt = { version = "1", optional = true }

View File

@@ -30,7 +30,10 @@
//! 4. A Finished PDU has been sent back to the remote side.
//! 5. A Finished PDU ACK was received.
use crate::{DummyPduProvider, GenericSendError, PduProvider, user::TransactionFinishedParams};
use core::str::{Utf8Error, from_utf8, from_utf8_unchecked};
use core::{
cell::Cell,
str::{Utf8Error, from_utf8, from_utf8_unchecked},
};
use super::{
CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider,
@@ -94,11 +97,11 @@ struct TransferState<Countdown: CountdownProvider> {
// TODO: Can we delete this for good?
// file_size_eof: u64,
metadata_only: bool,
condition_code: ConditionCode,
delivery_code: DeliveryCode,
fault_location_finished: Option<EntityIdTlv>,
condition_code: Cell<ConditionCode>,
delivery_code: Cell<DeliveryCode>,
fault_location_finished: Cell<Option<EntityIdTlv>>,
file_status: FileStatus,
completion_disposition: CompletionDisposition,
completion_disposition: Cell<CompletionDisposition>,
checksum: u32,
current_check_count: u32,
current_check_timer: Option<Countdown>,
@@ -112,11 +115,11 @@ impl<CheckTimer: CountdownProvider> Default for TransferState<CheckTimer> {
progress: Default::default(),
// file_size_eof: Default::default(),
metadata_only: false,
condition_code: ConditionCode::NoError,
delivery_code: DeliveryCode::Incomplete,
fault_location_finished: None,
condition_code: Cell::new(ConditionCode::NoError),
delivery_code: Cell::new(DeliveryCode::Incomplete),
fault_location_finished: Cell::new(None),
file_status: FileStatus::Unreported,
completion_disposition: CompletionDisposition::Completed,
completion_disposition: Cell::new(CompletionDisposition::Completed),
checksum: 0,
current_check_count: 0,
current_check_timer: None,
@@ -182,8 +185,8 @@ impl<CheckTimer: CountdownProvider> Default for TransactionParams<CheckTimer> {
impl<CheckTimer: CountdownProvider> TransactionParams<CheckTimer> {
fn reset(&mut self) {
self.tstate.condition_code = ConditionCode::NoError;
self.tstate.delivery_code = DeliveryCode::Incomplete;
self.tstate.condition_code.set(ConditionCode::NoError);
self.tstate.delivery_code.set(DeliveryCode::Incomplete);
self.tstate.file_status = FileStatus::Unreported;
}
}
@@ -263,7 +266,7 @@ pub struct DestinationHandler<
CheckTimerProvider: CountdownProvider,
> {
local_cfg: LocalEntityConfig<UserFaultHook>,
step: TransactionStep,
step: core::cell::Cell<TransactionStep>,
state: State,
tparams: TransactionParams<CheckTimerProvider>,
packet_buf: alloc::vec::Vec<u8>,
@@ -321,7 +324,7 @@ impl<
) -> Self {
Self {
local_cfg,
step: TransactionStep::Idle,
step: Cell::new(TransactionStep::Idle),
state: State::Idle,
tparams: Default::default(),
packet_buf: alloc::vec![0; max_packet_len],
@@ -387,7 +390,7 @@ impl<
EntityIdTlv::new(self.local_cfg.id),
);
self.step = TransactionStep::TransferCompletion;
self.set_step(TransactionStep::TransferCompletion);
return true;
}
}
@@ -511,7 +514,7 @@ impl<
}
}
self.state = State::Busy;
self.step = TransactionStep::TransactionStart;
self.set_step(TransactionStep::TransactionStart);
Ok(())
}
@@ -520,9 +523,10 @@ impl<
user: &mut impl CfdpUser,
raw_packet: &[u8],
) -> Result<(), DestError> {
let step = self.step.get();
if self.state == State::Idle
|| (self.step != TransactionStep::ReceivingFileDataPdus
&& self.step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling)
|| (step != TransactionStep::ReceivingFileDataPdus
&& step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling)
{
return Err(DestError::WrongStateForFileData);
}
@@ -547,7 +551,11 @@ impl<
fd_pdu.offset(),
fd_pdu.file_data(),
) {
self.declare_fault(ConditionCode::FilestoreRejection);
if self.declare_fault(ConditionCode::FilestoreRejection)
== FaultHandlerCode::AbandonTransaction
{
self.abandon_transaction();
}
return Err(e.into());
}
self.tstate_mut().progress += fd_pdu.file_data().len() as u64;
@@ -559,7 +567,7 @@ impl<
cfdp_user: &mut impl CfdpUser,
raw_packet: &[u8],
) -> Result<(), DestError> {
if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus {
if self.state == State::Idle || self.step() != TransactionStep::ReceivingFileDataPdus {
return Err(DestError::WrongStateForEof);
}
let eof_pdu = EofPdu::from_bytes(raw_packet)?;
@@ -578,9 +586,15 @@ impl<
);
self.tparams.tstate.progress = eof_pdu.file_size();
if eof_pdu.file_size() > 0 {
self.tparams.tstate.delivery_code = DeliveryCode::Incomplete;
self.tparams
.tstate
.delivery_code
.set(DeliveryCode::Incomplete);
} else {
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
self.tparams
.tstate
.delivery_code
.set(DeliveryCode::Complete);
}
// TODO: The cancel EOF also supplies a checksum and a progress number. We could cross
// check that checksum, but how would we deal with a checksum failure? The standard
@@ -595,27 +609,43 @@ impl<
}
fn trigger_notice_of_completion_cancelled(
&mut self,
&self,
cond_code: ConditionCode,
fault_location: EntityIdTlv,
) {
self.tparams.tstate.completion_disposition = CompletionDisposition::Cancelled;
self.tparams.tstate.condition_code = cond_code;
self.tparams.tstate.fault_location_finished = Some(fault_location);
self.tparams
.tstate
.completion_disposition
.set(CompletionDisposition::Cancelled);
self.tparams.tstate.condition_code.set(cond_code);
self.tparams
.tstate
.fault_location_finished
.set(Some(fault_location));
// For anything larger than 0, we'd have to do a checksum check to verify whether
// the delivery is really complete, and we need the EOF checksum for that..
if self.tparams.tstate.progress == 0 {
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
self.tparams
.tstate
.delivery_code
.set(DeliveryCode::Complete);
}
}
/// Returns whether the transfer can be completed regularly.
fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
// CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
if self.tparams.tstate.progress > eof_pdu.file_size()
&& self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError
{
return Ok(false);
if self.tparams.tstate.progress > eof_pdu.file_size() {
match self.declare_fault(ConditionCode::FileSizeError) {
FaultHandlerCode::IgnoreError => (),
FaultHandlerCode::AbandonTransaction => {
self.abandon_transaction();
return Ok(false);
}
FaultHandlerCode::NoticeOfCancellation | FaultHandlerCode::NoticeOfSuspension => {
return Ok(false);
}
}
} else if (self.tparams.tstate.progress < eof_pdu.file_size())
&& self.tparams.transmission_mode() == TransmissionMode::Acknowledged
{
@@ -645,10 +675,10 @@ impl<
fn file_transfer_complete_transition(&mut self) {
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged {
self.step = TransactionStep::TransferCompletion;
self.set_step(TransactionStep::TransferCompletion);
} else {
// TODO: Prepare ACK PDU somehow.
self.step = TransactionStep::SendingAckPdu;
self.set_step(TransactionStep::SendingAckPdu);
}
}
@@ -675,33 +705,54 @@ impl<
Ok(checksum_success) => {
file_delivery_complete = checksum_success;
if !checksum_success {
self.tparams.tstate.delivery_code = DeliveryCode::Incomplete;
self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure;
self.tparams
.tstate
.delivery_code
.set(DeliveryCode::Incomplete);
self.tparams
.tstate
.condition_code
.set(ConditionCode::FileChecksumFailure);
}
}
Err(e) => match e {
FilestoreError::ChecksumTypeNotImplemented(_) => {
self.declare_fault(ConditionCode::UnsupportedChecksumType);
if self.declare_fault(ConditionCode::UnsupportedChecksumType)
== FaultHandlerCode::AbandonTransaction
{
self.abandon_transaction();
}
// For this case, the applicable algorithm shall be the the null checksum,
// which is always succesful.
file_delivery_complete = true;
}
_ => {
self.declare_fault(ConditionCode::FilestoreRejection);
if self.declare_fault(ConditionCode::FilestoreRejection)
== FaultHandlerCode::AbandonTransaction
{
self.abandon_transaction();
}
// Treat this equivalent to a failed checksum procedure.
}
},
};
}
if file_delivery_complete {
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
self.tparams.tstate.condition_code = ConditionCode::NoError;
self.tparams
.tstate
.delivery_code
.set(DeliveryCode::Complete);
self.tparams
.tstate
.condition_code
.set(ConditionCode::NoError);
}
file_delivery_complete
}
fn start_check_limit_handling(&mut self) {
self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling;
self.set_step(TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling);
self.tparams.tstate.current_check_timer = Some(self.check_timer_creator.create_countdown(
TimerContext::CheckLimit {
local_id: self.local_cfg.id,
@@ -725,7 +776,11 @@ impl<
if self.tparams.tstate.current_check_count + 1
>= self.tparams.remote_cfg.unwrap().check_limit
{
self.declare_fault(ConditionCode::CheckLimitReached);
if self.declare_fault(ConditionCode::CheckLimitReached)
== FaultHandlerCode::AbandonTransaction
{
self.abandon_transaction();
}
} else {
self.tparams.tstate.current_check_count += 1;
self.tparams
@@ -744,7 +799,7 @@ impl<
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
let mut sent_packets = 0;
if self.step == TransactionStep::TransactionStart {
if self.step() == TransactionStep::TransactionStart {
let result = self.transaction_start(cfdp_user);
if let Err(e) = result {
// If we can not even start the transaction properly, reset the handler.
@@ -754,28 +809,30 @@ impl<
return Err(e);
}
}
if self.step == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
self.check_limit_handling();
}
if self.step == TransactionStep::TransferCompletion {
if self.step() == TransactionStep::TransferCompletion {
sent_packets += self.transfer_completion(cfdp_user)?;
}
if self.step == TransactionStep::SendingAckPdu {
if self.step() == TransactionStep::SendingAckPdu {
return Err(DestError::NotImplemented);
}
if self.step == TransactionStep::SendingFinishedPdu {
if self.step() == TransactionStep::SendingFinishedPdu {
self.reset();
}
Ok(sent_packets)
}
/// Get the step, which denotes the exact step of a pending CFDP transaction when applicable.
#[inline]
pub fn step(&self) -> TransactionStep {
self.step
self.step.get()
}
/// Get the step, which denotes whether the CFDP handler is active, and which CFDP class
/// is used if it is active.
#[inline]
pub fn state(&self) -> State {
self.state
}
@@ -846,7 +903,8 @@ impl<
self.vfs.create_file(dest_path_str)?;
}
self.tparams.tstate.file_status = FileStatus::Retained;
self.step = TransactionStep::ReceivingFileDataPdus;
drop(msgs_to_user);
self.set_step(TransactionStep::ReceivingFileDataPdus);
Ok(())
}
@@ -858,12 +916,12 @@ impl<
{
sent_packets += self.send_finished_pdu()?;
}
self.step = TransactionStep::SendingFinishedPdu;
self.set_step(TransactionStep::SendingFinishedPdu);
Ok(sent_packets)
}
fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
if self.tstate().completion_disposition == CompletionDisposition::Completed {
if self.tstate().completion_disposition.get() == CompletionDisposition::Completed {
// TODO: Execute any filestore requests
} else if self
.tparams
@@ -871,7 +929,7 @@ impl<
.as_ref()
.unwrap()
.disposition_on_cancellation
&& self.tstate().delivery_code == DeliveryCode::Incomplete
&& self.tstate().delivery_code.get() == DeliveryCode::Incomplete
{
// Safety: We already verified that the path is valid during the transaction start.
let dest_path = unsafe {
@@ -888,15 +946,17 @@ impl<
let tstate = self.tstate();
let transaction_finished_params = TransactionFinishedParams {
id: tstate.transaction_id.unwrap(),
condition_code: tstate.condition_code,
delivery_code: tstate.delivery_code,
condition_code: tstate.condition_code.get(),
delivery_code: tstate.delivery_code.get(),
file_status: tstate.file_status,
};
cfdp_user.transaction_finished_indication(&transaction_finished_params);
Ok(())
}
fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode {
// When the fault handler code [FaultHandlerCode::AbandonTransaction] is returned, the caller
// must call [Self::abandon_transaction] as soon as it is possible.
fn declare_fault(&self, condition_code: ConditionCode) -> FaultHandlerCode {
// Cache those, because they might be reset when abandoning the transaction.
let transaction_id = self.tstate().transaction_id.unwrap();
let progress = self.tstate().progress;
@@ -910,25 +970,24 @@ impl<
}
FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(),
FaultHandlerCode::IgnoreError => (),
FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
FaultHandlerCode::AbandonTransaction => (),
}
self.local_cfg
.fault_handler
.report_fault(transaction_id, condition_code, progress)
}
fn notice_of_cancellation(
&mut self,
condition_code: ConditionCode,
fault_location: EntityIdTlv,
) {
self.step = TransactionStep::TransferCompletion;
self.tstate_mut().condition_code = condition_code;
self.tstate_mut().fault_location_finished = Some(fault_location);
self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled;
fn notice_of_cancellation(&self, condition_code: ConditionCode, fault_location: EntityIdTlv) {
self.set_step_internal(TransactionStep::TransferCompletion);
let tstate = self.tstate();
tstate.condition_code.set(condition_code);
tstate
.completion_disposition
.set(CompletionDisposition::Cancelled);
tstate.fault_location_finished.set(Some(fault_location));
}
fn notice_of_suspension(&mut self) {
fn notice_of_suspension(&self) {
// TODO: Implement suspension handling.
}
@@ -936,12 +995,22 @@ impl<
self.reset();
}
#[inline]
fn set_step_internal(&self, step: TransactionStep) {
self.step.set(step);
}
#[inline]
fn set_step(&mut self, step: TransactionStep) {
self.set_step_internal(step);
}
/// This function is public to allow completely resetting the handler, but it is explicitely
/// discouraged to do this. CFDP has mechanism to detect issues and errors on itself.
/// Resetting the handler might interfere with these mechanisms and lead to unexpected
/// behaviour.
pub fn reset(&mut self) {
self.step = TransactionStep::Idle;
self.set_step(TransactionStep::Idle);
self.state = State::Idle;
// self.packets_to_send_ctx.packet_available = false;
self.tparams.reset();
@@ -951,18 +1020,22 @@ impl<
let tstate = self.tstate();
let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
let finished_pdu = if tstate.condition_code == ConditionCode::NoError
|| tstate.condition_code == ConditionCode::UnsupportedChecksumType
let finished_pdu = if tstate.condition_code.get() == ConditionCode::NoError
|| tstate.condition_code.get() == ConditionCode::UnsupportedChecksumType
{
FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status)
FinishedPduCreator::new_default(
pdu_header,
tstate.delivery_code.get(),
tstate.file_status,
)
} else {
FinishedPduCreator::new_generic(
pdu_header,
tstate.condition_code,
tstate.delivery_code,
tstate.condition_code.get(),
tstate.delivery_code.get(),
tstate.file_status,
&[],
tstate.fault_location_finished,
tstate.fault_location_finished.get(),
)
};
finished_pdu.write_to_bytes(&mut self.packet_buf)?;
@@ -974,14 +1047,17 @@ impl<
Ok(1)
}
#[inline]
pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHook> {
&self.local_cfg
}
#[inline]
fn tstate(&self) -> &TransferState<Countdown> {
&self.tparams.tstate
}
#[inline]
fn tstate_mut(&mut self) -> &mut TransferState<Countdown> {
&mut self.tparams.tstate
}

View File

@@ -93,6 +93,7 @@ extern crate std;
#[cfg(feature = "alloc")]
pub mod dest;
pub mod filestore;
pub mod lost_segments;
pub mod request;
#[cfg(feature = "alloc")]
pub mod source;

753
src/lost_segments.rs Normal file
View File

@@ -0,0 +1,753 @@
//! # Lost Segment Store Module
//!
//! The core abstraction provided by this module in the [LostSegmentStore].
//!
//! The two concrete implementations provided are:
//!
//! * [LostSegmentsMap]: A hash set based implementation which can grow dynamically andcan
//! optionally be bounded. Suitable for systems where dynamic allocation is allowed.
//! * [LostSegmentsList]: A fixed-size list based implementation where the size
//! of the lost segment list is statically known at compile-time. Suitable for resource
//! constrained devices where dyanamic allocation is not allowed or possible.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum LostSegmentError {
#[error("store is full")]
StoreFull,
#[error("segment is empty")]
EmptySegment,
#[error("segment start is larger than segment end")]
StartLargerThanEnd((u64, u64)),
#[error("large file segments are not supported")]
LargeFileSegmentNotSupported,
}
/// Generic trait to model a lost segment store.
///
/// The destination handler can use this store to keep track of lost segments and re-requesting
/// them. This abstraction allow using different data structures as a backend.
pub trait LostSegmentStore {
// Iteration
type Iter<'a>: Iterator<Item = (u64, u64)> + 'a
where
Self: 'a;
/// Iterate over all lost segments stored.
fn iter(&self) -> Self::Iter<'_>;
fn len(&self) -> usize;
/// Implementations may explicitely omit support for large file segments to save memory if
/// large file sizes are not used.
fn supports_large_file_size(&self) -> bool;
fn capacity(&self) -> Option<usize>;
fn reset(&mut self);
fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError>;
fn remove_lost_segment(
&mut self,
segment_to_remove: (u64, u64),
) -> Result<bool, LostSegmentError>;
/// The lost segment store may additionally have the capability to coalesce overlapping or
/// adjacent segments.
fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError>;
#[inline]
fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// Implementation based on a [hashbrown::HashSet] which can grow dynamically.
///
/// Optionally, a maximum capacity can be specified at creation time. This container allocates at
/// run-time!
#[cfg(feature = "alloc")]
#[derive(Debug, Default)]
pub struct LostSegmentsMap {
map: hashbrown::HashSet<(u64, u64)>,
opt_capacity: Option<usize>,
}
#[cfg(feature = "alloc")]
impl LostSegmentsMap {
pub fn new(opt_capacity: Option<usize>) -> Self {
Self {
map: hashbrown::HashSet::new(),
opt_capacity,
}
}
}
#[cfg(feature = "alloc")]
impl LostSegmentStore for LostSegmentsMap {
type Iter<'a>
= core::iter::Cloned<hashbrown::hash_set::Iter<'a, (u64, u64)>>
where
Self: 'a;
fn iter(&self) -> Self::Iter<'_> {
self.map.iter().cloned()
}
#[inline]
fn len(&self) -> usize {
self.map.len()
}
#[inline]
fn supports_large_file_size(&self) -> bool {
true
}
#[inline]
fn capacity(&self) -> Option<usize> {
self.opt_capacity
}
#[inline]
fn reset(&mut self) {
self.map.clear();
}
#[inline]
fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError> {
if lost_seg.1 == lost_seg.0 {
return Err(LostSegmentError::EmptySegment);
}
if lost_seg.0 > lost_seg.1 {
return Err(LostSegmentError::StartLargerThanEnd(lost_seg));
}
if let Some(capacity) = self.opt_capacity
&& self.map.len() == capacity
{
return Err(LostSegmentError::StoreFull);
}
self.map.insert(lost_seg);
Ok(())
}
fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> {
if self.map.len() <= 1 {
return Ok(());
}
// Move out segments, discard empty/invalid ones
let mut segs: alloc::vec::Vec<(u64, u64)> = self
.map
.drain()
.filter(|(s, e)| e > s) // keep only non-empty [s, e)
.collect();
// Sort by start, then by end
segs.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
// Merge
let mut merged: alloc::vec::Vec<(u64, u64)> = alloc::vec::Vec::with_capacity(segs.len());
for (s, e) in segs {
if let Some(last) = merged.last_mut() {
// For half-open ranges, this merges overlapping or adjacent:
// [last.0, last.1) with [s, e) if s <= last.1
if s <= last.1 {
if e > last.1 {
last.1 = e;
}
continue;
}
}
merged.push((s, e));
}
// Rebuild the set
self.map.extend(merged);
Ok(())
}
#[inline]
fn remove_lost_segment(
&mut self,
segment_to_remove: (u64, u64),
) -> Result<bool, LostSegmentError> {
if segment_to_remove.1 == segment_to_remove.0 {
return Err(LostSegmentError::EmptySegment);
}
if segment_to_remove.0 > segment_to_remove.1 {
return Err(LostSegmentError::StartLargerThanEnd(segment_to_remove));
}
Ok(self.map.remove(&segment_to_remove))
}
}
/// Implementation based on a [heapless::Vec] with a statically known container size.
#[derive(Default, Debug)]
pub struct LostSegmentsList<const N: usize, T> {
list: heapless::vec::Vec<(T, T), N>,
}
/// Type definition for segment list which only supports normal file sizes. This can be used
/// to save memory required for the lost segment list.
pub type LostSegmentsListSmall<const N: usize> = LostSegmentsList<N, u32>;
impl<const N: usize, T> LostSegmentsList<N, T> {
pub fn new() -> Self {
Self {
list: heapless::Vec::new(),
}
}
#[inline]
fn num_lost_segments(&self) -> usize {
self.list.len()
}
#[inline]
fn capacity(&self) -> Option<usize> {
Some(N)
}
#[inline]
fn reset(&mut self) {
self.list.clear();
}
}
impl<const N: usize, T: Copy + Clone + Ord> LostSegmentsList<N, T> {
fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> {
// Remove empty/invalid ranges
self.list.retain(|&(s, e)| e > s);
if self.list.len() <= 1 {
return Ok(());
}
// Sort by start, then end (no extra allocs)
self.list
.as_mut_slice()
.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
// In-place merge; merges overlapping or adjacent [s, e) where next.s <= prev.e
let mut w = 0usize;
for i in 0..self.list.len() {
if w == 0 {
self.list[w] = self.list[i];
w = 1;
continue;
}
let (prev_s, mut prev_e) = self.list[w - 1];
let (s, e) = self.list[i];
if s <= prev_e {
// Extend previous
if e > prev_e {
prev_e = e;
self.list[w - 1] = (prev_s, prev_e);
}
} else {
// Start new merged interval
self.list[w] = (s, e);
w += 1;
}
}
// Truncate to merged length
self.list.truncate(w);
Ok(())
}
}
impl<const N: usize> LostSegmentStore for LostSegmentsList<N, u64> {
type Iter<'a>
= core::iter::Cloned<core::slice::Iter<'a, (u64, u64)>>
where
Self: 'a;
fn iter(&self) -> Self::Iter<'_> {
self.list.iter().cloned()
}
fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError> {
if lost_seg.1 == lost_seg.0 {
return Err(LostSegmentError::EmptySegment);
}
if lost_seg.0 > lost_seg.1 {
return Err(LostSegmentError::StartLargerThanEnd(lost_seg));
}
if self.list.is_full() {
return Err(LostSegmentError::StoreFull);
}
self.list.push(lost_seg).ok();
Ok(())
}
fn remove_lost_segment(
&mut self,
segment_to_remove: (u64, u64),
) -> Result<bool, LostSegmentError> {
if segment_to_remove.1 == segment_to_remove.0 {
return Err(LostSegmentError::EmptySegment);
}
if segment_to_remove.0 > segment_to_remove.1 {
return Err(LostSegmentError::StartLargerThanEnd(segment_to_remove));
}
let current_len = self.list.len();
self.list.retain(|val| val != &segment_to_remove);
Ok(self.list.len() != current_len)
}
fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> {
self.coalesce_lost_segments()
}
fn len(&self) -> usize {
self.num_lost_segments()
}
fn supports_large_file_size(&self) -> bool {
true
}
fn capacity(&self) -> Option<usize> {
self.capacity()
}
fn reset(&mut self) {
self.reset();
}
}
impl<const N: usize> LostSegmentStore for LostSegmentsList<N, u32> {
type Iter<'a>
= core::iter::Map<core::slice::Iter<'a, (u32, u32)>, fn(&(u32, u32)) -> (u64, u64)>
where
Self: 'a;
fn iter(&self) -> Self::Iter<'_> {
self.list.iter().map(|pair| (pair.0 as u64, pair.1 as u64))
}
fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError> {
if lost_seg.1 == lost_seg.0 {
return Err(LostSegmentError::EmptySegment);
}
if lost_seg.0 > lost_seg.1 {
return Err(LostSegmentError::StartLargerThanEnd(lost_seg));
}
if lost_seg.1 > u32::MAX as u64 || lost_seg.0 > u32::MAX as u64 {
return Err(LostSegmentError::LargeFileSegmentNotSupported);
}
if self.list.is_full() {
return Err(LostSegmentError::StoreFull);
}
self.list.push((lost_seg.0 as u32, lost_seg.1 as u32)).ok();
Ok(())
}
fn remove_lost_segment(
&mut self,
segment_to_remove: (u64, u64),
) -> Result<bool, LostSegmentError> {
if segment_to_remove.1 == segment_to_remove.0 {
return Err(LostSegmentError::EmptySegment);
}
if segment_to_remove.0 > segment_to_remove.1 {
return Err(LostSegmentError::StartLargerThanEnd(segment_to_remove));
}
if segment_to_remove.1 > u32::MAX as u64 || segment_to_remove.0 > u32::MAX as u64 {
return Err(LostSegmentError::LargeFileSegmentNotSupported);
}
let current_len = self.list.len();
self.list
.retain(|val| (val.0 as u64, val.1 as u64) != segment_to_remove);
Ok(self.list.len() != current_len)
}
fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> {
self.coalesce_lost_segments()
}
#[inline]
fn len(&self) -> usize {
self.num_lost_segments()
}
#[inline]
fn supports_large_file_size(&self) -> bool {
false
}
#[inline]
fn capacity(&self) -> Option<usize> {
self.capacity()
}
#[inline]
fn reset(&mut self) {
self.reset();
}
}
#[cfg(test)]
mod tests {
use std::vec::Vec;
use super::*;
fn generic_basic_state_test(
store: &impl LostSegmentStore,
supports_large_file_size: bool,
capacity: Option<usize>,
) {
assert_eq!(store.supports_large_file_size(), supports_large_file_size);
assert_eq!(store.len(), 0);
assert!(store.is_empty());
assert_eq!(store.capacity(), capacity);
assert_eq!(store.iter().count(), 0);
}
fn generic_error_tests(store: &mut impl LostSegmentStore) {
matches!(
store.add_lost_segment((0, 0)).unwrap_err(),
LostSegmentError::EmptySegment
);
matches!(
store.add_lost_segment((10, 0)).unwrap_err(),
LostSegmentError::StartLargerThanEnd((10, 0))
);
matches!(
store.remove_lost_segment((0, 0)).unwrap_err(),
LostSegmentError::EmptySegment
);
matches!(
store.remove_lost_segment((10, 0)).unwrap_err(),
LostSegmentError::StartLargerThanEnd((10, 0))
);
}
fn generic_add_segments_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
assert_eq!(store.len(), 1);
for segment in store.iter() {
assert_eq!(segment, (0, 20));
}
store.add_lost_segment((20, 40)).unwrap();
let mut segments: Vec<(u64, u64)> = store.iter().collect();
segments.sort_unstable();
assert_eq!(segments.len(), 2);
assert_eq!(segments[0], (0, 20));
assert_eq!(segments[1], (20, 40));
}
fn generic_reset_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
assert_eq!(store.len(), 1);
store.reset();
assert_eq!(store.len(), 0);
assert!(store.is_empty());
assert_eq!(store.iter().count(), 0);
}
fn generic_removal_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
store.add_lost_segment((20, 40)).unwrap();
assert_eq!(store.len(), 2);
assert!(store.remove_lost_segment((0, 20)).unwrap());
assert_eq!(store.len(), 1);
assert!(!store.remove_lost_segment((0, 20)).unwrap());
assert_eq!(store.len(), 1);
assert!(store.remove_lost_segment((20, 40)).unwrap());
assert_eq!(store.len(), 0);
}
fn generic_coalescing_simple_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
store.add_lost_segment((20, 40)).unwrap();
store.add_lost_segment((40, 60)).unwrap();
store.coalesce_lost_segments().unwrap();
for segment in store.iter() {
assert_eq!(segment, (0, 60));
}
assert_eq!(store.len(), 1);
}
fn generic_coalescing_simple_with_gaps_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
store.add_lost_segment((20, 40)).unwrap();
store.add_lost_segment((40, 60)).unwrap();
store.add_lost_segment((80, 100)).unwrap();
store.add_lost_segment((110, 120)).unwrap();
store.add_lost_segment((120, 130)).unwrap();
store.coalesce_lost_segments().unwrap();
let mut segments: Vec<(u64, u64)> = store.iter().collect();
segments.sort_unstable();
assert_eq!(segments.len(), 3);
assert_eq!(segments[0], (0, 60));
assert_eq!(segments[1], (80, 100));
assert_eq!(segments[2], (110, 130));
}
fn generic_coalescing_overlapping_simple_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
store.add_lost_segment((10, 30)).unwrap();
store.coalesce_lost_segments().unwrap();
for segment in store.iter() {
assert_eq!(segment, (0, 30));
}
assert_eq!(store.len(), 1);
}
fn generic_coalescing_overlapping_adjacent_test(store: &mut impl LostSegmentStore) {
store.add_lost_segment((0, 20)).unwrap();
store.add_lost_segment((10, 30)).unwrap();
store.add_lost_segment((20, 40)).unwrap();
store.coalesce_lost_segments().unwrap();
for segment in store.iter() {
assert_eq!(segment, (0, 40));
}
assert_eq!(store.len(), 1);
}
fn generic_useless_coalescing_test(store: &mut impl LostSegmentStore) {
// Is okay, does nothing.
assert!(store.coalesce_lost_segments().is_ok());
assert_eq!(store.len(), 0);
assert!(store.is_empty());
store.add_lost_segment((10, 20)).unwrap();
// Is okay, does nothing.
assert!(store.coalesce_lost_segments().is_ok());
for segment in store.iter() {
assert_eq!(segment, (10, 20));
}
}
#[test]
fn test_basic_map_state_map() {
let store = LostSegmentsMap::default();
generic_basic_state_test(&store, true, None);
}
#[test]
fn test_basic_errors_map() {
let mut store = LostSegmentsMap::default();
generic_error_tests(&mut store);
}
#[test]
fn test_add_segments_map() {
let mut store = LostSegmentsMap::default();
generic_add_segments_test(&mut store);
}
#[test]
fn test_reset_map() {
let mut store = LostSegmentsMap::default();
generic_reset_test(&mut store);
}
#[test]
fn test_removal_map() {
let mut store = LostSegmentsMap::default();
generic_removal_test(&mut store);
}
#[test]
fn test_cap_limit_map() {
let mut store = LostSegmentsMap::new(Some(4));
for i in 0..4 {
store.add_lost_segment((i * 20, (i + 1) * 20)).unwrap();
}
matches!(
store.add_lost_segment((80, 100)),
Err(LostSegmentError::StoreFull)
);
}
#[test]
fn test_basic_list_state_list() {
let store = LostSegmentsListSmall::<12>::default();
generic_basic_state_test(&store, false, Some(12));
let store = LostSegmentsListSmall::<12>::new();
generic_basic_state_test(&store, false, Some(12));
}
#[test]
fn test_basic_errors_list() {
let mut store = LostSegmentsListSmall::<12>::default();
generic_error_tests(&mut store);
}
#[test]
fn test_add_segments_list() {
let mut store = LostSegmentsListSmall::<12>::default();
generic_add_segments_test(&mut store);
}
#[test]
fn test_reset_list() {
let mut store = LostSegmentsListSmall::<12>::default();
generic_reset_test(&mut store);
}
#[test]
fn test_removal_list() {
let mut store = LostSegmentsListSmall::<12>::default();
generic_removal_test(&mut store);
}
#[test]
fn test_cap_limit_list() {
let mut store = LostSegmentsListSmall::<4>::default();
for i in 0..4 {
store.add_lost_segment((i * 20, (i + 1) * 20)).unwrap();
}
matches!(
store.add_lost_segment((80, 100)),
Err(LostSegmentError::StoreFull)
);
}
#[test]
fn test_large_file_size_unsupported() {
let mut store = LostSegmentsListSmall::<4>::default();
matches!(
store.add_lost_segment((0, u32::MAX as u64 + 1)),
Err(LostSegmentError::LargeFileSegmentNotSupported)
);
}
#[test]
fn test_large_file_size_unsupported_2() {
let mut store = LostSegmentsListSmall::<4>::default();
matches!(
store.remove_lost_segment((0, u32::MAX as u64 + 1)),
Err(LostSegmentError::LargeFileSegmentNotSupported)
);
}
#[test]
fn test_basic_list_state_list_large() {
let store = LostSegmentsList::<12, u64>::default();
generic_basic_state_test(&store, true, Some(12));
}
#[test]
fn test_basic_errors_list_large() {
let mut store = LostSegmentsList::<12, u64>::default();
generic_error_tests(&mut store);
}
#[test]
fn test_add_segments_list_large() {
let mut store = LostSegmentsList::<12, u64>::default();
generic_add_segments_test(&mut store);
}
#[test]
fn test_reset_list_large() {
let mut store = LostSegmentsList::<12, u64>::default();
generic_reset_test(&mut store);
}
#[test]
fn test_removal_list_large() {
let mut store = LostSegmentsList::<12, u64>::default();
generic_removal_test(&mut store);
}
#[test]
fn test_cap_limit_list_large() {
let mut store = LostSegmentsList::<4, u64>::default();
for i in 0..4 {
store.add_lost_segment((i * 20, (i + 1) * 20)).unwrap();
}
matches!(
store.add_lost_segment((80, 100)),
Err(LostSegmentError::StoreFull)
);
}
#[test]
fn test_coalescing_simple_in_map() {
let mut store = LostSegmentsMap::default();
generic_coalescing_simple_test(&mut store);
}
#[test]
fn test_useless_coalescing_map() {
let mut store = LostSegmentsMap::default();
generic_useless_coalescing_test(&mut store);
}
#[test]
fn test_useless_coalescing_list() {
let mut store = LostSegmentsListSmall::<4>::default();
generic_useless_coalescing_test(&mut store);
}
#[test]
fn test_coalescing_simple_in_list() {
let mut store = LostSegmentsListSmall::<4>::default();
generic_coalescing_simple_test(&mut store);
}
#[test]
fn test_coalescing_simple_in_list_large() {
let mut store = LostSegmentsList::<4, u64>::default();
generic_coalescing_simple_test(&mut store);
}
#[test]
fn test_coalescing_overlapping_simple_in_map() {
let mut store = LostSegmentsMap::default();
generic_coalescing_overlapping_simple_test(&mut store);
}
#[test]
fn test_coalescing_overlapping_simple_in_list() {
let mut store = LostSegmentsListSmall::<4>::default();
generic_coalescing_overlapping_simple_test(&mut store);
}
#[test]
fn test_coalescing_overlapping_simple_in_list_large() {
let mut store = LostSegmentsList::<4, u64>::default();
generic_coalescing_overlapping_simple_test(&mut store);
}
#[test]
fn test_coalescing_overlapping_adjacent_in_map() {
let mut store = LostSegmentsMap::default();
generic_coalescing_overlapping_adjacent_test(&mut store);
}
#[test]
fn test_coalescing_overlapping_adjacent_in_list() {
let mut store = LostSegmentsListSmall::<4>::default();
generic_coalescing_overlapping_adjacent_test(&mut store);
}
#[test]
fn test_coalescing_overlapping_adjacent_in_list_large() {
let mut store = LostSegmentsList::<4, u64>::default();
generic_coalescing_overlapping_adjacent_test(&mut store);
}
#[test]
fn test_coalescing_simple_with_gaps_in_map() {
let mut store = LostSegmentsMap::default();
generic_coalescing_simple_with_gaps_test(&mut store);
}
#[test]
fn test_coalescing_simple_with_gaps_in_list() {
let mut store = LostSegmentsListSmall::<8>::default();
generic_coalescing_simple_with_gaps_test(&mut store);
}
#[test]
fn test_coalescing_simple_with_gaps_in_list_large() {
let mut store = LostSegmentsList::<8, u64>::default();
generic_coalescing_simple_with_gaps_test(&mut store);
}
}

View File

@@ -27,7 +27,7 @@
//! 4. A Finished PDU will be awaited, for example one generated using
//! [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
//!
//! ### Acknowledged transfer (*not implemented yet*)
//! ### Acknowledged transfer
//!
//! 4. A EOF ACK packet will be awaited, for example one generated using
//! [spacepackets::cfdp::pdu::ack::AckPdu].