continue acked dest handler

This commit is contained in:
Robin Mueller
2025-09-16 19:25:48 +02:00
parent 45bb153768
commit 0d56217826
8 changed files with 1034 additions and 670 deletions

View File

@@ -37,7 +37,7 @@ alloc = [
"hashbrown", "hashbrown",
"spacepackets/alloc" "spacepackets/alloc"
] ]
serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde"] serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde", "heapless/serde"]
defmt = ["dep:defmt", "spacepackets/defmt"] defmt = ["dep:defmt", "spacepackets/defmt"]
[dev-dependencies] [dev-dependencies]

View File

@@ -13,9 +13,10 @@ use std::{
use cfdp::{ use cfdp::{
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider, EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider, RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHook,
dest::DestinationHandler, dest::DestinationHandler,
filestore::NativeFilestore, filestore::NativeFilestore,
lost_segments::LostSegmentsList,
request::{PutRequestOwned, StaticPutRequestCacher}, request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler, source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
@@ -62,7 +63,7 @@ pub struct Cli {
#[derive(Default)] #[derive(Default)]
pub struct ExampleFaultHandler {} pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler { impl UserFaultHook for ExampleFaultHandler {
fn notice_of_suspension_cb( fn notice_of_suspension_cb(
&mut self, &mut self,
transaction_id: TransactionId, transaction_id: TransactionId,
@@ -261,7 +262,7 @@ impl UdpServer {
while let Ok(tm) = receiver.try_recv() { while let Ok(tm) = receiver.try_recv() {
debug!("Sending PDU: {:?}", tm); debug!("Sending PDU: {:?}", tm);
pdu_printout(&tm); pdu_printout(&tm);
let result = self.socket.send_to(tm.pdu(), self.remote_addr()); let result = self.socket.send_to(tm.raw_pdu(), self.remote_addr());
if let Err(e) = result { if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}") warn!("Sending TM with UDP socket failed: {e}")
} }
@@ -284,7 +285,7 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) {
spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (), spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => { spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
let meta_pdu = let meta_pdu =
MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed"); MetadataPduReader::new(pdu.raw_pdu()).expect("creating metadata pdu failed");
debug!("Metadata PDU: {:?}", meta_pdu) debug!("Metadata PDU: {:?}", meta_pdu)
} }
spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (), spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
@@ -292,7 +293,8 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) {
spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (), spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
}, },
spacepackets::cfdp::PduType::FileData => { spacepackets::cfdp::PduType::FileData => {
let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed"); let fd_pdu =
FileDataPdu::from_bytes(pdu.raw_pdu()).expect("creating file data pdu failed");
debug!("File data PDU: {:?}", fd_pdu); debug!("File data PDU: {:?}", fd_pdu);
} }
} }
@@ -367,6 +369,7 @@ fn main() {
NativeFilestore::default(), NativeFilestore::default(),
remote_cfg_python, remote_cfg_python,
StdTimerCreator::default(), StdTimerCreator::default(),
LostSegmentsList::default(),
); );
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);

File diff suppressed because it is too large Load Diff

View File

@@ -100,7 +100,7 @@ pub mod source;
pub mod time; pub mod time;
pub mod user; pub mod user;
use crate::time::CountdownProvider; use crate::time::Countdown;
use core::{cell::RefCell, fmt::Debug, hash::Hash}; use core::{cell::RefCell, fmt::Debug, hash::Hash};
use crc::{CRC_32_ISCSI, CRC_32_ISO_HDLC, Crc}; use crc::{CRC_32_ISCSI, CRC_32_ISO_HDLC, Crc};
@@ -181,8 +181,8 @@ pub enum TimerContext {
/// The timer will be used to perform the Positive Acknowledgement Procedures as specified in /// The timer will be used to perform the Positive Acknowledgement Procedures as specified in
/// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer /// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer
/// interval of the remote entity configuration. /// interval of the remote entity configuration.
pub trait TimerCreatorProvider { pub trait TimerCreator {
type Countdown: CountdownProvider; type Countdown: Countdown;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown; fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown;
} }
@@ -294,16 +294,23 @@ impl RemoteEntityConfig {
} }
} }
pub trait RemoteEntityConfigProvider { #[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum RemoteConfigStoreError {
#[error("store is full")]
Full,
}
pub trait RemoteConfigStore {
/// Retrieve the remote entity configuration for the given remote ID. /// Retrieve the remote entity configuration for the given remote ID.
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>; fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>; fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
/// Add a new remote configuration. Return [true] if the configuration was /// Add a new remote configuration. Return [true] if the configuration was
/// inserted successfully, and [false] if a configuration already exists. /// inserted successfully, and [false] if a configuration already exists.
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool; fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError>;
/// Remote a configuration. Returns [true] if the configuration was removed successfully,
/// and [false] if no configuration exists for the given remote ID.
fn remove_config(&mut self, remote_id: u64) -> bool;
} }
/// This is a thin wrapper around a [hashbrown::HashMap] to store remote entity configurations. /// This is a thin wrapper around a [hashbrown::HashMap] to store remote entity configurations.
@@ -311,20 +318,26 @@ pub trait RemoteEntityConfigProvider {
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
#[derive(Default, Debug)] #[derive(Default, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct StdRemoteEntityConfigProvider(pub hashbrown::HashMap<u64, RemoteEntityConfig>); pub struct RemoteConfigStoreStd(pub hashbrown::HashMap<u64, RemoteEntityConfig>);
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider { impl RemoteConfigStore for RemoteConfigStoreStd {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.0.get(&remote_id) self.0.get(&remote_id)
} }
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> { fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.0.get_mut(&remote_id) self.0.get_mut(&remote_id)
} }
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool {
self.0.insert(cfg.entity_id.value(), *cfg).is_some() fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
Ok(self.0.insert(cfg.entity_id.value(), *cfg).is_some())
} }
fn remove_config(&mut self, remote_id: u64) -> bool { }
#[cfg(feature = "std")]
impl RemoteConfigStoreStd {
pub fn remove_config(&mut self, remote_id: u64) -> bool {
self.0.remove(&remote_id).is_some() self.0.remove(&remote_id).is_some()
} }
} }
@@ -335,10 +348,10 @@ impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider {
#[derive(Default, Debug)] #[derive(Default, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))] #[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct VecRemoteEntityConfigProvider(pub alloc::vec::Vec<RemoteEntityConfig>); pub struct RemoteConfigList(pub alloc::vec::Vec<RemoteEntityConfig>);
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider { impl RemoteConfigStore for RemoteConfigList {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.0 self.0
.iter() .iter()
@@ -351,12 +364,19 @@ impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider {
.find(|cfg| cfg.entity_id.value() == remote_id) .find(|cfg| cfg.entity_id.value() == remote_id)
} }
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool { fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
self.0.push(*cfg); for other_cfg in self.0.iter() {
true if cfg.entity_id.value() == other_cfg.entity_id.value() {
return Ok(false);
} }
}
self.0.push(*cfg);
Ok(true)
}
}
fn remove_config(&mut self, remote_id: u64) -> bool { impl RemoteConfigList {
pub fn remove_config(&mut self, remote_id: u64) -> bool {
for (idx, cfg) in self.0.iter().enumerate() { for (idx, cfg) in self.0.iter().enumerate() {
if cfg.entity_id.value() == remote_id { if cfg.entity_id.value() == remote_id {
self.0.remove(idx); self.0.remove(idx);
@@ -367,10 +387,55 @@ impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider {
} }
} }
/// A remote entity configurations also implements the [RemoteEntityConfigProvider], but the /// This is a thin wrapper around a [alloc::vec::Vec] to store remote entity configurations.
/// [RemoteEntityConfigProvider::add_config] and [RemoteEntityConfigProvider::remove_config] /// It implements the full [RemoteEntityConfigProvider] trait.
/// are no-ops and always returns [false]. #[derive(Default, Debug)]
impl RemoteEntityConfigProvider for RemoteEntityConfig { #[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RemoteConfigListHeapless<const N: usize>(pub heapless::vec::Vec<RemoteEntityConfig, N>);
impl<const N: usize> RemoteConfigStore for RemoteConfigListHeapless<N> {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.0
.iter()
.find(|&cfg| cfg.entity_id.value() == remote_id)
}
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.0
.iter_mut()
.find(|cfg| cfg.entity_id.value() == remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
if self.0.is_full() {
return Err(RemoteConfigStoreError::Full);
}
for other_cfg in self.0.iter() {
if cfg.entity_id.value() == other_cfg.entity_id.value() {
return Ok(false);
}
}
self.0.push(*cfg).unwrap();
Ok(true)
}
}
impl<const N: usize> RemoteConfigListHeapless<N> {
pub fn remove_config(&mut self, remote_id: u64) -> bool {
for (idx, cfg) in self.0.iter().enumerate() {
if cfg.entity_id.value() == remote_id {
self.0.remove(idx);
return true;
}
}
false
}
}
/// A remote entity configurations also implements the [RemoteConfigStore], but the
/// [RemoteConfigStore::add_config] always returns [RemoteConfigStoreError::Full].
impl RemoteConfigStore for RemoteEntityConfig {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
if remote_id == self.entity_id.value() { if remote_id == self.entity_id.value() {
return Some(self); return Some(self);
@@ -385,12 +450,8 @@ impl RemoteEntityConfigProvider for RemoteEntityConfig {
None None
} }
fn add_config(&mut self, _cfg: &RemoteEntityConfig) -> bool { fn add_config(&mut self, _cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
false Err(RemoteConfigStoreError::Full)
}
fn remove_config(&mut self, _remote_id: u64) -> bool {
false
} }
} }
@@ -404,7 +465,7 @@ impl RemoteEntityConfigProvider for RemoteEntityConfig {
/// ///
/// For each error reported by the [FaultHandler], the appropriate fault handler callback /// For each error reported by the [FaultHandler], the appropriate fault handler callback
/// will be called depending on the [FaultHandlerCode]. /// will be called depending on the [FaultHandlerCode].
pub trait UserFaultHookProvider { pub trait UserFaultHook {
fn notice_of_suspension_cb( fn notice_of_suspension_cb(
&mut self, &mut self,
transaction_id: TransactionId, transaction_id: TransactionId,
@@ -429,7 +490,7 @@ pub trait UserFaultHookProvider {
#[derive(Default, Debug, PartialEq, Eq, Copy, Clone)] #[derive(Default, Debug, PartialEq, Eq, Copy, Clone)]
pub struct DummyFaultHook {} pub struct DummyFaultHook {}
impl UserFaultHookProvider for DummyFaultHook { impl UserFaultHook for DummyFaultHook {
fn notice_of_suspension_cb( fn notice_of_suspension_cb(
&mut self, &mut self,
_transaction_id: TransactionId, _transaction_id: TransactionId,
@@ -477,14 +538,14 @@ impl UserFaultHookProvider for DummyFaultHook {
/// These defaults can be overriden by using the [Self::set_fault_handler] method. /// These defaults can be overriden by using the [Self::set_fault_handler] method.
/// Please note that in any case, fault handler overrides can be specified by the sending CFDP /// Please note that in any case, fault handler overrides can be specified by the sending CFDP
/// entity. /// entity.
pub struct FaultHandler<UserHandler: UserFaultHookProvider> { pub struct FaultHandler<UserHandler: UserFaultHook> {
handler_array: [FaultHandlerCode; 10], handler_array: [FaultHandlerCode; 10],
// Could also change the user fault handler trait to have non mutable methods, but that limits // Could also change the user fault handler trait to have non mutable methods, but that limits
// flexbility on the user side.. // flexbility on the user side..
pub user_hook: RefCell<UserHandler>, pub user_hook: RefCell<UserHandler>,
} }
impl<UserHandler: UserFaultHookProvider> FaultHandler<UserHandler> { impl<UserHandler: UserFaultHook> FaultHandler<UserHandler> {
fn condition_code_to_array_index(conditon_code: ConditionCode) -> Option<usize> { fn condition_code_to_array_index(conditon_code: ConditionCode) -> Option<usize> {
Some(match conditon_code { Some(match conditon_code {
ConditionCode::PositiveAckLimitReached => 0, ConditionCode::PositiveAckLimitReached => 0,
@@ -590,17 +651,17 @@ impl Default for IndicationConfig {
} }
/// Each CFDP entity handler has a [LocalEntityConfig]uration. /// Each CFDP entity handler has a [LocalEntityConfig]uration.
pub struct LocalEntityConfig<UserFaultHook: UserFaultHookProvider> { pub struct LocalEntityConfig<UserFaultHookInstance: UserFaultHook> {
pub id: UnsignedByteField, pub id: UnsignedByteField,
pub indication_cfg: IndicationConfig, pub indication_cfg: IndicationConfig,
pub fault_handler: FaultHandler<UserFaultHook>, pub fault_handler: FaultHandler<UserFaultHookInstance>,
} }
impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> { impl<UserFaultHookInstance: UserFaultHook> LocalEntityConfig<UserFaultHookInstance> {
pub fn new( pub fn new(
id: UnsignedByteField, id: UnsignedByteField,
indication_cfg: IndicationConfig, indication_cfg: IndicationConfig,
hook: UserFaultHook, hook: UserFaultHookInstance,
) -> Self { ) -> Self {
Self { Self {
id, id,
@@ -610,12 +671,12 @@ impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> {
} }
} }
impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> { impl<UserFaultHookInstance: UserFaultHook> LocalEntityConfig<UserFaultHookInstance> {
pub fn user_fault_hook_mut(&mut self) -> &mut RefCell<UserFaultHook> { pub fn user_fault_hook_mut(&mut self) -> &mut RefCell<UserFaultHookInstance> {
&mut self.fault_handler.user_hook &mut self.fault_handler.user_hook
} }
pub fn user_fault_hook(&self) -> &RefCell<UserFaultHook> { pub fn user_fault_hook(&self) -> &RefCell<UserFaultHookInstance> {
&self.fault_handler.user_hook &self.fault_handler.user_hook
} }
} }
@@ -692,7 +753,7 @@ pub mod std_mod {
} }
} }
impl CountdownProvider for StdCountdown { impl Countdown for StdCountdown {
fn has_expired(&self) -> bool { fn has_expired(&self) -> bool {
if self.start_time.elapsed() > self.expiry_time { if self.start_time.elapsed() > self.expiry_time {
return true; return true;
@@ -723,7 +784,7 @@ pub mod std_mod {
} }
} }
impl TimerCreatorProvider for StdTimerCreator { impl TimerCreator for StdTimerCreator {
type Countdown = StdCountdown; type Countdown = StdCountdown;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown {
@@ -809,7 +870,7 @@ pub enum PacketTarget {
pub trait PduProvider { pub trait PduProvider {
fn pdu_type(&self) -> PduType; fn pdu_type(&self) -> PduType;
fn file_directive_type(&self) -> Option<FileDirectiveType>; fn file_directive_type(&self) -> Option<FileDirectiveType>;
fn pdu(&self) -> &[u8]; fn raw_pdu(&self) -> &[u8];
fn packet_target(&self) -> Result<PacketTarget, PduError>; fn packet_target(&self) -> Result<PacketTarget, PduError>;
} }
@@ -824,7 +885,7 @@ impl PduProvider for DummyPduProvider {
None None
} }
fn pdu(&self) -> &[u8] { fn raw_pdu(&self) -> &[u8] {
&[] &[]
} }
@@ -936,7 +997,7 @@ impl PduProvider for PduRawWithInfo<'_> {
self.file_directive_type self.file_directive_type
} }
fn pdu(&self) -> &[u8] { fn raw_pdu(&self) -> &[u8] {
self.raw_packet self.raw_packet
} }
@@ -998,7 +1059,7 @@ pub mod alloc_mod {
self.file_directive_type self.file_directive_type
} }
fn pdu(&self) -> &[u8] { fn raw_pdu(&self) -> &[u8] {
&self.pdu &self.pdu
} }
@@ -1009,8 +1070,8 @@ pub mod alloc_mod {
} }
#[derive(Debug)] #[derive(Debug)]
struct PositiveAckParams<Countdown: CountdownProvider> { struct PositiveAckParams<CountdownInstance: Countdown> {
ack_timer: Countdown, ack_timer: CountdownInstance,
ack_counter: u32, ack_counter: u32,
} }
@@ -1072,7 +1133,7 @@ pub(crate) mod tests {
expiry_control: TimerExpiryControl, expiry_control: TimerExpiryControl,
} }
impl CountdownProvider for TestCheckTimer { impl Countdown for TestCheckTimer {
fn has_expired(&self) -> bool { fn has_expired(&self) -> bool {
match self.context { match self.context {
TimerContext::CheckLimit { TimerContext::CheckLimit {
@@ -1132,7 +1193,7 @@ pub(crate) mod tests {
} }
} }
impl TimerCreatorProvider for TestCheckTimerCreator { impl TimerCreator for TestCheckTimerCreator {
type Countdown = TestCheckTimer; type Countdown = TestCheckTimer;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown {
@@ -1150,6 +1211,7 @@ pub(crate) mod tests {
} }
} }
#[derive(Debug)]
pub struct FileSegmentRecvdParamsNoSegMetadata { pub struct FileSegmentRecvdParamsNoSegMetadata {
#[allow(dead_code)] #[allow(dead_code)]
pub id: TransactionId, pub id: TransactionId,
@@ -1157,8 +1219,9 @@ pub(crate) mod tests {
pub length: usize, pub length: usize,
} }
#[derive(Default)] #[derive(Default, Debug)]
pub struct TestCfdpUser { pub struct TestCfdpUser {
pub check_queues_empty_on_drop: bool,
pub next_expected_seq_num: u64, pub next_expected_seq_num: u64,
pub expected_full_src_name: String, pub expected_full_src_name: String,
pub expected_full_dest_name: String, pub expected_full_dest_name: String,
@@ -1179,6 +1242,7 @@ pub(crate) mod tests {
expected_file_size: u64, expected_file_size: u64,
) -> Self { ) -> Self {
Self { Self {
check_queues_empty_on_drop: true,
next_expected_seq_num, next_expected_seq_num,
expected_full_src_name, expected_full_src_name,
expected_full_dest_name, expected_full_dest_name,
@@ -1196,6 +1260,12 @@ pub(crate) mod tests {
assert_eq!(id.source_id, LOCAL_ID.into()); assert_eq!(id.source_id, LOCAL_ID.into());
assert_eq!(id.seq_num().value(), self.next_expected_seq_num); assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
} }
pub fn indication_queues_empty(&self) -> bool {
self.finished_indic_queue.is_empty()
&& self.metadata_recv_queue.is_empty()
&& self.file_seg_recvd_queue.is_empty()
}
} }
impl CfdpUser for TestCfdpUser { impl CfdpUser for TestCfdpUser {
@@ -1285,6 +1355,20 @@ pub(crate) mod tests {
} }
} }
impl Drop for TestCfdpUser {
fn drop(&mut self) {
if self.check_queues_empty_on_drop {
assert!(
self.indication_queues_empty(),
"indication queues not empty on drop: finished: {}, metadata: {}, file seg: {}",
self.finished_indic_queue.len(),
self.metadata_recv_queue.len(),
self.file_seg_recvd_queue.len()
);
}
}
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub(crate) struct TestFaultHandler { pub(crate) struct TestFaultHandler {
pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
@@ -1293,7 +1377,7 @@ pub(crate) mod tests {
pub ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>, pub ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
} }
impl UserFaultHookProvider for TestFaultHandler { impl UserFaultHook for TestFaultHandler {
fn notice_of_suspension_cb( fn notice_of_suspension_cb(
&mut self, &mut self,
transaction_id: TransactionId, transaction_id: TransactionId,
@@ -1398,8 +1482,8 @@ pub(crate) mod tests {
dest_id: impl Into<UnsignedByteField>, dest_id: impl Into<UnsignedByteField>,
max_packet_len: usize, max_packet_len: usize,
crc_on_transmission_by_default: bool, crc_on_transmission_by_default: bool,
) -> StdRemoteEntityConfigProvider { ) -> RemoteConfigStoreStd {
let mut table = StdRemoteEntityConfigProvider::default(); let mut table = RemoteConfigStoreStd::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
dest_id.into(), dest_id.into(),
max_packet_len, max_packet_len,
@@ -1408,7 +1492,7 @@ pub(crate) mod tests {
TransmissionMode::Unacknowledged, TransmissionMode::Unacknowledged,
ChecksumType::Crc32, ChecksumType::Crc32,
); );
table.add_config(&remote_entity_cfg); table.add_config(&remote_entity_cfg).unwrap();
table table
} }
@@ -1549,9 +1633,10 @@ pub(crate) mod tests {
TransmissionMode::Unacknowledged, TransmissionMode::Unacknowledged,
ChecksumType::Crc32, ChecksumType::Crc32,
); );
assert!(!remote_entity_cfg.add_config(&dummy)); assert_eq!(
// Removal is no-op. remote_entity_cfg.add_config(&dummy).unwrap_err(),
assert!(!remote_entity_cfg.remove_config(REMOTE_ID.value())); RemoteConfigStoreError::Full
);
let remote_entity_retrieved = remote_entity_cfg.get(REMOTE_ID.value()).unwrap(); let remote_entity_retrieved = remote_entity_cfg.get(REMOTE_ID.value()).unwrap();
assert_eq!(remote_entity_retrieved.entity_id, REMOTE_ID.into()); assert_eq!(remote_entity_retrieved.entity_id, REMOTE_ID.into());
// Does not exist. // Does not exist.
@@ -1569,9 +1654,9 @@ pub(crate) mod tests {
TransmissionMode::Unacknowledged, TransmissionMode::Unacknowledged,
ChecksumType::Crc32, ChecksumType::Crc32,
); );
let mut remote_cfg_provider = StdRemoteEntityConfigProvider::default(); let mut remote_cfg_provider = RemoteConfigStoreStd::default();
assert!(remote_cfg_provider.0.is_empty()); assert!(remote_cfg_provider.0.is_empty());
remote_cfg_provider.add_config(&remote_entity_cfg); remote_cfg_provider.add_config(&remote_entity_cfg).unwrap();
assert_eq!(remote_cfg_provider.0.len(), 1); assert_eq!(remote_cfg_provider.0.len(), 1);
let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values( let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(), LOCAL_ID.into(),
@@ -1583,7 +1668,9 @@ pub(crate) mod tests {
); );
let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap(); let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap();
assert_eq!(cfg_0.entity_id, REMOTE_ID.into()); assert_eq!(cfg_0.entity_id, REMOTE_ID.into());
remote_cfg_provider.add_config(&remote_entity_cfg_2); remote_cfg_provider
.add_config(&remote_entity_cfg_2)
.unwrap();
assert_eq!(remote_cfg_provider.0.len(), 2); assert_eq!(remote_cfg_provider.0.len(), 2);
let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap(); let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap();
assert_eq!(cfg_1.entity_id, LOCAL_ID.into()); assert_eq!(cfg_1.entity_id, LOCAL_ID.into());
@@ -1597,7 +1684,7 @@ pub(crate) mod tests {
#[test] #[test]
fn test_remote_cfg_provider_vector() { fn test_remote_cfg_provider_vector() {
let mut remote_cfg_provider = VecRemoteEntityConfigProvider::default(); let mut remote_cfg_provider = RemoteConfigList::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values( let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(), REMOTE_ID.into(),
1024, 1024,
@@ -1607,7 +1694,7 @@ pub(crate) mod tests {
ChecksumType::Crc32, ChecksumType::Crc32,
); );
assert!(remote_cfg_provider.0.is_empty()); assert!(remote_cfg_provider.0.is_empty());
remote_cfg_provider.add_config(&remote_entity_cfg); remote_cfg_provider.add_config(&remote_entity_cfg).unwrap();
assert_eq!(remote_cfg_provider.0.len(), 1); assert_eq!(remote_cfg_provider.0.len(), 1);
let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values( let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(), LOCAL_ID.into(),
@@ -1619,7 +1706,11 @@ pub(crate) mod tests {
); );
let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap(); let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap();
assert_eq!(cfg_0.entity_id, REMOTE_ID.into()); assert_eq!(cfg_0.entity_id, REMOTE_ID.into());
remote_cfg_provider.add_config(&remote_entity_cfg_2); assert!(
remote_cfg_provider
.add_config(&remote_entity_cfg_2)
.unwrap()
);
assert_eq!(remote_cfg_provider.0.len(), 2); assert_eq!(remote_cfg_provider.0.len(), 2);
let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap(); let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap();
assert_eq!(cfg_1.entity_id, LOCAL_ID.into()); assert_eq!(cfg_1.entity_id, LOCAL_ID.into());
@@ -1649,7 +1740,7 @@ pub(crate) mod tests {
let dummy_pdu_provider = DummyPduProvider(()); let dummy_pdu_provider = DummyPduProvider(());
assert_eq!(dummy_pdu_provider.pdu_type(), PduType::FileData); assert_eq!(dummy_pdu_provider.pdu_type(), PduType::FileData);
assert!(dummy_pdu_provider.file_directive_type().is_none()); assert!(dummy_pdu_provider.file_directive_type().is_none());
assert_eq!(dummy_pdu_provider.pdu(), &[]); assert_eq!(dummy_pdu_provider.raw_pdu(), &[]);
assert_eq!( assert_eq!(
dummy_pdu_provider.packet_target(), dummy_pdu_provider.packet_target(),
Ok(PacketTarget::SourceEntity) Ok(PacketTarget::SourceEntity)

View File

@@ -4,9 +4,9 @@
//! //!
//! The two concrete implementations provided are: //! The two concrete implementations provided are:
//! //!
//! * [LostSegmentsMap]: A hash set based implementation which can grow dynamically andcan //! * [LostSegmentsList]: A hash set based implementation which can grow dynamically andcan
//! optionally be bounded. Suitable for systems where dynamic allocation is allowed. //! optionally be bounded. Suitable for systems where dynamic allocation is allowed.
//! * [LostSegmentsList]: A fixed-size list based implementation where the size //! * [LostSegmentsListHeapless]: A fixed-size list based implementation where the size
//! of the lost segment list is statically known at compile-time. Suitable for resource //! of the lost segment list is statically known at compile-time. Suitable for resource
//! constrained devices where dyanamic allocation is not allowed or possible. //! constrained devices where dyanamic allocation is not allowed or possible.
#[derive(Debug, PartialEq, Eq, thiserror::Error)] #[derive(Debug, PartialEq, Eq, thiserror::Error)]

View File

@@ -67,13 +67,13 @@ use spacepackets::{
use spacepackets::seq_count::SequenceCounter; use spacepackets::seq_count::SequenceCounter;
use crate::{ use crate::{
DummyPduProvider, EntityType, GenericSendError, PduProvider, PositiveAckParams, DummyPduProvider, EntityType, GenericSendError, PduProvider, PositiveAckParams, TimerCreator,
TimerCreatorProvider, time::CountdownProvider, time::Countdown,
}; };
use super::{ use super::{
LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig, LocalEntityConfig, PacketTarget, PduSendProvider, RemoteConfigStore, RemoteEntityConfig, State,
RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, TransactionId, UserFaultHook,
filestore::{FilestoreError, VirtualFilestore}, filestore::{FilestoreError, VirtualFilestore},
request::{ReadablePutRequest, StaticPutRequestCacher}, request::{ReadablePutRequest, StaticPutRequestCacher},
user::{CfdpUser, TransactionFinishedParams}, user::{CfdpUser, TransactionFinishedParams},
@@ -244,18 +244,18 @@ pub enum FsmContext {
/// thread pool, or move the newly created handler to a new thread. /// thread pool, or move the newly created handler to a new thread.
pub struct SourceHandler< pub struct SourceHandler<
PduSender: PduSendProvider, PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider, UserFaultHookInstance: UserFaultHook,
Vfs: VirtualFilestore, Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider, RemoteConfigStoreInstance: RemoteConfigStore,
TimerCreator: TimerCreatorProvider<Countdown = Countdown>, TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
Countdown: CountdownProvider, CountdownInstance: Countdown,
SequenceCounterInstance: SequenceCounter, SequenceCounterInstance: SequenceCounter,
> { > {
local_cfg: LocalEntityConfig<UserFaultHook>, local_cfg: LocalEntityConfig<UserFaultHookInstance>,
pdu_sender: PduSender, pdu_sender: PduSender,
pdu_and_cksum_buffer: RefCell<alloc::vec::Vec<u8>>, pdu_and_cksum_buffer: RefCell<alloc::vec::Vec<u8>>,
put_request_cacher: StaticPutRequestCacher, put_request_cacher: StaticPutRequestCacher,
remote_cfg_table: RemoteCfgTable, remote_cfg_table: RemoteConfigStoreInstance,
vfs: Vfs, vfs: Vfs,
state_helper: StateHelper, state_helper: StateHelper,
// Transfer related state information // Transfer related state information
@@ -264,29 +264,29 @@ pub struct SourceHandler<
file_params: FileParams, file_params: FileParams,
// PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers.
pdu_conf: CommonPduConfig, pdu_conf: CommonPduConfig,
check_timer: RefCell<Option<Countdown>>, check_timer: RefCell<Option<CountdownInstance>>,
positive_ack_params: RefCell<Option<PositiveAckParams<Countdown>>>, positive_ack_params: RefCell<Option<PositiveAckParams<CountdownInstance>>>,
timer_creator: TimerCreator, timer_creator: TimerCreatorInstance,
seq_count_provider: SequenceCounterInstance, seq_count_provider: SequenceCounterInstance,
anomalies: AnomalyTracker, anomalies: AnomalyTracker,
} }
impl< impl<
PduSender: PduSendProvider, PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider, UserFaultHookInstance: UserFaultHook,
Vfs: VirtualFilestore, Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider, RemoteConfigStoreInstance: RemoteConfigStore,
TimerCreator: TimerCreatorProvider<Countdown = Countdown>, TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
Countdown: CountdownProvider, CountdownInstance: Countdown,
SequenceCounterInstance: SequenceCounter, SequenceCounterInstance: SequenceCounter,
> >
SourceHandler< SourceHandler<
PduSender, PduSender,
UserFaultHook, UserFaultHookInstance,
Vfs, Vfs,
RemoteCfgTable, RemoteConfigStoreInstance,
TimerCreator, TimerCreatorInstance,
Countdown, CountdownInstance,
SequenceCounterInstance, SequenceCounterInstance,
> >
{ {
@@ -314,13 +314,13 @@ impl<
/// which contains an incrementing counter. /// which contains an incrementing counter.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
cfg: LocalEntityConfig<UserFaultHook>, cfg: LocalEntityConfig<UserFaultHookInstance>,
pdu_sender: PduSender, pdu_sender: PduSender,
vfs: Vfs, vfs: Vfs,
put_request_cacher: StaticPutRequestCacher, put_request_cacher: StaticPutRequestCacher,
pdu_and_cksum_buf_size: usize, pdu_and_cksum_buf_size: usize,
remote_cfg_table: RemoteCfgTable, remote_cfg_table: RemoteConfigStoreInstance,
timer_creator: TimerCreator, timer_creator: TimerCreatorInstance,
seq_count_provider: SequenceCounterInstance, seq_count_provider: SequenceCounterInstance,
) -> Self { ) -> Self {
Self { Self {
@@ -409,7 +409,7 @@ impl<
} }
#[inline] #[inline]
pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHook> { pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHookInstance> {
&self.local_cfg &self.local_cfg
} }
@@ -544,16 +544,16 @@ impl<
.expect("PDU directive type unexpectedly not set") .expect("PDU directive type unexpectedly not set")
{ {
FileDirectiveType::FinishedPdu => { FileDirectiveType::FinishedPdu => {
let finished_pdu = FinishedPduReader::new(packet_to_insert.pdu())?; let finished_pdu = FinishedPduReader::new(packet_to_insert.raw_pdu())?;
self.handle_finished_pdu(&finished_pdu)? self.handle_finished_pdu(&finished_pdu)?
} }
FileDirectiveType::NakPdu => { FileDirectiveType::NakPdu => {
let nak_pdu = NakPduReader::new(packet_to_insert.pdu())?; let nak_pdu = NakPduReader::new(packet_to_insert.raw_pdu())?;
sent_packets += self.handle_nak_pdu(&nak_pdu)?; sent_packets += self.handle_nak_pdu(&nak_pdu)?;
} }
FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
FileDirectiveType::AckPdu => { FileDirectiveType::AckPdu => {
let ack_pdu = AckPdu::from_bytes(packet_to_insert.pdu())?; let ack_pdu = AckPdu::from_bytes(packet_to_insert.raw_pdu())?;
self.handle_ack_pdu(&ack_pdu)? self.handle_ack_pdu(&ack_pdu)?
} }
FileDirectiveType::EofPdu FileDirectiveType::EofPdu
@@ -1196,7 +1196,7 @@ mod tests {
use super::*; use super::*;
use crate::{ use crate::{
CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd,
filestore::NativeFilestore, filestore::NativeFilestore,
request::PutRequestOwned, request::PutRequestOwned,
source::TransactionStep, source::TransactionStep,
@@ -1222,7 +1222,7 @@ mod tests {
TestCfdpSender, TestCfdpSender,
TestFaultHandler, TestFaultHandler,
NativeFilestore, NativeFilestore,
StdRemoteEntityConfigProvider, RemoteConfigStoreStd,
TestCheckTimerCreator, TestCheckTimerCreator,
TestCheckTimer, TestCheckTimer,
SequenceCounterSimple<u16>, SequenceCounterSimple<u16>,
@@ -1686,18 +1686,15 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transaction_info = tb.common_file_transfer_init_with_metadata_check( let transaction_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request,
file_size,
);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(
&mut cfdp_user, &mut user,
transaction_info.closure_requested, transaction_info.closure_requested,
EofParams::new_success(file_size, CRC_32.digest().finalize()), EofParams::new_success(file_size, CRC_32.digest().finalize()),
1, 1,
) );
} }
#[test] #[test]
@@ -1712,53 +1709,50 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transaction_info = tb.common_file_transfer_init_with_metadata_check( let transaction_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request,
file_size,
);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(
&mut cfdp_user, &mut user,
transaction_info.closure_requested, transaction_info.closure_requested,
EofParams::new_success(file_size, CRC_32.digest().finalize()), EofParams::new_success(file_size, CRC_32.digest().finalize()),
1, 1,
); );
tb.acknowledge_eof_pdu(&mut cfdp_user, &transaction_info); tb.acknowledge_eof_pdu(&mut user, &transaction_info);
tb.finish_handling(&mut cfdp_user, &transaction_info); tb.finish_handling(&mut user, &transaction_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
#[test] #[test]
fn test_tiny_file_transfer_not_acked_no_closure() { fn test_tiny_file_transfer_not_acked_no_closure() {
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
tb.common_tiny_file_transfer(&mut cfdp_user, false); tb.common_tiny_file_transfer(&mut user, false);
} }
#[test] #[test]
fn test_tiny_file_transfer_acked() { fn test_tiny_file_transfer_acked() {
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, false); let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, false);
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
#[test] #[test]
fn test_tiny_file_transfer_not_acked_with_closure() { fn test_tiny_file_transfer_not_acked_with_closure() {
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true);
tb.finish_handling(&mut cfdp_user, &transfer_info) tb.finish_handling(&mut user, &transfer_info)
} }
#[test] #[test]
fn test_two_segment_file_transfer_not_acked_no_closure() { fn test_two_segment_file_transfer_not_acked_no_closure() {
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.write(true) .write(true)
.open(&tb.srcfile) .open(&tb.srcfile)
@@ -1768,14 +1762,14 @@ mod tests {
file.write_all(&rand_data) file.write_all(&rand_data)
.expect("writing file content failed"); .expect("writing file content failed");
drop(file); drop(file);
let (_, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec()); let (_, fd_pdus) = tb.generic_file_transfer(&mut user, false, rand_data.to_vec());
assert_eq!(fd_pdus, 2); assert_eq!(fd_pdus, 2);
} }
#[test] #[test]
fn test_two_segment_file_transfer_not_acked_with_closure() { fn test_two_segment_file_transfer_not_acked_with_closure() {
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.write(true) .write(true)
.open(&tb.srcfile) .open(&tb.srcfile)
@@ -1786,14 +1780,14 @@ mod tests {
.expect("writing file content failed"); .expect("writing file content failed");
drop(file); drop(file);
let (transfer_info, fd_pdus) = let (transfer_info, fd_pdus) =
tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec()); tb.generic_file_transfer(&mut user, true, rand_data.to_vec());
assert_eq!(fd_pdus, 2); assert_eq!(fd_pdus, 2);
tb.finish_handling(&mut cfdp_user, &transfer_info) tb.finish_handling(&mut user, &transfer_info)
} }
#[test] #[test]
fn test_two_segment_file_transfer_acked() { fn test_two_segment_file_transfer_acked() {
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128);
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.write(true) .write(true)
@@ -1805,10 +1799,10 @@ mod tests {
.expect("writing file content failed"); .expect("writing file content failed");
drop(file); drop(file);
let (transfer_info, fd_pdus) = let (transfer_info, fd_pdus) =
tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec()); tb.generic_file_transfer(&mut user, true, rand_data.to_vec());
assert_eq!(fd_pdus, 2); assert_eq!(fd_pdus, 2);
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
@@ -1824,19 +1818,16 @@ mod tests {
Some(true), Some(true),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transaction_info = tb.common_file_transfer_init_with_metadata_check( let transaction_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request,
file_size,
);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(
&mut cfdp_user, &mut user,
transaction_info.closure_requested, transaction_info.closure_requested,
EofParams::new_success(file_size, CRC_32.digest().finalize()), EofParams::new_success(file_size, CRC_32.digest().finalize()),
1, 1,
); );
tb.finish_handling(&mut cfdp_user, &transaction_info) tb.finish_handling(&mut user, &transaction_info)
} }
#[test] #[test]
@@ -1898,15 +1889,12 @@ mod tests {
Some(true), Some(true),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transaction_info = tb.common_file_transfer_init_with_metadata_check( let transaction_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request,
file_size,
);
let expected_id = tb.handler.transaction_id().unwrap(); let expected_id = tb.handler.transaction_id().unwrap();
tb.common_eof_pdu_check( tb.common_eof_pdu_check(
&mut cfdp_user, &mut user,
transaction_info.closure_requested, transaction_info.closure_requested,
EofParams::new_success(file_size, CRC_32.digest().finalize()), EofParams::new_success(file_size, CRC_32.digest().finalize()),
1, 1,
@@ -1917,10 +1905,7 @@ mod tests {
// cancellation -> leads to an EOF PDU with the appropriate error code. // cancellation -> leads to an EOF PDU with the appropriate error code.
tb.expiry_control.set_check_limit_expired(); tb.expiry_control.set_check_limit_expired();
assert_eq!( assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1);
tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap(),
1
);
assert!(!tb.pdu_queue_empty()); assert!(!tb.pdu_queue_empty());
let next_pdu = tb.get_next_sent_pdu().unwrap(); let next_pdu = tb.get_next_sent_pdu().unwrap();
let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format"); let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
@@ -1953,9 +1938,9 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize); let mut user = tb.create_user(0, filesize);
assert_eq!(cfdp_user.transaction_indication_call_count, 0); assert_eq!(user.transaction_indication_call_count, 0);
assert_eq!(cfdp_user.eof_sent_call_count, 0); assert_eq!(user.eof_sent_call_count, 0);
tb.put_request(&put_request) tb.put_request(&put_request)
.expect("put_request call failed"); .expect("put_request call failed");
@@ -1964,7 +1949,7 @@ mod tests {
assert!(tb.get_next_sent_pdu().is_none()); assert!(tb.get_next_sent_pdu().is_none());
let id = tb.handler.transaction_id().unwrap(); let id = tb.handler.transaction_id().unwrap();
tb.handler tb.handler
.cancel_request(&mut cfdp_user, &id) .cancel_request(&mut user, &id)
.expect("transaction cancellation failed"); .expect("transaction cancellation failed");
assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.state(), State::Idle);
assert_eq!(tb.handler.step(), TransactionStep::Idle); assert_eq!(tb.handler.step(), TransactionStep::Idle);
@@ -2007,12 +1992,9 @@ mod tests {
) )
.expect("creating put request failed"); .expect("creating put request failed");
let file_size = rand_data.len() as u64; let file_size = rand_data.len() as u64;
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transaction_info = tb.common_file_transfer_init_with_metadata_check( let transaction_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request,
file_size,
);
let mut chunks = rand_data.chunks( let mut chunks = rand_data.chunks(
calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&transaction_info.pdu_header, &transaction_info.pdu_header,
@@ -2031,7 +2013,7 @@ mod tests {
let expected_id = tb.handler.transaction_id().unwrap(); let expected_id = tb.handler.transaction_id().unwrap();
assert!( assert!(
tb.handler tb.handler
.cancel_request(&mut cfdp_user, &expected_id) .cancel_request(&mut user, &expected_id)
.expect("cancellation failed") .expect("cancellation failed")
); );
assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.state(), State::Idle);
@@ -2070,18 +2052,10 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transfer_info = tb.common_file_transfer_init_with_metadata_check( let transfer_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request, tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
file_size,
);
tb.common_eof_pdu_check(
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
1,
);
assert!(tb.pdu_queue_empty()); assert!(tb.pdu_queue_empty());
@@ -2089,18 +2063,13 @@ mod tests {
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
2,
);
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
@@ -2117,18 +2086,10 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transfer_info = tb.common_file_transfer_init_with_metadata_check( let transfer_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request, tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
file_size,
);
tb.common_eof_pdu_check(
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
1,
);
assert!(tb.pdu_queue_empty()); assert!(tb.pdu_queue_empty());
@@ -2136,33 +2097,23 @@ mod tests {
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
2,
);
// Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU.
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
eof_params.condition_code = ConditionCode::PositiveAckLimitReached; eof_params.condition_code = ConditionCode::PositiveAckLimitReached;
tb.common_eof_pdu_check( tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3);
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
3,
);
// This boilerplate handling is still expected. In a real-life use-case I would expect // This boilerplate handling is still expected. In a real-life use-case I would expect
// this to fail as well, leading to a transaction abandonment. This is tested separately. // this to fail as well, leading to a transaction abandonment. This is tested separately.
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
@@ -2179,18 +2130,10 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transfer_info = tb.common_file_transfer_init_with_metadata_check( let transfer_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request, tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
file_size,
);
tb.common_eof_pdu_check(
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
1,
);
assert!(tb.pdu_queue_empty()); assert!(tb.pdu_queue_empty());
@@ -2198,29 +2141,19 @@ mod tests {
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
2,
);
// Enforce a postive ack timer expiry -> positive ACK limit reached -> Cancel EOF sent. // Enforce a postive ack timer expiry -> positive ACK limit reached -> Cancel EOF sent.
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
eof_params.condition_code = ConditionCode::PositiveAckLimitReached; eof_params.condition_code = ConditionCode::PositiveAckLimitReached;
tb.common_eof_pdu_check( tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3);
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
3,
);
// Cancellation fault should have been triggered. // Cancellation fault should have been triggered.
let fault_handler = tb.test_fault_handler_mut(); let fault_handler = tb.test_fault_handler_mut();
let fh_ref_mut = fault_handler.get_mut(); let fh_ref_mut = fault_handler.get_mut();
@@ -2236,22 +2169,17 @@ mod tests {
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 4);
&mut cfdp_user,
transfer_info.closure_requested,
eof_params,
4,
);
// Enforce a postive ack timer expiry -> positive ACK limit reached -> Transaction // Enforce a postive ack timer expiry -> positive ACK limit reached -> Transaction
// abandoned // abandoned
tb.expiry_control.set_positive_ack_expired(); tb.expiry_control.set_positive_ack_expired();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine_no_packet(&mut cfdp_user) .state_machine_no_packet(&mut user)
.expect("source handler FSM failure"); .expect("source handler FSM failure");
assert_eq!(sent_packets, 0); assert_eq!(sent_packets, 0);
// Abandonment fault should have been triggered. // Abandonment fault should have been triggered.
@@ -2269,21 +2197,21 @@ mod tests {
#[test] #[test]
fn test_nak_for_whole_file() { fn test_nak_for_whole_file() {
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let (data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); let (data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true);
let seg_reqs = &[(0, transfer_info.file_size as u32)]; let seg_reqs = &[(0, transfer_info.file_size as u32)];
tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, seg_reqs); tb.nak_for_file_segments(&mut user, &transfer_info, seg_reqs);
tb.check_next_file_pdu(0, data.as_bytes()); tb.check_next_file_pdu(0, data.as_bytes());
tb.all_fault_queues_empty(); tb.all_fault_queues_empty();
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
#[test] #[test]
fn test_nak_for_file_segment() { fn test_nak_for_file_segment() {
let mut cfdp_user = TestCfdpUser::default(); let mut user = TestCfdpUser::default();
let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128);
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
.write(true) .write(true)
@@ -2295,14 +2223,14 @@ mod tests {
.expect("writing file content failed"); .expect("writing file content failed");
drop(file); drop(file);
let (transfer_info, fd_pdus) = let (transfer_info, fd_pdus) =
tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec()); tb.generic_file_transfer(&mut user, false, rand_data.to_vec());
assert_eq!(fd_pdus, 2); assert_eq!(fd_pdus, 2);
tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, &[(0, 90)]); tb.nak_for_file_segments(&mut user, &transfer_info, &[(0, 90)]);
tb.check_next_file_pdu(0, &rand_data[0..90]); tb.check_next_file_pdu(0, &rand_data[0..90]);
tb.all_fault_queues_empty(); tb.all_fault_queues_empty();
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
@@ -2318,14 +2246,11 @@ mod tests {
Some(false), Some(false),
) )
.expect("creating put request failed"); .expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, file_size); let mut user = tb.create_user(0, file_size);
let transfer_info = tb.common_file_transfer_init_with_metadata_check( let transfer_info =
&mut cfdp_user, tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
put_request,
file_size,
);
tb.common_eof_pdu_check( tb.common_eof_pdu_check(
&mut cfdp_user, &mut user,
transfer_info.closure_requested, transfer_info.closure_requested,
EofParams::new_success(file_size, CRC_32.digest().finalize()), EofParams::new_success(file_size, CRC_32.digest().finalize()),
1, 1,
@@ -2343,7 +2268,7 @@ mod tests {
let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap(); let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap();
let sent_packets = tb let sent_packets = tb
.handler .handler
.state_machine(&mut cfdp_user, Some(&packet_info)) .state_machine(&mut user, Some(&packet_info))
.unwrap(); .unwrap();
assert_eq!(sent_packets, 1); assert_eq!(sent_packets, 1);
let next_pdu = tb.get_next_sent_pdu().unwrap(); let next_pdu = tb.get_next_sent_pdu().unwrap();
@@ -2351,8 +2276,8 @@ mod tests {
tb.metadata_check(&next_pdu, file_size); tb.metadata_check(&next_pdu, file_size);
tb.all_fault_queues_empty(); tb.all_fault_queues_empty();
tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); tb.acknowledge_eof_pdu(&mut user, &transfer_info);
tb.finish_handling(&mut cfdp_user, &transfer_info); tb.finish_handling(&mut user, &transfer_info);
tb.common_finished_pdu_ack_check(); tb.common_finished_pdu_ack_check();
} }
} }

View File

@@ -1,7 +1,7 @@
use core::fmt::Debug; use core::fmt::Debug;
/// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone. /// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone.
pub trait CountdownProvider: Debug { pub trait Countdown: Debug {
fn has_expired(&self) -> bool; fn has_expired(&self) -> bool;
fn reset(&mut self); fn reset(&mut self);
} }

View File

@@ -13,9 +13,10 @@ use std::{
use cfdp::{ use cfdp::{
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig, EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
StdTimerCreator, TransactionId, UserFaultHookProvider, StdTimerCreator, TransactionId, UserFaultHook,
dest::DestinationHandler, dest::DestinationHandler,
filestore::NativeFilestore, filestore::NativeFilestore,
lost_segments::LostSegmentsList,
request::{PutRequestOwned, StaticPutRequestCacher}, request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler, source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
@@ -33,7 +34,7 @@ const FILE_DATA: &str = "Hello World!";
#[derive(Default)] #[derive(Default)]
pub struct ExampleFaultHandler {} pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler { impl UserFaultHook for ExampleFaultHandler {
fn notice_of_suspension_cb( fn notice_of_suspension_cb(
&mut self, &mut self,
transaction_id: TransactionId, transaction_id: TransactionId,
@@ -230,6 +231,7 @@ fn end_to_end_test(with_closure: bool) {
NativeFilestore::default(), NativeFilestore::default(),
remote_cfg_of_source, remote_cfg_of_source,
StdTimerCreator::default(), StdTimerCreator::default(),
LostSegmentsList::default(),
); );
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);