diff --git a/Cargo.toml b/Cargo.toml index 2d03fa7..86162b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ derive-new = ">=0.6, <=0.7" hashbrown = { version = ">=0.14, <=0.15", optional = true } spacepackets = { git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git", version = "0.16", default-features = false } thiserror = { version = "2", default-features = false } +heapless = "0.9" serde = { version = "1", optional = true } defmt = { version = "1", optional = true } diff --git a/src/dest.rs b/src/dest.rs index b36f648..587c8dd 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -30,7 +30,10 @@ //! 4. A Finished PDU has been sent back to the remote side. //! 5. A Finished PDU ACK was received. use crate::{DummyPduProvider, GenericSendError, PduProvider, user::TransactionFinishedParams}; -use core::str::{Utf8Error, from_utf8, from_utf8_unchecked}; +use core::{ + cell::Cell, + str::{Utf8Error, from_utf8, from_utf8_unchecked}, +}; use super::{ CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, @@ -94,11 +97,11 @@ struct TransferState { // TODO: Can we delete this for good? // file_size_eof: u64, metadata_only: bool, - condition_code: ConditionCode, - delivery_code: DeliveryCode, - fault_location_finished: Option, + condition_code: Cell, + delivery_code: Cell, + fault_location_finished: Cell>, file_status: FileStatus, - completion_disposition: CompletionDisposition, + completion_disposition: Cell, checksum: u32, current_check_count: u32, current_check_timer: Option, @@ -112,11 +115,11 @@ impl Default for TransferState { progress: Default::default(), // file_size_eof: Default::default(), metadata_only: false, - condition_code: ConditionCode::NoError, - delivery_code: DeliveryCode::Incomplete, - fault_location_finished: None, + condition_code: Cell::new(ConditionCode::NoError), + delivery_code: Cell::new(DeliveryCode::Incomplete), + fault_location_finished: Cell::new(None), file_status: FileStatus::Unreported, - completion_disposition: CompletionDisposition::Completed, + completion_disposition: Cell::new(CompletionDisposition::Completed), checksum: 0, current_check_count: 0, current_check_timer: None, @@ -182,8 +185,8 @@ impl Default for TransactionParams { impl TransactionParams { fn reset(&mut self) { - self.tstate.condition_code = ConditionCode::NoError; - self.tstate.delivery_code = DeliveryCode::Incomplete; + self.tstate.condition_code.set(ConditionCode::NoError); + self.tstate.delivery_code.set(DeliveryCode::Incomplete); self.tstate.file_status = FileStatus::Unreported; } } @@ -263,7 +266,7 @@ pub struct DestinationHandler< CheckTimerProvider: CountdownProvider, > { local_cfg: LocalEntityConfig, - step: TransactionStep, + step: core::cell::Cell, state: State, tparams: TransactionParams, packet_buf: alloc::vec::Vec, @@ -321,7 +324,7 @@ impl< ) -> Self { Self { local_cfg, - step: TransactionStep::Idle, + step: Cell::new(TransactionStep::Idle), state: State::Idle, tparams: Default::default(), packet_buf: alloc::vec![0; max_packet_len], @@ -387,7 +390,7 @@ impl< EntityIdTlv::new(self.local_cfg.id), ); - self.step = TransactionStep::TransferCompletion; + self.set_step(TransactionStep::TransferCompletion); return true; } } @@ -511,7 +514,7 @@ impl< } } self.state = State::Busy; - self.step = TransactionStep::TransactionStart; + self.set_step(TransactionStep::TransactionStart); Ok(()) } @@ -520,9 +523,10 @@ impl< user: &mut impl CfdpUser, raw_packet: &[u8], ) -> Result<(), DestError> { + let step = self.step.get(); if self.state == State::Idle - || (self.step != TransactionStep::ReceivingFileDataPdus - && self.step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling) + || (step != TransactionStep::ReceivingFileDataPdus + && step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling) { return Err(DestError::WrongStateForFileData); } @@ -547,7 +551,11 @@ impl< fd_pdu.offset(), fd_pdu.file_data(), ) { - self.declare_fault(ConditionCode::FilestoreRejection); + if self.declare_fault(ConditionCode::FilestoreRejection) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); + } return Err(e.into()); } self.tstate_mut().progress += fd_pdu.file_data().len() as u64; @@ -559,7 +567,7 @@ impl< cfdp_user: &mut impl CfdpUser, raw_packet: &[u8], ) -> Result<(), DestError> { - if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus { + if self.state == State::Idle || self.step() != TransactionStep::ReceivingFileDataPdus { return Err(DestError::WrongStateForEof); } let eof_pdu = EofPdu::from_bytes(raw_packet)?; @@ -578,9 +586,15 @@ impl< ); self.tparams.tstate.progress = eof_pdu.file_size(); if eof_pdu.file_size() > 0 { - self.tparams.tstate.delivery_code = DeliveryCode::Incomplete; + self.tparams + .tstate + .delivery_code + .set(DeliveryCode::Incomplete); } else { - self.tparams.tstate.delivery_code = DeliveryCode::Complete; + self.tparams + .tstate + .delivery_code + .set(DeliveryCode::Complete); } // TODO: The cancel EOF also supplies a checksum and a progress number. We could cross // check that checksum, but how would we deal with a checksum failure? The standard @@ -595,27 +609,43 @@ impl< } fn trigger_notice_of_completion_cancelled( - &mut self, + &self, cond_code: ConditionCode, fault_location: EntityIdTlv, ) { - self.tparams.tstate.completion_disposition = CompletionDisposition::Cancelled; - self.tparams.tstate.condition_code = cond_code; - self.tparams.tstate.fault_location_finished = Some(fault_location); + self.tparams + .tstate + .completion_disposition + .set(CompletionDisposition::Cancelled); + self.tparams.tstate.condition_code.set(cond_code); + self.tparams + .tstate + .fault_location_finished + .set(Some(fault_location)); // For anything larger than 0, we'd have to do a checksum check to verify whether // the delivery is really complete, and we need the EOF checksum for that.. if self.tparams.tstate.progress == 0 { - self.tparams.tstate.delivery_code = DeliveryCode::Complete; + self.tparams + .tstate + .delivery_code + .set(DeliveryCode::Complete); } } /// Returns whether the transfer can be completed regularly. fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result { // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size - if self.tparams.tstate.progress > eof_pdu.file_size() - && self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError - { - return Ok(false); + if self.tparams.tstate.progress > eof_pdu.file_size() { + match self.declare_fault(ConditionCode::FileSizeError) { + FaultHandlerCode::IgnoreError => (), + FaultHandlerCode::AbandonTransaction => { + self.abandon_transaction(); + return Ok(false); + } + FaultHandlerCode::NoticeOfCancellation | FaultHandlerCode::NoticeOfSuspension => { + return Ok(false); + } + } } else if (self.tparams.tstate.progress < eof_pdu.file_size()) && self.tparams.transmission_mode() == TransmissionMode::Acknowledged { @@ -645,10 +675,10 @@ impl< fn file_transfer_complete_transition(&mut self) { if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged { - self.step = TransactionStep::TransferCompletion; + self.set_step(TransactionStep::TransferCompletion); } else { // TODO: Prepare ACK PDU somehow. - self.step = TransactionStep::SendingAckPdu; + self.set_step(TransactionStep::SendingAckPdu); } } @@ -675,33 +705,54 @@ impl< Ok(checksum_success) => { file_delivery_complete = checksum_success; if !checksum_success { - self.tparams.tstate.delivery_code = DeliveryCode::Incomplete; - self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure; + self.tparams + .tstate + .delivery_code + .set(DeliveryCode::Incomplete); + self.tparams + .tstate + .condition_code + .set(ConditionCode::FileChecksumFailure); } } Err(e) => match e { FilestoreError::ChecksumTypeNotImplemented(_) => { - self.declare_fault(ConditionCode::UnsupportedChecksumType); + if self.declare_fault(ConditionCode::UnsupportedChecksumType) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); + } // For this case, the applicable algorithm shall be the the null checksum, // which is always succesful. file_delivery_complete = true; } _ => { - self.declare_fault(ConditionCode::FilestoreRejection); + if self.declare_fault(ConditionCode::FilestoreRejection) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); + } + // Treat this equivalent to a failed checksum procedure. } }, }; } if file_delivery_complete { - self.tparams.tstate.delivery_code = DeliveryCode::Complete; - self.tparams.tstate.condition_code = ConditionCode::NoError; + self.tparams + .tstate + .delivery_code + .set(DeliveryCode::Complete); + self.tparams + .tstate + .condition_code + .set(ConditionCode::NoError); } file_delivery_complete } fn start_check_limit_handling(&mut self) { - self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling; + self.set_step(TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling); self.tparams.tstate.current_check_timer = Some(self.check_timer_creator.create_countdown( TimerContext::CheckLimit { local_id: self.local_cfg.id, @@ -725,7 +776,11 @@ impl< if self.tparams.tstate.current_check_count + 1 >= self.tparams.remote_cfg.unwrap().check_limit { - self.declare_fault(ConditionCode::CheckLimitReached); + if self.declare_fault(ConditionCode::CheckLimitReached) + == FaultHandlerCode::AbandonTransaction + { + self.abandon_transaction(); + } } else { self.tparams.tstate.current_check_count += 1; self.tparams @@ -744,7 +799,7 @@ impl< fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { let mut sent_packets = 0; - if self.step == TransactionStep::TransactionStart { + if self.step() == TransactionStep::TransactionStart { let result = self.transaction_start(cfdp_user); if let Err(e) = result { // If we can not even start the transaction properly, reset the handler. @@ -754,28 +809,30 @@ impl< return Err(e); } } - if self.step == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling { + if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling { self.check_limit_handling(); } - if self.step == TransactionStep::TransferCompletion { + if self.step() == TransactionStep::TransferCompletion { sent_packets += self.transfer_completion(cfdp_user)?; } - if self.step == TransactionStep::SendingAckPdu { + if self.step() == TransactionStep::SendingAckPdu { return Err(DestError::NotImplemented); } - if self.step == TransactionStep::SendingFinishedPdu { + if self.step() == TransactionStep::SendingFinishedPdu { self.reset(); } Ok(sent_packets) } /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable. + #[inline] pub fn step(&self) -> TransactionStep { - self.step + self.step.get() } /// Get the step, which denotes whether the CFDP handler is active, and which CFDP class /// is used if it is active. + #[inline] pub fn state(&self) -> State { self.state } @@ -846,7 +903,8 @@ impl< self.vfs.create_file(dest_path_str)?; } self.tparams.tstate.file_status = FileStatus::Retained; - self.step = TransactionStep::ReceivingFileDataPdus; + drop(msgs_to_user); + self.set_step(TransactionStep::ReceivingFileDataPdus); Ok(()) } @@ -858,12 +916,12 @@ impl< { sent_packets += self.send_finished_pdu()?; } - self.step = TransactionStep::SendingFinishedPdu; + self.set_step(TransactionStep::SendingFinishedPdu); Ok(sent_packets) } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { - if self.tstate().completion_disposition == CompletionDisposition::Completed { + if self.tstate().completion_disposition.get() == CompletionDisposition::Completed { // TODO: Execute any filestore requests } else if self .tparams @@ -871,7 +929,7 @@ impl< .as_ref() .unwrap() .disposition_on_cancellation - && self.tstate().delivery_code == DeliveryCode::Incomplete + && self.tstate().delivery_code.get() == DeliveryCode::Incomplete { // Safety: We already verified that the path is valid during the transaction start. let dest_path = unsafe { @@ -888,15 +946,17 @@ impl< let tstate = self.tstate(); let transaction_finished_params = TransactionFinishedParams { id: tstate.transaction_id.unwrap(), - condition_code: tstate.condition_code, - delivery_code: tstate.delivery_code, + condition_code: tstate.condition_code.get(), + delivery_code: tstate.delivery_code.get(), file_status: tstate.file_status, }; cfdp_user.transaction_finished_indication(&transaction_finished_params); Ok(()) } - fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode { + // When the fault handler code [FaultHandlerCode::AbandonTransaction] is returned, the caller + // must call [Self::abandon_transaction] as soon as it is possible. + fn declare_fault(&self, condition_code: ConditionCode) -> FaultHandlerCode { // Cache those, because they might be reset when abandoning the transaction. let transaction_id = self.tstate().transaction_id.unwrap(); let progress = self.tstate().progress; @@ -910,25 +970,24 @@ impl< } FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(), FaultHandlerCode::IgnoreError => (), - FaultHandlerCode::AbandonTransaction => self.abandon_transaction(), + FaultHandlerCode::AbandonTransaction => (), } self.local_cfg .fault_handler .report_fault(transaction_id, condition_code, progress) } - fn notice_of_cancellation( - &mut self, - condition_code: ConditionCode, - fault_location: EntityIdTlv, - ) { - self.step = TransactionStep::TransferCompletion; - self.tstate_mut().condition_code = condition_code; - self.tstate_mut().fault_location_finished = Some(fault_location); - self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled; + fn notice_of_cancellation(&self, condition_code: ConditionCode, fault_location: EntityIdTlv) { + self.set_step_internal(TransactionStep::TransferCompletion); + let tstate = self.tstate(); + tstate.condition_code.set(condition_code); + tstate + .completion_disposition + .set(CompletionDisposition::Cancelled); + tstate.fault_location_finished.set(Some(fault_location)); } - fn notice_of_suspension(&mut self) { + fn notice_of_suspension(&self) { // TODO: Implement suspension handling. } @@ -936,12 +995,22 @@ impl< self.reset(); } + #[inline] + fn set_step_internal(&self, step: TransactionStep) { + self.step.set(step); + } + + #[inline] + fn set_step(&mut self, step: TransactionStep) { + self.set_step_internal(step); + } + /// This function is public to allow completely resetting the handler, but it is explicitely /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. /// Resetting the handler might interfere with these mechanisms and lead to unexpected /// behaviour. pub fn reset(&mut self) { - self.step = TransactionStep::Idle; + self.set_step(TransactionStep::Idle); self.state = State::Idle; // self.packets_to_send_ctx.packet_available = false; self.tparams.reset(); @@ -951,18 +1020,22 @@ impl< let tstate = self.tstate(); let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0); - let finished_pdu = if tstate.condition_code == ConditionCode::NoError - || tstate.condition_code == ConditionCode::UnsupportedChecksumType + let finished_pdu = if tstate.condition_code.get() == ConditionCode::NoError + || tstate.condition_code.get() == ConditionCode::UnsupportedChecksumType { - FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status) + FinishedPduCreator::new_default( + pdu_header, + tstate.delivery_code.get(), + tstate.file_status, + ) } else { FinishedPduCreator::new_generic( pdu_header, - tstate.condition_code, - tstate.delivery_code, + tstate.condition_code.get(), + tstate.delivery_code.get(), tstate.file_status, &[], - tstate.fault_location_finished, + tstate.fault_location_finished.get(), ) }; finished_pdu.write_to_bytes(&mut self.packet_buf)?; @@ -974,14 +1047,17 @@ impl< Ok(1) } + #[inline] pub fn local_cfg(&self) -> &LocalEntityConfig { &self.local_cfg } + #[inline] fn tstate(&self) -> &TransferState { &self.tparams.tstate } + #[inline] fn tstate_mut(&mut self) -> &mut TransferState { &mut self.tparams.tstate } diff --git a/src/lib.rs b/src/lib.rs index adef6df..d9ef1cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ extern crate std; #[cfg(feature = "alloc")] pub mod dest; pub mod filestore; +pub mod lost_segments; pub mod request; #[cfg(feature = "alloc")] pub mod source; diff --git a/src/lost_segments.rs b/src/lost_segments.rs new file mode 100644 index 0000000..bd2af38 --- /dev/null +++ b/src/lost_segments.rs @@ -0,0 +1,753 @@ +//! # Lost Segment Store Module +//! +//! The core abstraction provided by this module in the [LostSegmentStore]. +//! +//! The two concrete implementations provided are: +//! +//! * [LostSegmentsMap]: A hash set based implementation which can grow dynamically andcan +//! optionally be bounded. Suitable for systems where dynamic allocation is allowed. +//! * [LostSegmentsList]: A fixed-size list based implementation where the size +//! of the lost segment list is statically known at compile-time. Suitable for resource +//! constrained devices where dyanamic allocation is not allowed or possible. +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[non_exhaustive] +pub enum LostSegmentError { + #[error("store is full")] + StoreFull, + #[error("segment is empty")] + EmptySegment, + #[error("segment start is larger than segment end")] + StartLargerThanEnd((u64, u64)), + #[error("large file segments are not supported")] + LargeFileSegmentNotSupported, +} + +/// Generic trait to model a lost segment store. +/// +/// The destination handler can use this store to keep track of lost segments and re-requesting +/// them. This abstraction allow using different data structures as a backend. +pub trait LostSegmentStore { + // Iteration + type Iter<'a>: Iterator + 'a + where + Self: 'a; + + /// Iterate over all lost segments stored. + fn iter(&self) -> Self::Iter<'_>; + + fn len(&self) -> usize; + + /// Implementations may explicitely omit support for large file segments to save memory if + /// large file sizes are not used. + fn supports_large_file_size(&self) -> bool; + fn capacity(&self) -> Option; + fn reset(&mut self); + + fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError>; + fn remove_lost_segment( + &mut self, + segment_to_remove: (u64, u64), + ) -> Result; + + /// The lost segment store may additionally have the capability to coalesce overlapping or + /// adjacent segments. + fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError>; + + #[inline] + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +/// Implementation based on a [hashbrown::HashSet] which can grow dynamically. +/// +/// Optionally, a maximum capacity can be specified at creation time. This container allocates at +/// run-time! +#[cfg(feature = "alloc")] +#[derive(Debug, Default)] +pub struct LostSegmentsMap { + map: hashbrown::HashSet<(u64, u64)>, + opt_capacity: Option, +} + +#[cfg(feature = "alloc")] +impl LostSegmentsMap { + pub fn new(opt_capacity: Option) -> Self { + Self { + map: hashbrown::HashSet::new(), + opt_capacity, + } + } +} + +#[cfg(feature = "alloc")] +impl LostSegmentStore for LostSegmentsMap { + type Iter<'a> + = core::iter::Cloned> + where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + self.map.iter().cloned() + } + + #[inline] + fn len(&self) -> usize { + self.map.len() + } + + #[inline] + fn supports_large_file_size(&self) -> bool { + true + } + + #[inline] + fn capacity(&self) -> Option { + self.opt_capacity + } + + #[inline] + fn reset(&mut self) { + self.map.clear(); + } + + #[inline] + fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError> { + if lost_seg.1 == lost_seg.0 { + return Err(LostSegmentError::EmptySegment); + } + if lost_seg.0 > lost_seg.1 { + return Err(LostSegmentError::StartLargerThanEnd(lost_seg)); + } + if let Some(capacity) = self.opt_capacity + && self.map.len() == capacity + { + return Err(LostSegmentError::StoreFull); + } + self.map.insert(lost_seg); + Ok(()) + } + + fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> { + if self.map.len() <= 1 { + return Ok(()); + } + + // Move out segments, discard empty/invalid ones + let mut segs: alloc::vec::Vec<(u64, u64)> = self + .map + .drain() + .filter(|(s, e)| e > s) // keep only non-empty [s, e) + .collect(); + + // Sort by start, then by end + segs.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1))); + + // Merge + let mut merged: alloc::vec::Vec<(u64, u64)> = alloc::vec::Vec::with_capacity(segs.len()); + for (s, e) in segs { + if let Some(last) = merged.last_mut() { + // For half-open ranges, this merges overlapping or adjacent: + // [last.0, last.1) with [s, e) if s <= last.1 + if s <= last.1 { + if e > last.1 { + last.1 = e; + } + continue; + } + } + merged.push((s, e)); + } + + // Rebuild the set + self.map.extend(merged); + Ok(()) + } + + #[inline] + fn remove_lost_segment( + &mut self, + segment_to_remove: (u64, u64), + ) -> Result { + if segment_to_remove.1 == segment_to_remove.0 { + return Err(LostSegmentError::EmptySegment); + } + if segment_to_remove.0 > segment_to_remove.1 { + return Err(LostSegmentError::StartLargerThanEnd(segment_to_remove)); + } + Ok(self.map.remove(&segment_to_remove)) + } +} + +/// Implementation based on a [heapless::Vec] with a statically known container size. +#[derive(Default, Debug)] +pub struct LostSegmentsList { + list: heapless::vec::Vec<(T, T), N>, +} + +/// Type definition for segment list which only supports normal file sizes. This can be used +/// to save memory required for the lost segment list. +pub type LostSegmentsListSmall = LostSegmentsList; + +impl LostSegmentsList { + pub fn new() -> Self { + Self { + list: heapless::Vec::new(), + } + } + + #[inline] + fn num_lost_segments(&self) -> usize { + self.list.len() + } + + #[inline] + fn capacity(&self) -> Option { + Some(N) + } + + #[inline] + fn reset(&mut self) { + self.list.clear(); + } +} + +impl LostSegmentsList { + fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> { + // Remove empty/invalid ranges + self.list.retain(|&(s, e)| e > s); + if self.list.len() <= 1 { + return Ok(()); + } + + // Sort by start, then end (no extra allocs) + self.list + .as_mut_slice() + .sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1))); + + // In-place merge; merges overlapping or adjacent [s, e) where next.s <= prev.e + let mut w = 0usize; + for i in 0..self.list.len() { + if w == 0 { + self.list[w] = self.list[i]; + w = 1; + continue; + } + + let (prev_s, mut prev_e) = self.list[w - 1]; + let (s, e) = self.list[i]; + + if s <= prev_e { + // Extend previous + if e > prev_e { + prev_e = e; + self.list[w - 1] = (prev_s, prev_e); + } + } else { + // Start new merged interval + self.list[w] = (s, e); + w += 1; + } + } + + // Truncate to merged length + self.list.truncate(w); + Ok(()) + } +} + +impl LostSegmentStore for LostSegmentsList { + type Iter<'a> + = core::iter::Cloned> + where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + self.list.iter().cloned() + } + + fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError> { + if lost_seg.1 == lost_seg.0 { + return Err(LostSegmentError::EmptySegment); + } + if lost_seg.0 > lost_seg.1 { + return Err(LostSegmentError::StartLargerThanEnd(lost_seg)); + } + if self.list.is_full() { + return Err(LostSegmentError::StoreFull); + } + self.list.push(lost_seg).ok(); + Ok(()) + } + + fn remove_lost_segment( + &mut self, + segment_to_remove: (u64, u64), + ) -> Result { + if segment_to_remove.1 == segment_to_remove.0 { + return Err(LostSegmentError::EmptySegment); + } + if segment_to_remove.0 > segment_to_remove.1 { + return Err(LostSegmentError::StartLargerThanEnd(segment_to_remove)); + } + let current_len = self.list.len(); + self.list.retain(|val| val != &segment_to_remove); + Ok(self.list.len() != current_len) + } + + fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> { + self.coalesce_lost_segments() + } + + fn len(&self) -> usize { + self.num_lost_segments() + } + + fn supports_large_file_size(&self) -> bool { + true + } + + fn capacity(&self) -> Option { + self.capacity() + } + + fn reset(&mut self) { + self.reset(); + } +} + +impl LostSegmentStore for LostSegmentsList { + type Iter<'a> + = core::iter::Map, fn(&(u32, u32)) -> (u64, u64)> + where + Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + self.list.iter().map(|pair| (pair.0 as u64, pair.1 as u64)) + } + + fn add_lost_segment(&mut self, lost_seg: (u64, u64)) -> Result<(), LostSegmentError> { + if lost_seg.1 == lost_seg.0 { + return Err(LostSegmentError::EmptySegment); + } + if lost_seg.0 > lost_seg.1 { + return Err(LostSegmentError::StartLargerThanEnd(lost_seg)); + } + if lost_seg.1 > u32::MAX as u64 || lost_seg.0 > u32::MAX as u64 { + return Err(LostSegmentError::LargeFileSegmentNotSupported); + } + if self.list.is_full() { + return Err(LostSegmentError::StoreFull); + } + self.list.push((lost_seg.0 as u32, lost_seg.1 as u32)).ok(); + Ok(()) + } + + fn remove_lost_segment( + &mut self, + segment_to_remove: (u64, u64), + ) -> Result { + if segment_to_remove.1 == segment_to_remove.0 { + return Err(LostSegmentError::EmptySegment); + } + if segment_to_remove.0 > segment_to_remove.1 { + return Err(LostSegmentError::StartLargerThanEnd(segment_to_remove)); + } + if segment_to_remove.1 > u32::MAX as u64 || segment_to_remove.0 > u32::MAX as u64 { + return Err(LostSegmentError::LargeFileSegmentNotSupported); + } + let current_len = self.list.len(); + self.list + .retain(|val| (val.0 as u64, val.1 as u64) != segment_to_remove); + Ok(self.list.len() != current_len) + } + + fn coalesce_lost_segments(&mut self) -> Result<(), LostSegmentError> { + self.coalesce_lost_segments() + } + + #[inline] + fn len(&self) -> usize { + self.num_lost_segments() + } + + #[inline] + fn supports_large_file_size(&self) -> bool { + false + } + + #[inline] + fn capacity(&self) -> Option { + self.capacity() + } + + #[inline] + fn reset(&mut self) { + self.reset(); + } +} + +#[cfg(test)] +mod tests { + use std::vec::Vec; + + use super::*; + + fn generic_basic_state_test( + store: &impl LostSegmentStore, + supports_large_file_size: bool, + capacity: Option, + ) { + assert_eq!(store.supports_large_file_size(), supports_large_file_size); + assert_eq!(store.len(), 0); + assert!(store.is_empty()); + assert_eq!(store.capacity(), capacity); + assert_eq!(store.iter().count(), 0); + } + + fn generic_error_tests(store: &mut impl LostSegmentStore) { + matches!( + store.add_lost_segment((0, 0)).unwrap_err(), + LostSegmentError::EmptySegment + ); + matches!( + store.add_lost_segment((10, 0)).unwrap_err(), + LostSegmentError::StartLargerThanEnd((10, 0)) + ); + matches!( + store.remove_lost_segment((0, 0)).unwrap_err(), + LostSegmentError::EmptySegment + ); + matches!( + store.remove_lost_segment((10, 0)).unwrap_err(), + LostSegmentError::StartLargerThanEnd((10, 0)) + ); + } + + fn generic_add_segments_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + assert_eq!(store.len(), 1); + for segment in store.iter() { + assert_eq!(segment, (0, 20)); + } + store.add_lost_segment((20, 40)).unwrap(); + let mut segments: Vec<(u64, u64)> = store.iter().collect(); + segments.sort_unstable(); + assert_eq!(segments.len(), 2); + assert_eq!(segments[0], (0, 20)); + assert_eq!(segments[1], (20, 40)); + } + + fn generic_reset_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + assert_eq!(store.len(), 1); + store.reset(); + assert_eq!(store.len(), 0); + assert!(store.is_empty()); + assert_eq!(store.iter().count(), 0); + } + + fn generic_removal_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + store.add_lost_segment((20, 40)).unwrap(); + assert_eq!(store.len(), 2); + assert!(store.remove_lost_segment((0, 20)).unwrap()); + assert_eq!(store.len(), 1); + assert!(!store.remove_lost_segment((0, 20)).unwrap()); + assert_eq!(store.len(), 1); + assert!(store.remove_lost_segment((20, 40)).unwrap()); + assert_eq!(store.len(), 0); + } + + fn generic_coalescing_simple_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + store.add_lost_segment((20, 40)).unwrap(); + store.add_lost_segment((40, 60)).unwrap(); + store.coalesce_lost_segments().unwrap(); + for segment in store.iter() { + assert_eq!(segment, (0, 60)); + } + assert_eq!(store.len(), 1); + } + + fn generic_coalescing_simple_with_gaps_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + store.add_lost_segment((20, 40)).unwrap(); + store.add_lost_segment((40, 60)).unwrap(); + + store.add_lost_segment((80, 100)).unwrap(); + store.add_lost_segment((110, 120)).unwrap(); + store.add_lost_segment((120, 130)).unwrap(); + store.coalesce_lost_segments().unwrap(); + let mut segments: Vec<(u64, u64)> = store.iter().collect(); + segments.sort_unstable(); + assert_eq!(segments.len(), 3); + assert_eq!(segments[0], (0, 60)); + assert_eq!(segments[1], (80, 100)); + assert_eq!(segments[2], (110, 130)); + } + + fn generic_coalescing_overlapping_simple_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + store.add_lost_segment((10, 30)).unwrap(); + store.coalesce_lost_segments().unwrap(); + for segment in store.iter() { + assert_eq!(segment, (0, 30)); + } + assert_eq!(store.len(), 1); + } + + fn generic_coalescing_overlapping_adjacent_test(store: &mut impl LostSegmentStore) { + store.add_lost_segment((0, 20)).unwrap(); + store.add_lost_segment((10, 30)).unwrap(); + store.add_lost_segment((20, 40)).unwrap(); + store.coalesce_lost_segments().unwrap(); + for segment in store.iter() { + assert_eq!(segment, (0, 40)); + } + assert_eq!(store.len(), 1); + } + + fn generic_useless_coalescing_test(store: &mut impl LostSegmentStore) { + // Is okay, does nothing. + assert!(store.coalesce_lost_segments().is_ok()); + assert_eq!(store.len(), 0); + assert!(store.is_empty()); + store.add_lost_segment((10, 20)).unwrap(); + // Is okay, does nothing. + assert!(store.coalesce_lost_segments().is_ok()); + for segment in store.iter() { + assert_eq!(segment, (10, 20)); + } + } + + #[test] + fn test_basic_map_state_map() { + let store = LostSegmentsMap::default(); + generic_basic_state_test(&store, true, None); + } + + #[test] + fn test_basic_errors_map() { + let mut store = LostSegmentsMap::default(); + generic_error_tests(&mut store); + } + + #[test] + fn test_add_segments_map() { + let mut store = LostSegmentsMap::default(); + generic_add_segments_test(&mut store); + } + + #[test] + fn test_reset_map() { + let mut store = LostSegmentsMap::default(); + generic_reset_test(&mut store); + } + + #[test] + fn test_removal_map() { + let mut store = LostSegmentsMap::default(); + generic_removal_test(&mut store); + } + + #[test] + fn test_cap_limit_map() { + let mut store = LostSegmentsMap::new(Some(4)); + for i in 0..4 { + store.add_lost_segment((i * 20, (i + 1) * 20)).unwrap(); + } + matches!( + store.add_lost_segment((80, 100)), + Err(LostSegmentError::StoreFull) + ); + } + + #[test] + fn test_basic_list_state_list() { + let store = LostSegmentsListSmall::<12>::default(); + generic_basic_state_test(&store, false, Some(12)); + let store = LostSegmentsListSmall::<12>::new(); + generic_basic_state_test(&store, false, Some(12)); + } + #[test] + fn test_basic_errors_list() { + let mut store = LostSegmentsListSmall::<12>::default(); + generic_error_tests(&mut store); + } + + #[test] + fn test_add_segments_list() { + let mut store = LostSegmentsListSmall::<12>::default(); + generic_add_segments_test(&mut store); + } + + #[test] + fn test_reset_list() { + let mut store = LostSegmentsListSmall::<12>::default(); + generic_reset_test(&mut store); + } + + #[test] + fn test_removal_list() { + let mut store = LostSegmentsListSmall::<12>::default(); + generic_removal_test(&mut store); + } + + #[test] + fn test_cap_limit_list() { + let mut store = LostSegmentsListSmall::<4>::default(); + for i in 0..4 { + store.add_lost_segment((i * 20, (i + 1) * 20)).unwrap(); + } + matches!( + store.add_lost_segment((80, 100)), + Err(LostSegmentError::StoreFull) + ); + } + + #[test] + fn test_large_file_size_unsupported() { + let mut store = LostSegmentsListSmall::<4>::default(); + matches!( + store.add_lost_segment((0, u32::MAX as u64 + 1)), + Err(LostSegmentError::LargeFileSegmentNotSupported) + ); + } + + #[test] + fn test_large_file_size_unsupported_2() { + let mut store = LostSegmentsListSmall::<4>::default(); + matches!( + store.remove_lost_segment((0, u32::MAX as u64 + 1)), + Err(LostSegmentError::LargeFileSegmentNotSupported) + ); + } + + #[test] + fn test_basic_list_state_list_large() { + let store = LostSegmentsList::<12, u64>::default(); + generic_basic_state_test(&store, true, Some(12)); + } + #[test] + fn test_basic_errors_list_large() { + let mut store = LostSegmentsList::<12, u64>::default(); + generic_error_tests(&mut store); + } + + #[test] + fn test_add_segments_list_large() { + let mut store = LostSegmentsList::<12, u64>::default(); + generic_add_segments_test(&mut store); + } + + #[test] + fn test_reset_list_large() { + let mut store = LostSegmentsList::<12, u64>::default(); + generic_reset_test(&mut store); + } + + #[test] + fn test_removal_list_large() { + let mut store = LostSegmentsList::<12, u64>::default(); + generic_removal_test(&mut store); + } + + #[test] + fn test_cap_limit_list_large() { + let mut store = LostSegmentsList::<4, u64>::default(); + for i in 0..4 { + store.add_lost_segment((i * 20, (i + 1) * 20)).unwrap(); + } + matches!( + store.add_lost_segment((80, 100)), + Err(LostSegmentError::StoreFull) + ); + } + + #[test] + fn test_coalescing_simple_in_map() { + let mut store = LostSegmentsMap::default(); + generic_coalescing_simple_test(&mut store); + } + + #[test] + fn test_useless_coalescing_map() { + let mut store = LostSegmentsMap::default(); + generic_useless_coalescing_test(&mut store); + } + + #[test] + fn test_useless_coalescing_list() { + let mut store = LostSegmentsListSmall::<4>::default(); + generic_useless_coalescing_test(&mut store); + } + + #[test] + fn test_coalescing_simple_in_list() { + let mut store = LostSegmentsListSmall::<4>::default(); + generic_coalescing_simple_test(&mut store); + } + + #[test] + fn test_coalescing_simple_in_list_large() { + let mut store = LostSegmentsList::<4, u64>::default(); + generic_coalescing_simple_test(&mut store); + } + + #[test] + fn test_coalescing_overlapping_simple_in_map() { + let mut store = LostSegmentsMap::default(); + generic_coalescing_overlapping_simple_test(&mut store); + } + + #[test] + fn test_coalescing_overlapping_simple_in_list() { + let mut store = LostSegmentsListSmall::<4>::default(); + generic_coalescing_overlapping_simple_test(&mut store); + } + + #[test] + fn test_coalescing_overlapping_simple_in_list_large() { + let mut store = LostSegmentsList::<4, u64>::default(); + generic_coalescing_overlapping_simple_test(&mut store); + } + + #[test] + fn test_coalescing_overlapping_adjacent_in_map() { + let mut store = LostSegmentsMap::default(); + generic_coalescing_overlapping_adjacent_test(&mut store); + } + + #[test] + fn test_coalescing_overlapping_adjacent_in_list() { + let mut store = LostSegmentsListSmall::<4>::default(); + generic_coalescing_overlapping_adjacent_test(&mut store); + } + + #[test] + fn test_coalescing_overlapping_adjacent_in_list_large() { + let mut store = LostSegmentsList::<4, u64>::default(); + generic_coalescing_overlapping_adjacent_test(&mut store); + } + + #[test] + fn test_coalescing_simple_with_gaps_in_map() { + let mut store = LostSegmentsMap::default(); + generic_coalescing_simple_with_gaps_test(&mut store); + } + + #[test] + fn test_coalescing_simple_with_gaps_in_list() { + let mut store = LostSegmentsListSmall::<8>::default(); + generic_coalescing_simple_with_gaps_test(&mut store); + } + + #[test] + fn test_coalescing_simple_with_gaps_in_list_large() { + let mut store = LostSegmentsList::<8, u64>::default(); + generic_coalescing_simple_with_gaps_test(&mut store); + } +} diff --git a/src/source.rs b/src/source.rs index 3eb46a7..1344971 100644 --- a/src/source.rs +++ b/src/source.rs @@ -27,7 +27,7 @@ //! 4. A Finished PDU will be awaited, for example one generated using //! [spacepackets::cfdp::pdu::finished::FinishedPduCreator]. //! -//! ### Acknowledged transfer (*not implemented yet*) +//! ### Acknowledged transfer //! //! 4. A EOF ACK packet will be awaited, for example one generated using //! [spacepackets::cfdp::pdu::ack::AckPdu].