From ada26f626e18d9f8e84e7c5b72cda7d83ea8852b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 14 Aug 2025 16:29:40 +0200 Subject: [PATCH] Add acknowledged source handler --- .github/workflows/ci.yml | 4 +- Cargo.toml | 6 +- README.md | 3 +- coverage.py => automation/coverage.py | 0 docs.sh | 3 - examples/python-interop/main.rs | 27 +- justfile | 34 + src/dest.rs | 142 +-- src/filestore.rs | 10 +- src/lib.rs | 204 ++- src/request.rs | 6 +- src/source.rs | 1679 ++++++++++++++++--------- src/time.rs | 2 +- src/user.rs | 2 +- tests/end-to-end.rs | 13 +- 15 files changed, 1404 insertions(+), 731 deletions(-) rename coverage.py => automation/coverage.py (100%) delete mode 100755 docs.sh create mode 100644 justfile diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dec7e0b..0e5f31a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@1.81.0 + - uses: dtolnay/rust-toolchain@1.85.0 - run: cargo check --release cross-check: @@ -45,7 +45,7 @@ jobs: - uses: dtolnay/rust-toolchain@stable with: targets: "armv7-unknown-linux-gnueabihf, thumbv7em-none-eabihf" - - run: cargo check --release --target=${{matrix.target}} --no-default-features + - run: cargo check --release --target=${{matrix.target}} --no-default-features --features "alloc" fmt: name: Check formatting diff --git a/Cargo.toml b/Cargo.toml index 453e683..2d03fa7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "cfdp-rs" version = "0.2.0" -edition = "2021" -rust-version = "1.81.0" +edition = "2024" +rust-version = "1.85.0" authors = ["Robin Mueller "] description = "High level CCSDS File Delivery Protocol components" homepage = "https://egit.irs.uni-stuttgart.de/rust/cfdp" @@ -20,7 +20,7 @@ crc = "3" smallvec = "1" derive-new = ">=0.6, <=0.7" hashbrown = { version = ">=0.14, <=0.15", optional = true } -spacepackets = { version = "0.15", default-features = false } +spacepackets = { git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git", version = "0.16", default-features = false } thiserror = { version = "2", default-features = false } serde = { version = "1", optional = true } defmt = { version = "1", optional = true } diff --git a/README.md b/README.md index 8cef4a7..8b6693d 100644 --- a/README.md +++ b/README.md @@ -17,10 +17,11 @@ The underlying base packet library used to generate the packets to be sent is th `cfdp-rs` currently supports following high-level features: - Unacknowledged (class 1) file transfers for both source and destination side. +- Acknowledged (class 2) file transfers for the source side. The following features have not been implemented yet. PRs or notifications for demand are welcome! -- Acknowledged (class 2) file transfers for both source and destination side. +- Acknowledged (class 2) file transfers for the destination side. - Suspending transfers - Inactivity handling - Start and end of transmission and reception opportunity handling diff --git a/coverage.py b/automation/coverage.py similarity index 100% rename from coverage.py rename to automation/coverage.py diff --git a/docs.sh b/docs.sh deleted file mode 100755 index 37563d2..0000000 --- a/docs.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" -cargo +nightly doc --all-features --open diff --git a/examples/python-interop/main.rs b/examples/python-interop/main.rs index 8c775cf..ef21d7b 100644 --- a/examples/python-interop/main.rs +++ b/examples/python-interop/main.rs @@ -3,31 +3,35 @@ use std::{ fs::OpenOptions, io::{self, ErrorKind, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, - sync::mpsc, + sync::{ + atomic::{AtomicBool, AtomicU16}, + mpsc, + }, thread, time::Duration, }; use cfdp::{ + EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider, + RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider, dest::DestinationHandler, filestore::NativeFilestore, request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, - EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider, - RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider, }; use clap::Parser; use log::{debug, info, warn}; use spacepackets::{ cfdp::{ - pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError}, ChecksumType, ConditionCode, TransmissionMode, + pdu::{PduError, file_data::FileDataPdu, metadata::MetadataPduReader}, }, - seq_count::SeqCountProviderSyncU16, util::{UnsignedByteFieldU16, UnsignedEnum}, }; +static KILL_APP: AtomicBool = AtomicBool::new(false); + const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); @@ -231,7 +235,7 @@ impl UdpServer { Ok(None) } else { Err(e.into()) - } + }; } }; let (_, from) = res; @@ -338,7 +342,7 @@ fn main() { spacepackets::cfdp::TransmissionMode::Unacknowledged, ChecksumType::Crc32C, ); - let seq_count_provider = SeqCountProviderSyncU16::default(); + let seq_count_provider = AtomicU16::default(); let mut source_handler = SourceHandler::new( local_cfg_source, source_tm_tx, @@ -411,6 +415,9 @@ fn main() { .expect("put request failed"); } loop { + if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) { + break; + } let mut next_delay = None; let mut undelayed_call_count = 0; let packet_info = match source_tc_rx.try_recv() { @@ -453,6 +460,9 @@ fn main() { loop { let mut next_delay = None; let mut undelayed_call_count = 0; + if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) { + break; + } let packet_info = match dest_tc_rx.try_recv() { Ok(pdu_with_info) => Some(pdu_with_info), Err(e) => match e { @@ -494,6 +504,9 @@ fn main() { info!("Starting UDP server on {}", remote_addr); loop { loop { + if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) { + break; + } match udp_server.try_recv_tc() { Ok(result) => match result { Some((pdu, _addr)) => { diff --git a/justfile b/justfile new file mode 100644 index 0000000..8f68ad3 --- /dev/null +++ b/justfile @@ -0,0 +1,34 @@ +all: check build clippy fmt docs test coverage + +clippy: + cargo clippy -- -D warnings + +fmt: + cargo fmt --all -- --check + +check: + cargo check --all-features + +test: + cargo nextest r --all-features + cargo test --doc + +build: + cargo build --all-features + +embedded: + cargo build --target thumbv7em-none-eabihf --no-default-features --features "alloc" + +docs: + export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" + cargo +nightly doc --all-features + +docs-html: + export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" + cargo +nightly doc --all-features --open + +coverage: + cargo llvm-cov nextest + +coverage-html: + cargo llvm-cov nextest --html --open diff --git a/src/dest.rs b/src/dest.rs index b55334c..b36f648 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -29,29 +29,28 @@ //! 3. An EOF ACK PDU has been sent back to the remote side. //! 4. A Finished PDU has been sent back to the remote side. //! 5. A Finished PDU ACK was received. -use crate::{user::TransactionFinishedParams, DummyPduProvider, GenericSendError, PduProvider}; -use core::str::{from_utf8, from_utf8_unchecked, Utf8Error}; +use crate::{DummyPduProvider, GenericSendError, PduProvider, user::TransactionFinishedParams}; +use core::str::{Utf8Error, from_utf8, from_utf8_unchecked}; use super::{ - filestore::{FilestoreError, NativeFilestore, VirtualFilestore}, - user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, - RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCountdown, - StdRemoteEntityConfigProvider, StdTimerCreator, TimerContext, TimerCreatorProvider, + RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TimerCreatorProvider, TransactionId, UserFaultHookProvider, + filestore::{FilestoreError, VirtualFilestore}, + user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, }; use smallvec::SmallVec; use spacepackets::{ cfdp::{ + ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode, pdu::{ + CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, eof::EofPdu, file_data::FileDataPdu, finished::{DeliveryCode, FileStatus, FinishedPduCreator}, metadata::{MetadataGenericParams, MetadataPduReader}, - CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, }, - tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, GenericTlv, ReadableTlv, TlvType}, - ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode, + tlv::{EntityIdTlv, GenericTlv, ReadableTlv, TlvType, msg_to_user::MsgToUserTlv}, }, util::{UnsignedByteField, UnsignedEnum}, }; @@ -278,20 +277,20 @@ pub struct DestinationHandler< pub type StdDestinationHandler = DestinationHandler< PduSender, UserFaultHook, - NativeFilestore, - StdRemoteEntityConfigProvider, - StdTimerCreator, - StdCountdown, + crate::filestore::NativeFilestore, + crate::StdRemoteEntityConfigProvider, + crate::StdTimerCreator, + crate::StdCountdown, >; impl< - PduSender: PduSendProvider, - UserFaultHook: UserFaultHookProvider, - Vfs: VirtualFilestore, - RemoteCfgTable: RemoteEntityConfigProvider, - TimerCreator: TimerCreatorProvider, - Countdown: CountdownProvider, - > DestinationHandler + PduSender: PduSendProvider, + UserFaultHook: UserFaultHookProvider, + Vfs: VirtualFilestore, + RemoteCfgTable: RemoteEntityConfigProvider, + TimerCreator: TimerCreatorProvider, + Countdown: CountdownProvider, +> DestinationHandler { /// Constructs a new destination handler. /// @@ -990,7 +989,6 @@ impl< #[cfg(test)] mod tests { - use core::{cell::Cell, sync::atomic::AtomicBool}; #[allow(unused_imports)] use std::println; use std::{ @@ -999,80 +997,28 @@ mod tests { string::String, }; - use alloc::{sync::Arc, vec::Vec}; + use alloc::vec::Vec; use rand::Rng; use spacepackets::{ cfdp::{ - lv::Lv, - pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket}, ChecksumType, TransmissionMode, + lv::Lv, + pdu::{WritablePduPacket, finished::FinishedPduReader, metadata::MetadataPduCreator}, }, util::{UbfU16, UnsignedByteFieldU8}, }; use crate::{ + CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, filestore::NativeFilestore, tests::{ - basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler, - LOCAL_ID, REMOTE_ID, + LOCAL_ID, REMOTE_ID, SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, + TestCheckTimerCreator, TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table, }, - CountdownProvider, FaultHandler, IndicationConfig, PduRawWithInfo, - StdRemoteEntityConfigProvider, TimerCreatorProvider, CRC_32, }; use super::*; - #[derive(Debug)] - struct TestCheckTimer { - counter: Cell, - expired: Arc, - } - - impl CountdownProvider for TestCheckTimer { - fn has_expired(&self) -> bool { - self.expired.load(core::sync::atomic::Ordering::Relaxed) - } - fn reset(&mut self) { - self.counter.set(0); - } - } - - impl TestCheckTimer { - pub fn new(expired_flag: Arc) -> Self { - Self { - counter: Cell::new(0), - expired: expired_flag, - } - } - } - - struct TestCheckTimerCreator { - check_limit_expired_flag: Arc, - } - - impl TestCheckTimerCreator { - pub fn new(expired_flag: Arc) -> Self { - Self { - check_limit_expired_flag: expired_flag, - } - } - } - - impl TimerCreatorProvider for TestCheckTimerCreator { - type Countdown = TestCheckTimer; - - fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { - match timer_context { - TimerContext::CheckLimit { .. } => { - TestCheckTimer::new(self.check_limit_expired_flag.clone()) - } - _ => { - panic!("invalid check timer creator, can only be used for check limit handling") - } - } - } - } - type TestDestHandler = DestinationHandler< TestCfdpSender, TestFaultHandler, @@ -1083,7 +1029,7 @@ mod tests { >; struct DestHandlerTestbench { - check_timer_expired: Arc, + expiry_control: TimerExpiryControl, handler: TestDestHandler, src_path: PathBuf, dest_path: PathBuf, @@ -1110,12 +1056,11 @@ mod tests { src_path: PathBuf, dest_path: PathBuf, ) -> Self { - let check_timer_expired = Arc::new(AtomicBool::new(false)); + let expiry_control = TimerExpiryControl::default(); let test_sender = TestCfdpSender::default(); - let dest_handler = - default_dest_handler(fault_handler, test_sender, check_timer_expired.clone()); + let dest_handler = default_dest_handler(fault_handler, test_sender, &expiry_control); let handler = Self { - check_timer_expired, + expiry_control, handler: dest_handler, src_path, closure_requested, @@ -1157,8 +1102,9 @@ mod tests { } fn set_check_timer_expired(&mut self) { - self.check_timer_expired - .store(true, core::sync::atomic::Ordering::Relaxed); + self.expiry_control + .check_limit + .store(true, core::sync::atomic::Ordering::Release); } fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser { @@ -1301,7 +1247,7 @@ mod tests { fn default_dest_handler( test_fault_handler: TestFaultHandler, test_packet_sender: TestCfdpSender, - check_timer_expired: Arc, + expiry_control: &TimerExpiryControl, ) -> TestDestHandler { let local_entity_cfg = LocalEntityConfig { id: REMOTE_ID.into(), @@ -1314,7 +1260,7 @@ mod tests { test_packet_sender, NativeFilestore::default(), basic_remote_cfg_table(LOCAL_ID, 1024, true), - TestCheckTimerCreator::new(check_timer_expired), + TestCheckTimerCreator::new(expiry_control), ) } @@ -1372,14 +1318,17 @@ mod tests { fn test_basic() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); - let dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); + let dest_handler = + default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default()); assert!(dest_handler.transmission_mode().is_none()); - assert!(dest_handler - .local_cfg - .fault_handler - .user_hook - .borrow() - .all_queues_empty()); + assert!( + dest_handler + .local_cfg + .fault_handler + .user_hook + .borrow() + .all_queues_empty() + ); assert!(dest_handler.pdu_sender.queue_empty()); assert_eq!(dest_handler.state(), State::Idle); assert_eq!(dest_handler.step(), TransactionStep::Idle); @@ -1389,7 +1338,8 @@ mod tests { fn test_cancelling_idle_fsm() { let fault_handler = TestFaultHandler::default(); let test_sender = TestCfdpSender::default(); - let mut dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); + let mut dest_handler = + default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default()); assert!(!dest_handler.cancel_request(&TransactionId::new( UnsignedByteFieldU8::new(0).into(), UnsignedByteFieldU8::new(0).into() diff --git a/src/filestore.rs b/src/filestore.rs index 12fd848..e3e8e7a 100644 --- a/src/filestore.rs +++ b/src/filestore.rs @@ -1,5 +1,5 @@ -use spacepackets::cfdp::ChecksumType; use spacepackets::ByteConversionError; +use spacepackets::cfdp::ChecksumType; #[cfg(feature = "std")] pub use std_mod::*; @@ -375,9 +375,11 @@ mod tests { .create_dir(dir_path.to_str().expect("getting str for file failed")) .unwrap(); assert!(NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap()); - assert!(NATIVE_FS - .is_dir(dir_path.as_path().to_str().unwrap()) - .unwrap()); + assert!( + NATIVE_FS + .is_dir(dir_path.as_path().to_str().unwrap()) + .unwrap() + ); } #[test] diff --git a/src/lib.rs b/src/lib.rs index dda8628..adef6df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,13 +5,14 @@ //! //! # Features //! -//! The crate currently supports following features: +//! `cfdp-rs` currently supports following high-level features: //! //! - Unacknowledged (class 1) file transfers for both source and destination side. +//! - Acknowledged (class 2) file transfers for the source side. //! //! The following features have not been implemented yet. PRs or notifications for demand are welcome! //! -//! - Acknowledged (class 2) file transfers for both source and destination side. +//! - Acknowledged (class 2) file transfers for the destination side. //! - Suspending transfers //! - Inactivity handling //! - Start and end of transmission and reception opportunity handling @@ -100,7 +101,7 @@ pub mod user; use crate::time::CountdownProvider; use core::{cell::RefCell, fmt::Debug, hash::Hash}; -use crc::{Crc, CRC_32_ISCSI, CRC_32_ISO_HDLC}; +use crc::{CRC_32_ISCSI, CRC_32_ISO_HDLC, Crc}; #[cfg(feature = "alloc")] pub use alloc_mod::*; @@ -109,8 +110,8 @@ use core::time::Duration; use serde::{Deserialize, Serialize}; use spacepackets::{ cfdp::{ - pdu::{FileDirectiveType, PduError, PduHeader}, ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode, + pdu::{FileDirectiveType, PduError, PduHeader}, }, util::{UnsignedByteField, UnsignedEnum}, }; @@ -619,7 +620,6 @@ impl LocalEntityConfig { } /// Generic error type for sending a PDU via a message queue. -#[cfg(feature = "std")] #[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)] #[non_exhaustive] pub enum GenericSendError { @@ -631,7 +631,6 @@ pub enum GenericSendError { Other, } -#[cfg(feature = "std")] pub trait PduSendProvider { fn send_pdu( &self, @@ -846,40 +845,39 @@ pub fn determine_packet_target(raw_pdu: &[u8]) -> Result expected: None, } })?; - let packet_target = - match file_directive_type { - // Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a. - // the source handler - FileDirectiveType::NakPdu - | FileDirectiveType::FinishedPdu - | FileDirectiveType::KeepAlivePdu => PacketTarget::SourceEntity, - // Section b) of 4.5.3: These PDUs should always be targeted towards the file receiver a.k.a. - // the destination handler - FileDirectiveType::MetadataPdu - | FileDirectiveType::EofPdu - | FileDirectiveType::PromptPdu => PacketTarget::DestEntity, - // Section a): Recipient depends of the type of PDU that is being acknowledged. We can simply - // extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to - // the source handler, for a Finished PDU, it is passed to the destination handler. - FileDirectiveType::AckPdu => { - let acked_directive = FileDirectiveType::try_from(raw_pdu[header_len + 1]) - .map_err(|_| PduError::InvalidDirectiveType { - found: raw_pdu[header_len], - expected: None, - })?; - if acked_directive == FileDirectiveType::EofPdu { - PacketTarget::SourceEntity - } else if acked_directive == FileDirectiveType::FinishedPdu { - PacketTarget::DestEntity - } else { - // TODO: Maybe a better error? This might be confusing.. - return Err(PduError::InvalidDirectiveType { - found: raw_pdu[header_len + 1], - expected: None, - }); - } + let packet_target = match file_directive_type { + // Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a. + // the source handler + FileDirectiveType::NakPdu + | FileDirectiveType::FinishedPdu + | FileDirectiveType::KeepAlivePdu => PacketTarget::SourceEntity, + // Section b) of 4.5.3: These PDUs should always be targeted towards the file receiver a.k.a. + // the destination handler + FileDirectiveType::MetadataPdu + | FileDirectiveType::EofPdu + | FileDirectiveType::PromptPdu => PacketTarget::DestEntity, + // Section a): Recipient depends of the type of PDU that is being acknowledged. We can simply + // extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to + // the source handler, for a Finished PDU, it is passed to the destination handler. + FileDirectiveType::AckPdu => { + let acked_directive = FileDirectiveType::try_from(raw_pdu[header_len + 1] >> 4) + .map_err(|_| PduError::InvalidDirectiveType { + found: (raw_pdu[header_len + 1] >> 4), + expected: None, + })?; + if acked_directive == FileDirectiveType::EofPdu { + PacketTarget::SourceEntity + } else if acked_directive == FileDirectiveType::FinishedPdu { + PacketTarget::DestEntity + } else { + // TODO: Maybe a better error? This might be confusing.. + return Err(PduError::InvalidDirectiveType { + found: raw_pdu[header_len + 1], + expected: None, + }); } - }; + } + }; Ok(packet_target) } @@ -941,11 +939,11 @@ impl PduProvider for PduRawWithInfo<'_> { #[cfg(feature = "alloc")] pub mod alloc_mod { use spacepackets::cfdp::{ - pdu::{FileDirectiveType, PduError}, PduType, + pdu::{FileDirectiveType, PduError}, }; - use crate::{determine_packet_target, PacketTarget, PduProvider, PduRawWithInfo}; + use crate::{PacketTarget, PduProvider, PduRawWithInfo, determine_packet_target}; #[derive(Debug, PartialEq, Eq, Clone)] pub struct PduOwnedWithInfo { @@ -1003,21 +1001,25 @@ pub mod alloc_mod { #[cfg(test)] pub(crate) mod tests { - use core::cell::RefCell; + use core::{ + cell::{Cell, RefCell}, + sync::atomic::AtomicBool, + }; + use std::{println, sync::Arc}; use alloc::{collections::VecDeque, string::String, vec::Vec}; use spacepackets::{ cfdp::{ + ChecksumType, ConditionCode, PduType, TransmissionMode, lv::Lv, pdu::{ + CommonPduConfig, FileDirectiveType, PduHeader, eof::EofPdu, file_data::FileDataPdu, metadata::{MetadataGenericParams, MetadataPduCreator}, - CommonPduConfig, FileDirectiveType, PduHeader, WritablePduPacket, }, - ChecksumType, ConditionCode, PduType, TransmissionMode, }, - util::{UnsignedByteField, UnsignedByteFieldU16, UnsignedByteFieldU8, UnsignedEnum}, + util::{UnsignedByteField, UnsignedByteFieldU8, UnsignedByteFieldU16, UnsignedEnum}, }; use user::{CfdpUser, OwnedMetadataRecvdParams, TransactionFinishedParams}; @@ -1028,6 +1030,111 @@ pub(crate) mod tests { pub const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); pub const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); + // This test structure allows to precisely control the expiry of CFDP timers. + #[derive(Debug, Default, Clone)] + pub(crate) struct TimerExpiryControl { + pub(crate) check_limit: Arc, + pub(crate) positive_ack: Arc, + } + + impl TimerExpiryControl { + pub fn set_check_limit_expired(&mut self) { + self.check_limit + .store(true, core::sync::atomic::Ordering::Release); + } + + #[allow(dead_code)] + pub fn set_positive_ack_expired(&mut self) { + self.positive_ack + .store(true, core::sync::atomic::Ordering::Release); + } + } + + #[derive(Debug)] + pub(crate) struct TestCheckTimer { + counter: Cell, + context: TimerContext, + expiry_control: TimerExpiryControl, + } + + impl CountdownProvider for TestCheckTimer { + fn has_expired(&self) -> bool { + match self.context { + TimerContext::CheckLimit { + local_id: _, + remote_id: _, + entity_type: _, + } => self + .expiry_control + .check_limit + .load(core::sync::atomic::Ordering::Acquire), + TimerContext::PositiveAck { expiry_time: _ } => self + .expiry_control + .positive_ack + .load(core::sync::atomic::Ordering::Acquire), + TimerContext::NakActivity { expiry_time: _ } => todo!(), + } + } + fn reset(&mut self) { + match self.context { + TimerContext::CheckLimit { + local_id: _, + remote_id: _, + entity_type: _, + } => self + .expiry_control + .check_limit + .store(false, core::sync::atomic::Ordering::Release), + TimerContext::NakActivity { expiry_time: _ } => todo!(), + TimerContext::PositiveAck { expiry_time: _ } => self + .expiry_control + .positive_ack + .store(false, core::sync::atomic::Ordering::Release), + } + self.counter.set(0); + } + } + + impl TestCheckTimer { + pub fn new(context: TimerContext, expiry_control: &TimerExpiryControl) -> Self { + Self { + counter: Cell::new(0), + context, + expiry_control: expiry_control.clone(), + } + } + } + + pub(crate) struct TestCheckTimerCreator { + expiry_control: TimerExpiryControl, + } + + impl TestCheckTimerCreator { + pub fn new(expiry_control: &TimerExpiryControl) -> Self { + Self { + expiry_control: expiry_control.clone(), + } + } + } + + impl TimerCreatorProvider for TestCheckTimerCreator { + type Countdown = TestCheckTimer; + + fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown { + match timer_context { + TimerContext::CheckLimit { .. } => { + TestCheckTimer::new(timer_context, &self.expiry_control) + } + TimerContext::PositiveAck { expiry_time: _ } => { + TestCheckTimer::new(timer_context, &self.expiry_control) + } + _ => { + panic!("invalid check timer creator, can only be used for check limit handling") + } + } + } + } + pub struct FileSegmentRecvdParamsNoSegMetadata { #[allow(dead_code)] pub id: TransactionId, @@ -1247,6 +1354,12 @@ pub(crate) mod tests { file_directive_type: Option, raw_pdu: &[u8], ) -> Result<(), GenericSendError> { + println!( + "sent pdu: {:?}, directive: {:?}, len: {}", + pdu_type, + file_directive_type, + raw_pdu.len() + ); self.packet_queue.borrow_mut().push_back(SentPdu { pdu_type, file_directive_type, @@ -1260,6 +1373,7 @@ pub(crate) mod tests { pub fn retrieve_next_pdu(&self) -> Option { self.packet_queue.borrow_mut().pop_front() } + pub fn queue_empty(&self) -> bool { self.packet_queue.borrow_mut().is_empty() } diff --git a/src/request.rs b/src/request.rs index 1f2cc51..244186f 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,7 +1,7 @@ use spacepackets::{ cfdp::{ - tlv::{GenericTlv, Tlv, TlvType}, SegmentationControl, TransmissionMode, + tlv::{GenericTlv, Tlv, TlvType}, }, util::UnsignedByteField, }; @@ -233,8 +233,8 @@ pub mod alloc_mod { use super::*; use alloc::string::ToString; use spacepackets::{ - cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv, TlvOwned, WritableTlv}, ByteConversionError, + cfdp::tlv::{ReadableTlv, TlvOwned, WritableTlv, msg_to_user::MsgToUserTlv}, }; /// Owned variant of [PutRequest] with no lifetimes which is also [Clone]able. @@ -557,7 +557,7 @@ mod tests { use std::string::String; use spacepackets::{ - cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv}, + cfdp::tlv::{ReadableTlv, msg_to_user::MsgToUserTlv}, util::UbfU16, }; diff --git a/src/source.rs b/src/source.rs index eb1dd98..3eb46a7 100644 --- a/src/source.rs +++ b/src/source.rs @@ -36,41 +36,48 @@ //! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity. //! The [spacepackets::cfdp::pdu::finished::FinishedPduReader] can be used to inspect the //! generated PDU. -use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error}; +use core::{ + cell::{Cell, RefCell}, + ops::ControlFlow, + str::Utf8Error, + time::Duration, +}; use spacepackets::{ + ByteConversionError, cfdp::{ + ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl, + TransactionStatus, TransmissionMode, lv::Lv, pdu::{ + CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, + ack::AckPdu, eof::EofPdu, file_data::{ - calculate_max_file_seg_len_for_max_packet_len_and_pdu_header, FileDataPduCreatorWithReservedDatafield, + calculate_max_file_seg_len_for_max_packet_len_and_pdu_header, }, finished::{DeliveryCode, FileStatus, FinishedPduReader}, metadata::{MetadataGenericParams, MetadataPduCreator}, - CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, + nak::NakPduReader, }, - ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl, - TransmissionMode, }, util::{UnsignedByteField, UnsignedEnum}, - ByteConversionError, }; -use spacepackets::seq_count::SequenceCountProvider; +use spacepackets::seq_count::SequenceCounter; use crate::{ - time::CountdownProvider, DummyPduProvider, EntityType, GenericSendError, PduProvider, - TimerCreatorProvider, + DummyPduProvider, EntityType, GenericSendError, PduProvider, TimerCreatorProvider, + time::CountdownProvider, }; use super::{ + LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig, + RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, filestore::{FilestoreError, VirtualFilestore}, request::{ReadablePutRequest, StaticPutRequestCacher}, user::{CfdpUser, TransactionFinishedParams}, - LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig, - RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, }; /// This enumeration models the different transaction steps of the source entity handler. @@ -87,11 +94,11 @@ pub enum TransactionStep { SendingEof = 6, WaitingForEofAck = 7, WaitingForFinished = 8, - // SendingAckOfFinished = 9, + //SendingAckOfFinished = 9, NoticeOfCompletion = 10, } -#[derive(Default)] +#[derive(Default, Copy, Clone)] pub struct FileParams { pub progress: u64, pub segment_len: u64, @@ -99,15 +106,40 @@ pub struct FileParams { pub metadata_only: bool, pub file_size: u64, pub empty_file: bool, + /// The checksum is cached to avoid expensive re-calculation when the EOF PDU needs to be + /// re-sent. + pub checksum_completed_file: Option, } -pub struct StateHelper { - state: super::State, - step: TransactionStep, - num_packets_ready: u32, +// Explicit choice to put all simple internal fields into Cells. +// I think this is more efficient than wrapping the whole helper into a RefCell, especially +// because some of the individual fields are used frequently. +struct StateHelper { + step: Cell, + state: Cell, + num_packets_ready: Cell, } -#[derive(Debug)] +impl Default for StateHelper { + fn default() -> Self { + Self { + state: Cell::new(super::State::Idle), + step: Cell::new(TransactionStep::Idle), + num_packets_ready: Cell::new(0), + } + } +} + +impl StateHelper { + #[allow(dead_code)] + pub fn reset(&self) { + self.step.set(TransactionStep::Idle); + self.state.set(super::State::Idle); + self.num_packets_ready.set(0); + } +} + +#[derive(Debug, Copy, Clone)] pub struct FinishedParams { condition_code: ConditionCode, delivery_code: DeliveryCode, @@ -116,22 +148,12 @@ pub struct FinishedParams { #[derive(Debug, derive_new::new)] pub struct TransferState { - transaction_id: TransactionId, - remote_cfg: RemoteEntityConfig, - transmission_mode: super::TransmissionMode, - closure_requested: bool, - cond_code_eof: Option, - finished_params: Option, -} - -impl Default for StateHelper { - fn default() -> Self { - Self { - state: super::State::Idle, - step: TransactionStep::Idle, - num_packets_ready: 0, - } - } + transaction_id: Cell, + remote_cfg: RefCell, + transmission_mode: Cell, + closure_requested: Cell, + cond_code_eof: Cell>, + finished_params: Cell>, } #[derive(Debug, thiserror::Error)] @@ -156,6 +178,8 @@ pub enum SourceError { SourceFileNotValidUtf8(Utf8Error), #[error("destination file does not have valid UTF8 format: {0}")] DestFileNotValidUtf8(Utf8Error), + #[error("invalid NAK PDU received")] + InvalidNakPdu, #[error("error related to PDU creation: {0}")] Pdu(#[from] PduError), #[error("cfdp feature not implemented")] @@ -180,6 +204,24 @@ pub enum PutRequestError { FilestoreError(#[from] FilestoreError), } +#[derive(Debug)] +struct PositiveAckParams { + ack_timer: Countdown, + ack_counter: u32, +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct AnomalyTracker { + invalid_ack_directive_code: u8, +} + +#[derive(Debug, Default, PartialEq, Eq)] +pub enum FsmContext { + #[default] + None, + ResetWhenPossible, +} + /// This is the primary CFDP source handler. It models the CFDP source entity, which is /// primarily responsible for handling put requests to send files to another CFDP destination /// entity. @@ -214,7 +256,7 @@ pub struct SourceHandler< RemoteCfgTable: RemoteEntityConfigProvider, TimerCreator: TimerCreatorProvider, Countdown: CountdownProvider, - SeqCountProvider: SequenceCountProvider, + SequenceCounterInstance: SequenceCounter, > { local_cfg: LocalEntityConfig, pdu_sender: PduSender, @@ -224,25 +266,27 @@ pub struct SourceHandler< vfs: Vfs, state_helper: StateHelper, // Transfer related state information - tstate: Option, + transfer_state: Option, // File specific transfer fields - fparams: FileParams, + file_params: FileParams, // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers. pdu_conf: CommonPduConfig, - countdown: Option, + check_timer: RefCell>, + positive_ack_params: RefCell>>, timer_creator: TimerCreator, - seq_count_provider: SeqCountProvider, + seq_count_provider: SequenceCounterInstance, + anomalies: AnomalyTracker, } impl< - PduSender: PduSendProvider, - UserFaultHook: UserFaultHookProvider, - Vfs: VirtualFilestore, - RemoteCfgTable: RemoteEntityConfigProvider, - TimerCreator: TimerCreatorProvider, - Countdown: CountdownProvider, - SeqCountProvider: SequenceCountProvider, - > + PduSender: PduSendProvider, + UserFaultHook: UserFaultHookProvider, + Vfs: VirtualFilestore, + RemoteCfgTable: RemoteEntityConfigProvider, + TimerCreator: TimerCreatorProvider, + Countdown: CountdownProvider, + SequenceCounterInstance: SequenceCounter, +> SourceHandler< PduSender, UserFaultHook, @@ -250,7 +294,7 @@ impl< RemoteCfgTable, TimerCreator, Countdown, - SeqCountProvider, + SequenceCounterInstance, > { /// Creates a new instance of a source handler. @@ -273,7 +317,7 @@ impl< /// * `timer_creator` - [TimerCreatorProvider] used by the CFDP handler to generate /// timers required by various tasks. This allows to use this handler for embedded systems /// where the standard time APIs might not be available. - /// * `seq_count_provider` - The [SequenceCountProvider] used to generate the [TransactionId] + /// * `seq_count_provider` - The [SequenceCounter] used to generate the [TransactionId] /// which contains an incrementing counter. #[allow(clippy::too_many_arguments)] pub fn new( @@ -284,7 +328,7 @@ impl< pdu_and_cksum_buf_size: usize, remote_cfg_table: RemoteCfgTable, timer_creator: TimerCreator, - seq_count_provider: SeqCountProvider, + seq_count_provider: SequenceCounterInstance, ) -> Self { Self { local_cfg: cfg, @@ -294,10 +338,12 @@ impl< vfs, put_request_cacher, state_helper: Default::default(), - tstate: Default::default(), - fparams: Default::default(), + transfer_state: Default::default(), + file_params: Default::default(), pdu_conf: Default::default(), - countdown: None, + anomalies: Default::default(), + check_timer: RefCell::new(None), + positive_ack_params: RefCell::new(None), timer_creator, seq_count_provider, } @@ -324,15 +370,19 @@ impl< cfdp_user: &mut impl CfdpUser, pdu: Option<&impl PduProvider>, ) -> Result { + let mut sent_packets = 0; if let Some(packet) = pdu { - self.insert_packet(cfdp_user, packet)?; + sent_packets += self.insert_packet(cfdp_user, packet)?; } - match self.state_helper.state { + match self.state() { super::State::Idle => { // TODO: In acknowledged mode, add timer handling. Ok(0) } - super::State::Busy => self.fsm_busy(cfdp_user, pdu), + super::State::Busy => { + sent_packets += self.fsm_busy(cfdp_user, pdu)?; + Ok(sent_packets) + } super::State::Suspended => { // There is now way to suspend the handler currently anyway. Ok(0) @@ -340,47 +390,34 @@ impl< } } - fn insert_packet( - &mut self, - _cfdp_user: &mut impl CfdpUser, - packet_to_insert: &impl PduProvider, - ) -> Result<(), SourceError> { - if packet_to_insert.packet_target()? != PacketTarget::SourceEntity { - // Unwrap is okay here, a PacketInfo for a file data PDU should always have the - // destination as the target. - return Err(SourceError::CantProcessPacketType { - pdu_type: packet_to_insert.pdu_type(), - directive_type: packet_to_insert.file_directive_type(), - }); - } - if packet_to_insert.pdu_type() == PduType::FileData { - // The [PacketInfo] API should ensure that file data PDUs can not be passed - // into a source entity, so this should never happen. - return Err(SourceError::UnexpectedPdu { - pdu_type: PduType::FileData, - directive_type: None, - }); - } - // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is - // always a valid value. - match packet_to_insert - .file_directive_type() - .expect("PDU directive type unexpectedly not set") - { - FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_to_insert)?, - FileDirectiveType::NakPdu => self.handle_nak_pdu(), - FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), - FileDirectiveType::AckPdu => return Err(SourceError::NotImplemented), - FileDirectiveType::EofPdu - | FileDirectiveType::PromptPdu - | FileDirectiveType::MetadataPdu => { - return Err(SourceError::CantProcessPacketType { - pdu_type: packet_to_insert.pdu_type(), - directive_type: packet_to_insert.file_directive_type(), - }); - } - } - Ok(()) + #[inline] + pub fn transaction_id(&self) -> Option { + self.transfer_state.as_ref().map(|v| v.transaction_id.get()) + } + + /// Returns the [TransmissionMode] for the active file operation. + #[inline] + pub fn transmission_mode(&self) -> Option { + self.transfer_state + .as_ref() + .map(|v| v.transmission_mode.get()) + } + + /// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when + /// applicable. + #[inline] + pub fn step(&self) -> TransactionStep { + self.state_helper.step.get() + } + + #[inline] + pub fn state(&self) -> State { + self.state_helper.state.get() + } + + #[inline] + pub fn local_cfg(&self) -> &LocalEntityConfig { + &self.local_cfg } /// This function is used to pass a put request to the source handler, which is @@ -394,7 +431,7 @@ impl< &mut self, put_request: &impl ReadablePutRequest, ) -> Result<(), PutRequestError> { - if self.state_helper.state != super::State::Idle { + if self.state() != super::State::Idle { return Err(PutRequestError::AlreadyBusy); } self.put_request_cacher.set(put_request)?; @@ -410,7 +447,7 @@ impl< )); } let remote_cfg = remote_cfg.unwrap(); - self.state_helper.num_packets_ready = 0; + self.state_helper.num_packets_ready.set(0); let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() { self.put_request_cacher.static_fields.trans_mode.unwrap() } else { @@ -438,7 +475,7 @@ impl< let transaction_id = TransactionId::new( self.local_cfg().id, UnsignedByteField::new( - SeqCountProvider::MAX_BIT_WIDTH / 8, + SequenceCounterInstance::MAX_BIT_WIDTH / 8, self.seq_count_provider.get_and_increment().into(), ), ); @@ -469,21 +506,75 @@ impl< self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into(); self.pdu_conf.transaction_seq_num = *transaction_id.seq_num(); self.pdu_conf.trans_mode = transmission_mode; - self.fparams.segment_len = self.calculate_max_file_seg_len(remote_cfg); + self.file_params.segment_len = self.calculate_max_file_seg_len(remote_cfg); // Set up the transfer context structure. - self.tstate = Some(TransferState { - transaction_id, - remote_cfg: *remote_cfg, - transmission_mode, - closure_requested, - cond_code_eof: None, - finished_params: None, + self.transfer_state = Some(TransferState { + transaction_id: Cell::new(transaction_id), + remote_cfg: RefCell::new(*remote_cfg), + transmission_mode: Cell::new(transmission_mode), + closure_requested: Cell::new(closure_requested), + cond_code_eof: Cell::new(None), + finished_params: Cell::new(None), }); - self.state_helper.state = super::State::Busy; + self.state_helper.state.set(super::State::Busy); Ok(()) } + fn insert_packet( + &mut self, + _cfdp_user: &mut impl CfdpUser, + packet_to_insert: &impl PduProvider, + ) -> Result { + if packet_to_insert.packet_target()? != PacketTarget::SourceEntity { + // Unwrap is okay here, a PacketInfo for a file data PDU should always have the + // destination as the target. + return Err(SourceError::CantProcessPacketType { + pdu_type: packet_to_insert.pdu_type(), + directive_type: packet_to_insert.file_directive_type(), + }); + } + if packet_to_insert.pdu_type() == PduType::FileData { + // The [PacketInfo] API should ensure that file data PDUs can not be passed + // into a source entity, so this should never happen. + return Err(SourceError::UnexpectedPdu { + pdu_type: PduType::FileData, + directive_type: None, + }); + } + let mut sent_packets = 0; + + // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is + // always a valid value. + match packet_to_insert + .file_directive_type() + .expect("PDU directive type unexpectedly not set") + { + FileDirectiveType::FinishedPdu => { + let finished_pdu = FinishedPduReader::new(packet_to_insert.pdu())?; + self.handle_finished_pdu(&finished_pdu)? + } + FileDirectiveType::NakPdu => { + let nak_pdu = NakPduReader::new(packet_to_insert.pdu())?; + sent_packets += self.handle_nak_pdu(&nak_pdu)?; + } + FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), + FileDirectiveType::AckPdu => { + let ack_pdu = AckPdu::from_bytes(packet_to_insert.pdu())?; + self.handle_ack_pdu(&ack_pdu)? + } + FileDirectiveType::EofPdu + | FileDirectiveType::PromptPdu + | FileDirectiveType::MetadataPdu => { + return Err(SourceError::CantProcessPacketType { + pdu_type: packet_to_insert.pdu_type(), + directive_type: packet_to_insert.file_directive_type(), + }); + } + } + Ok(sent_packets) + } + /// This functions models the Cancel.request CFDP primitive and is the recommended way to /// cancel a transaction. /// @@ -500,171 +591,221 @@ impl< user: &mut impl CfdpUser, transaction_id: &TransactionId, ) -> Result { - if self.state_helper.state == super::State::Idle { + if self.state() == super::State::Idle { return Ok(false); } if let Some(active_id) = self.transaction_id() { if active_id == *transaction_id { // Control flow result can be ignored here for the cancel request. - let _ = self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?; + self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?; return Ok(true); } } Ok(false) } + /// This function is public to allow completely resetting the handler, but it is explicitely + /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. + /// Resetting the handler might interfere with these mechanisms and lead to unexpected + /// behaviour. + pub fn reset(&mut self) { + self.state_helper = Default::default(); + self.transfer_state = None; + self.file_params = Default::default(); + *self.check_timer.borrow_mut() = None; + } + + #[inline] + fn set_step(&mut self, step: TransactionStep) { + self.set_step_internal(step); + } + + #[inline] + fn set_step_internal(&self, step: TransactionStep) { + self.state_helper.step.set(step); + } + fn fsm_busy( &mut self, user: &mut impl CfdpUser, - pdu: Option<&impl PduProvider>, + _pdu: Option<&impl PduProvider>, ) -> Result { let mut sent_packets = 0; - if self.state_helper.step == TransactionStep::Idle { - self.state_helper.step = TransactionStep::TransactionStart; + if self.step() == TransactionStep::Idle { + self.set_step(TransactionStep::TransactionStart); } - if self.state_helper.step == TransactionStep::TransactionStart { + if self.step() == TransactionStep::TransactionStart { self.handle_transaction_start(user)?; - self.state_helper.step = TransactionStep::SendingMetadata; + self.set_step(TransactionStep::SendingMetadata); } - if self.state_helper.step == TransactionStep::SendingMetadata { + if self.step() == TransactionStep::SendingMetadata { self.prepare_and_send_metadata_pdu()?; - self.state_helper.step = TransactionStep::SendingFileData; + self.set_step(TransactionStep::SendingFileData); sent_packets += 1; } - if self.state_helper.step == TransactionStep::SendingFileData { + if self.step() == TransactionStep::SendingFileData { if let ControlFlow::Break(packets) = self.file_data_fsm()? { sent_packets += packets; // Exit for each file data PDU to allow flow control. return Ok(sent_packets); } } - if self.state_helper.step == TransactionStep::SendingEof { + if self.step() == TransactionStep::SendingEof { self.eof_fsm(user)?; sent_packets += 1; } - if self.state_helper.step == TransactionStep::WaitingForFinished { - self.handle_wait_for_finished_pdu(user, pdu)?; + if self.step() == TransactionStep::WaitingForEofAck { + sent_packets += self.handle_waiting_for_ack_pdu(user)?; } - if self.state_helper.step == TransactionStep::NoticeOfCompletion { + if self.step() == TransactionStep::WaitingForFinished { + sent_packets += self.handle_waiting_for_finished_pdu(user)?; + } + if self.step() == TransactionStep::NoticeOfCompletion { self.notice_of_completion(user); self.reset(); } Ok(sent_packets) } - fn handle_wait_for_finished_pdu( + fn handle_waiting_for_ack_pdu(&mut self, user: &mut impl CfdpUser) -> Result { + self.handle_positive_ack_procedures(user) + } + + fn handle_positive_ack_procedures( &mut self, user: &mut impl CfdpUser, - packet: Option<&impl PduProvider>, - ) -> Result<(), SourceError> { - if let Some(packet) = packet { - if let Some(FileDirectiveType::FinishedPdu) = packet.file_directive_type() { - let finished_pdu = FinishedPduReader::new(packet.pdu())?; - self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams { - condition_code: finished_pdu.condition_code(), - delivery_code: finished_pdu.delivery_code(), - file_status: finished_pdu.file_status(), - }); - if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { - // TODO: Ack packet handling - self.state_helper.step = TransactionStep::NoticeOfCompletion; + ) -> Result { + let mut sent_packets = 0; + let mut positive_ack_limit_reached = false; + if let Some(positive_ack_params) = self.positive_ack_params.borrow_mut().as_mut() { + if positive_ack_params.ack_timer.has_expired() { + let ack_timer_exp_limit = self + .transfer_state + .as_ref() + .unwrap() + .remote_cfg + .borrow() + .positive_ack_timer_expiration_limit; + if positive_ack_params.ack_counter + 1 >= ack_timer_exp_limit { + positive_ack_limit_reached = true; } else { - self.state_helper.step = TransactionStep::NoticeOfCompletion; + positive_ack_params.ack_timer.reset(); + positive_ack_params.ack_counter += 1; + self.prepare_and_send_eof_pdu( + user, + self.file_params.checksum_completed_file.unwrap(), + )?; + sent_packets += 1; } - return Ok(()); } } - // If we reach this state, countdown is definitely valid instance. - if self.countdown.as_ref().unwrap().has_expired() { - self.declare_fault(user, ConditionCode::CheckLimitReached)?; + if positive_ack_limit_reached { + let (fault_packets_sent, ctx) = + self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?; + if ctx == FsmContext::ResetWhenPossible { + self.reset(); + } + sent_packets += fault_packets_sent; } - /* - def _handle_wait_for_finish(self): - if ( - self.transmission_mode == TransmissionMode.ACKNOWLEDGED - and self.__handle_retransmission() - ): - return - if ( - self._inserted_pdu.pdu is None - or self._inserted_pdu.pdu_directive_type is None - or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU - ): - if self._params.check_timer is not None: - if self._params.check_timer.timed_out(): - self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED) - return - finished_pdu = self._inserted_pdu.to_finished_pdu() - self._inserted_pdu.pdu = None - self._params.finished_params = finished_pdu.finished_params - if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: - self._prepare_finished_ack_packet(finished_pdu.condition_code) - self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED - else: - self.states.step = TransactionStep.NOTICE_OF_COMPLETION - */ - Ok(()) + Ok(sent_packets) + } + + fn handle_retransmission(&mut self, nak_pdu: &NakPduReader) -> Result { + let mut sent_packets = 0; + let segment_req_iter = nak_pdu.get_segment_requests_iterator().unwrap(); + for segment_req in segment_req_iter { + // Special case: Metadata PDU is re-requested. + if segment_req.0 == 0 && segment_req.1 == 0 { + self.prepare_and_send_metadata_pdu()?; + sent_packets += 1; + continue; + } else { + if (segment_req.1 < segment_req.0) || (segment_req.0 > self.file_params.progress) { + return Err(SourceError::InvalidNakPdu); + } + let mut missing_chunk_len = segment_req.1 - segment_req.0; + let current_offset = segment_req.0; + while missing_chunk_len > 0 { + let chunk_size = + core::cmp::min(missing_chunk_len, self.file_params.segment_len); + self.prepare_and_send_file_data_pdu(current_offset, chunk_size)?; + sent_packets += 1; + missing_chunk_len -= missing_chunk_len; + } + } + } + Ok(sent_packets) + } + + fn handle_waiting_for_finished_pdu( + &mut self, + user: &mut impl CfdpUser, + ) -> Result { + // If we reach this state, countdown definitely is set. + #[allow(clippy::collapsible_if)] + if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged + && self.check_timer.borrow().as_ref().unwrap().has_expired() + { + let (sent_packets, ctx) = self.declare_fault(user, ConditionCode::CheckLimitReached)?; + if ctx == FsmContext::ResetWhenPossible { + self.reset(); + } + return Ok(sent_packets); + } + Ok(0) } fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> { - let tstate = self.tstate.as_ref().unwrap(); let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), - tstate.remote_cfg.default_crc_type, - self.fparams.file_size, - self.pdu_and_cksum_buffer.get_mut(), + self.tstate_ref().remote_cfg.borrow().default_crc_type, + self.file_params.file_size, + &mut self.pdu_and_cksum_buffer.borrow_mut(), )?; + self.file_params.checksum_completed_file = Some(checksum); self.prepare_and_send_eof_pdu(user, checksum)?; - let tstate = self.tstate.as_ref().unwrap(); - if tstate.transmission_mode == TransmissionMode::Unacknowledged { - if tstate.closure_requested { - self.countdown = Some(self.timer_creator.create_countdown( + if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged { + if self.tstate_ref().closure_requested.get() { + *self.check_timer.borrow_mut() = Some(self.timer_creator.create_countdown( crate::TimerContext::CheckLimit { local_id: self.local_cfg.id, - remote_id: tstate.remote_cfg.entity_id, + remote_id: self.tstate_ref().remote_cfg.borrow().entity_id, entity_type: EntityType::Sending, }, )); - self.state_helper.step = TransactionStep::WaitingForFinished; + self.set_step(TransactionStep::WaitingForFinished); } else { - self.state_helper.step = TransactionStep::NoticeOfCompletion; + self.set_step(TransactionStep::NoticeOfCompletion); } } else { - // TODO: Start positive ACK procedure. + self.start_positive_ack_procedure(); } - /* - if self.cfg.indication_cfg.eof_sent_indication_required: - assert self._params.transaction_id is not None - self.user.eof_sent_indication(self._params.transaction_id) - if self.transmission_mode == TransmissionMode.UNACKNOWLEDGED: - if self._params.closure_requested: - assert self._params.remote_cfg is not None - self._params.check_timer = ( - self.check_timer_provider.provide_check_timer( - local_entity_id=self.cfg.local_entity_id, - remote_entity_id=self._params.remote_cfg.entity_id, - entity_type=EntityType.SENDING, - ) - ) - self.states.step = TransactionStep.WAITING_FOR_FINISHED - else: - self.states.step = TransactionStep.NOTICE_OF_COMPLETION - else: - self._start_positive_ack_procedure() - */ Ok(()) } + fn start_positive_ack_procedure(&self) { + self.set_step_internal(TransactionStep::WaitingForEofAck); + *self.positive_ack_params.borrow_mut() = Some(PositiveAckParams { + ack_timer: self + .timer_creator + .create_countdown(crate::TimerContext::PositiveAck { + expiry_time: Duration::from_secs( + self.tstate_ref() + .remote_cfg + .borrow() + .positive_ack_timer_interval_seconds as u64, + ), + }), + ack_counter: 0, + }) + } + fn handle_transaction_start( &mut self, cfdp_user: &mut impl CfdpUser, ) -> Result<(), SourceError> { - let tstate = self - .tstate - .as_ref() - .expect("transfer state unexpectedly empty"); if !self.put_request_cacher.has_source_file() { - self.fparams.metadata_only = true; + self.file_params.metadata_only = true; } else { let source_file = self .put_request_cacher @@ -679,31 +820,43 @@ impl< self.put_request_cacher .dest_file() .map_err(SourceError::DestFileNotValidUtf8)?; - self.fparams.file_size = self.vfs.file_size(source_file)?; - if self.fparams.file_size > u32::MAX as u64 { + self.file_params.file_size = self.vfs.file_size(source_file)?; + if self.file_params.file_size > u32::MAX as u64 { self.pdu_conf.file_flag = LargeFileFlag::Large } else { - if self.fparams.file_size == 0 { - self.fparams.empty_file = true; + if self.file_params.file_size == 0 { + self.file_params.empty_file = true; } self.pdu_conf.file_flag = LargeFileFlag::Normal } } - cfdp_user.transaction_indication(&tstate.transaction_id); + cfdp_user.transaction_indication(&self.transaction_id().unwrap()); + Ok(()) + } + + fn prepare_and_send_ack_pdu( + &mut self, + condition_code: ConditionCode, + transaction_status: TransactionStatus, + ) -> Result<(), SourceError> { + let ack_pdu = AckPdu::new( + PduHeader::new_no_file_data(self.pdu_conf, 0), + FileDirectiveType::FinishedPdu, + condition_code, + transaction_status, + )?; + self.pdu_send_helper(&ack_pdu)?; Ok(()) } fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> { - let tstate = self - .tstate - .as_ref() - .expect("transfer state unexpectedly empty"); + let tstate = self.tstate_ref(); let metadata_params = MetadataGenericParams::new( - tstate.closure_requested, - tstate.remote_cfg.default_crc_type, - self.fparams.file_size, + tstate.closure_requested.get(), + tstate.remote_cfg.borrow().default_crc_type, + self.file_params.file_size, ); - if self.fparams.metadata_only { + if self.file_params.metadata_only { let metadata_pdu = MetadataPduCreator::new( PduHeader::new_no_file_data(self.pdu_conf, 0), metadata_params, @@ -724,49 +877,48 @@ impl< } fn file_data_fsm(&mut self) -> Result, SourceError> { - if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged { - // TODO: Handle re-transmission - } - if !self.fparams.metadata_only - && self.fparams.progress < self.fparams.file_size + //if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged { + // TODO: Handle re-transmission + //} + if !self.file_params.metadata_only + && self.file_params.progress < self.file_params.file_size && self.send_progressing_file_data_pdu()? { return Ok(ControlFlow::Break(1)); } - if self.fparams.empty_file || self.fparams.progress >= self.fparams.file_size { + if self.file_params.empty_file || self.file_params.progress >= self.file_params.file_size { // EOF is still expected. - self.state_helper.step = TransactionStep::SendingEof; - self.tstate.as_mut().unwrap().cond_code_eof = Some(ConditionCode::NoError); - } else if self.fparams.metadata_only { + self.set_step(TransactionStep::SendingEof); + self.tstate_ref() + .cond_code_eof + .set(Some(ConditionCode::NoError)); + } else if self.file_params.metadata_only { // Special case: Metadata Only, no EOF required. - if self.tstate.as_ref().unwrap().closure_requested { - self.state_helper.step = TransactionStep::WaitingForFinished; + if self.tstate_ref().closure_requested.get() { + self.set_step(TransactionStep::WaitingForFinished); } else { - self.state_helper.step = TransactionStep::NoticeOfCompletion; + self.set_step(TransactionStep::NoticeOfCompletion); } } Ok(ControlFlow::Continue(())) } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) { - let tstate = self.tstate.as_ref().unwrap(); if self.local_cfg.indication_cfg.transaction_finished { // The first case happens for unacknowledged file copy operation with no closure. - let finished_params = if tstate.finished_params.is_none() { - TransactionFinishedParams { - id: tstate.transaction_id, - condition_code: ConditionCode::NoError, - delivery_code: DeliveryCode::Complete, - file_status: FileStatus::Unreported, - } - } else { - let finished_params = tstate.finished_params.as_ref().unwrap(); - TransactionFinishedParams { - id: tstate.transaction_id, + let finished_params = match self.tstate_ref().finished_params.get() { + Some(finished_params) => TransactionFinishedParams { + id: self.transaction_id().unwrap(), condition_code: finished_params.condition_code, delivery_code: finished_params.delivery_code, file_status: finished_params.file_status, - } + }, + None => TransactionFinishedParams { + id: self.transaction_id().unwrap(), + condition_code: ConditionCode::NoError, + delivery_code: DeliveryCode::Complete, + file_status: FileStatus::Unreported, + }, }; cfdp_user.transaction_finished_indication(&finished_params); } @@ -789,16 +941,22 @@ impl< fn send_progressing_file_data_pdu(&mut self) -> Result { // Should never be called, but use defensive programming here. - if self.fparams.progress >= self.fparams.file_size { + if self.file_params.progress >= self.file_params.file_size { return Ok(false); } - let read_len = if self.fparams.file_size < self.fparams.segment_len { - self.fparams.file_size - } else if self.fparams.progress + self.fparams.segment_len > self.fparams.file_size { - self.fparams.file_size - self.fparams.progress - } else { - self.fparams.segment_len - }; + let read_len = self + .file_params + .segment_len + .min(self.file_params.file_size - self.file_params.progress); + self.prepare_and_send_file_data_pdu(self.file_params.progress, read_len)?; + Ok(true) + } + + fn prepare_and_send_file_data_pdu( + &mut self, + offset: u64, + size: u64, + ) -> Result<(), SourceError> { let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata( PduHeader::new_for_file_data( self.pdu_conf, @@ -806,15 +964,15 @@ impl< SegmentMetadataFlag::NotPresent, SegmentationControl::NoRecordBoundaryPreservation, ), - self.fparams.progress, - read_len, + offset, + size, ); let mut unwritten_pdu = pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?; self.vfs.read_data( self.put_request_cacher.source_file().unwrap(), - self.fparams.progress, - read_len, + offset, + size, unwritten_pdu.file_data_field_mut(), )?; let written_len = unwritten_pdu.finish(); @@ -823,70 +981,28 @@ impl< None, &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; - self.fparams.progress += read_len; - /* - """Generic function to prepare a file data PDU. This function can also be used to - re-transmit file data PDUs of segments which were already sent.""" - assert self._put_req is not None - assert self._put_req.source_file is not None - with open(self._put_req.source_file, "rb") as of: - file_data = self.user.vfs.read_from_opened_file(of, offset, read_len) - # TODO: Support for record continuation state not implemented yet. Segment metadata - # flag is therefore always set to False. Segment metadata support also omitted - # for now. Implementing those generically could be done in form of a callback, - # e.g. abstractmethod of this handler as a first way, another one being - # to expect the user to supply some helper class to split up a file - fd_params = FileDataParams( - file_data=file_data, offset=offset, segment_metadata=None - ) - file_data_pdu = FileDataPdu( - pdu_conf=self._params.pdu_conf, params=fd_params - ) - self._add_packet_to_be_sent(file_data_pdu) - */ - /* - """Prepare the next file data PDU, which also progresses the file copy operation. - - :return: True if a packet was prepared, False if PDU handling is done and the next steps - in the Copy File procedure can be performed - """ - # This function should only be called if file segments still need to be sent. - assert self._params.fp.progress < self._params.fp.file_size - if self._params.fp.file_size < self._params.fp.segment_len: - read_len = self._params.fp.file_size - else: - if ( - self._params.fp.progress + self._params.fp.segment_len - > self._params.fp.file_size - ): - read_len = self._params.fp.file_size - self._params.fp.progress - else: - read_len = self._params.fp.segment_len - self._prepare_file_data_pdu(self._params.fp.progress, read_len) - self._params.fp.progress += read_len - */ - Ok(true) + self.file_params.progress += size; + Ok(()) } fn prepare_and_send_eof_pdu( - &mut self, + &self, cfdp_user: &mut impl CfdpUser, checksum: u32, ) -> Result<(), SourceError> { - let tstate = self - .tstate - .as_ref() - .expect("transfer state unexpectedly empty"); let eof_pdu = EofPdu::new( PduHeader::new_no_file_data(self.pdu_conf, 0), - tstate.cond_code_eof.unwrap_or(ConditionCode::NoError), + self.tstate_ref() + .cond_code_eof + .get() + .unwrap_or(ConditionCode::NoError), checksum, - self.fparams.progress, + self.file_params.progress, None, ); self.pdu_send_helper(&eof_pdu)?; if self.local_cfg.indication_cfg.eof_sent { - cfdp_user.eof_sent_indication(&tstate.transaction_id); + cfdp_user.eof_sent_indication(&self.transaction_id().unwrap()); } Ok(()) } @@ -902,196 +1018,184 @@ impl< Ok(()) } - fn handle_finished_pdu(&mut self, pdu_provider: &impl PduProvider) -> Result<(), SourceError> { + fn handle_finished_pdu(&mut self, finished_pdu: &FinishedPduReader) -> Result<(), SourceError> { // Ignore this packet when we are idle. - if self.state_helper.state == State::Idle { + if self.state() == State::Idle { return Ok(()); } - if self.state_helper.step != TransactionStep::WaitingForFinished { + if self.step() != TransactionStep::WaitingForFinished { return Err(SourceError::UnexpectedPdu { pdu_type: PduType::FileDirective, directive_type: Some(FileDirectiveType::FinishedPdu), }); } - let finished_pdu = FinishedPduReader::new(pdu_provider.pdu())?; + let tstate_ref = self.tstate_ref(); // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE // mode. - self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams { + tstate_ref.finished_params.set(Some(FinishedParams { condition_code: finished_pdu.condition_code(), delivery_code: finished_pdu.delivery_code(), file_status: finished_pdu.file_status(), - }); - if self.tstate.as_ref().unwrap().transmission_mode == TransmissionMode::Acknowledged { - // TODO: Send ACK packet here immediately and continue. - //self.state_helper.step = TransactionStep::SendingAckOfFinished; + })); + if tstate_ref.transmission_mode.get() == TransmissionMode::Acknowledged { + self.prepare_and_send_ack_pdu( + finished_pdu.condition_code(), + TransactionStatus::Active, + )?; } - self.state_helper.step = TransactionStep::NoticeOfCompletion; - - /* - if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: - self._prepare_finished_ack_packet(finished_pdu.condition_code) - self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED - else: - self.states.step = TransactionStep.NOTICE_OF_COMPLETION - */ + self.set_step(TransactionStep::NoticeOfCompletion); Ok(()) } - fn handle_nak_pdu(&mut self) {} - - fn handle_keep_alive_pdu(&mut self) {} - - pub fn transaction_id(&self) -> Option { - self.tstate.as_ref().map(|v| v.transaction_id) + fn handle_nak_pdu(&mut self, nak_pdu: &NakPduReader) -> Result { + self.handle_retransmission(nak_pdu) } - /// Returns the [TransmissionMode] for the active file operation. - #[inline] - pub fn transmission_mode(&self) -> Option { - self.tstate.as_ref().map(|v| v.transmission_mode) - } - - /// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when - /// applicable. - pub fn step(&self) -> TransactionStep { - self.state_helper.step - } - - pub fn state(&self) -> State { - self.state_helper.state - } - - pub fn local_cfg(&self) -> &LocalEntityConfig { - &self.local_cfg - } - - fn declare_fault( - &mut self, - user: &mut impl CfdpUser, - cond: ConditionCode, - ) -> Result<(), SourceError> { - // Need to cache those in advance, because a notice of cancellation can reset the handler. - let transaction_id = self.tstate.as_ref().unwrap().transaction_id; - let progress = self.fparams.progress; - let fh = self.local_cfg.fault_handler.get_fault_handler(cond); - match fh { - spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => { - if let ControlFlow::Break(_) = self.notice_of_cancellation(user, cond)? { - return Ok(()); - } - } - spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => { - self.notice_of_suspension(); - } - spacepackets::cfdp::FaultHandlerCode::IgnoreError => (), - spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(), + fn handle_ack_pdu(&mut self, ack_pdu: &AckPdu) -> Result<(), SourceError> { + if self.step() != TransactionStep::WaitingForEofAck { + // Drop the packet, wrong state to handle it.. + return Err(SourceError::UnexpectedPdu { + pdu_type: PduType::FileDirective, + directive_type: Some(FileDirectiveType::AckPdu), + }); + } + if ack_pdu.directive_code_of_acked_pdu() == FileDirectiveType::EofPdu { + self.set_step(TransactionStep::WaitingForFinished); + } else { + self.anomalies.invalid_ack_directive_code = + self.anomalies.invalid_ack_directive_code.wrapping_add(1); } - self.local_cfg - .fault_handler - .report_fault(transaction_id, cond, progress); Ok(()) } - fn notice_of_cancellation( + pub fn notice_of_cancellation( &mut self, user: &mut impl CfdpUser, condition_code: ConditionCode, - ) -> Result, SourceError> { - let transaction_id = self.tstate.as_ref().unwrap().transaction_id; + ) -> Result { + let mut sent_packets = 0; + match self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)? { + ControlFlow::Continue(ctx) | ControlFlow::Break(ctx) => { + if ctx == FsmContext::ResetWhenPossible { + self.reset(); + } + } + } + Ok(sent_packets) + } + + fn notice_of_cancellation_internal( + &self, + user: &mut impl CfdpUser, + condition_code: ConditionCode, + sent_packets: &mut u32, + ) -> Result, SourceError> { + let transaction_id = self.transaction_id().unwrap(); // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring // the EOF (cancel) PDU must result in abandonment of the transaction. - if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof { + if let Some(cond_code_eof) = self.tstate_ref().cond_code_eof.get() { if cond_code_eof != ConditionCode::NoError { // Still call the abandonment callback to ensure the fault is logged. self.local_cfg .fault_handler .user_hook - .get_mut() - .abandoned_cb(transaction_id, cond_code_eof, self.fparams.progress); - self.abandon_transaction(); - return Ok(ControlFlow::Break(())); + .borrow_mut() + .abandoned_cb(transaction_id, cond_code_eof, self.file_params.progress); + return Ok(ControlFlow::Break(FsmContext::ResetWhenPossible)); } } - let tstate = self.tstate.as_mut().unwrap(); - tstate.cond_code_eof = Some(condition_code); + self.tstate_ref().cond_code_eof.set(Some(condition_code)); // As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply // the checksum for the file copy progress sent so far. let checksum = self.vfs.calculate_checksum( self.put_request_cacher.source_file().unwrap(), - tstate.remote_cfg.default_crc_type, - self.fparams.progress, - self.pdu_and_cksum_buffer.get_mut(), + self.tstate_ref().remote_cfg.borrow().default_crc_type, + self.file_params.progress, + &mut self.pdu_and_cksum_buffer.borrow_mut(), )?; self.prepare_and_send_eof_pdu(user, checksum)?; + *sent_packets += 1; if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged { // We are done. - self.reset(); + Ok(ControlFlow::Continue(FsmContext::ResetWhenPossible)) } else { - self.state_helper.step = TransactionStep::WaitingForEofAck; + self.start_positive_ack_procedure(); + Ok(ControlFlow::Continue(FsmContext::default())) } - Ok(ControlFlow::Continue(())) } - fn notice_of_suspension(&mut self) {} + pub fn notice_of_suspension(&mut self) { + self.notice_of_suspension_internal(); + } - fn abandon_transaction(&mut self) { + fn notice_of_suspension_internal(&self) {} + + pub fn abandon_transaction(&mut self) { // I guess an abandoned transaction just stops whatever the handler is doing and resets // it to a clean state.. The implementation for this is quite easy. self.reset(); } - /* - def _notice_of_cancellation(self, condition_code: ConditionCode) -> bool: - """Returns whether the fault declaration handler can returns prematurely.""" - # CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring - # the EOF (cancel) PDU must result in abandonment of the transaction. - if ( - self._params.cond_code_eof is not None - and self._params.cond_code_eof != ConditionCode.NO_ERROR - ): - assert self._params.transaction_id is not None - # We still call the abandonment callback to ensure the fault is logged. - self.cfg.default_fault_handlers.abandoned_cb( - self._params.transaction_id, - self._params.cond_code_eof, - self._params.fp.progress, - ) - self._abandon_transaction() - return False - self._params.cond_code_eof = condition_code - # As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply - # the checksum for the file copy progress sent so far. - self._prepare_eof_pdu(self._checksum_calculation(self._params.fp.progress)) - self.states.step = TransactionStep.SENDING_EOF - return True - */ - - /// This function is public to allow completely resetting the handler, but it is explicitely - /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. - /// Resetting the handler might interfere with these mechanisms and lead to unexpected - /// behaviour. - pub fn reset(&mut self) { - self.state_helper = Default::default(); - self.tstate = None; - self.fparams = Default::default(); - self.countdown = None; + // Returns the number of packets sent and a FSM context structure. + fn declare_fault( + &self, + user: &mut impl CfdpUser, + cond: ConditionCode, + ) -> Result<(u32, FsmContext), SourceError> { + let mut sent_packets = 0; + let fh = self.local_cfg.fault_handler.get_fault_handler(cond); + let mut ctx = FsmContext::default(); + match fh { + spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => { + match self.notice_of_cancellation_internal(user, cond, &mut sent_packets)? { + ControlFlow::Continue(ctx_cancellation) => { + ctx = ctx_cancellation; + } + ControlFlow::Break(ctx_cancellation) => { + return Ok((sent_packets, ctx_cancellation)); + } + } + } + spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => { + self.notice_of_suspension_internal(); + } + spacepackets::cfdp::FaultHandlerCode::IgnoreError => (), + spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => { + return Ok((sent_packets, FsmContext::ResetWhenPossible)); + } + } + self.local_cfg.fault_handler.report_fault( + self.transaction_id().unwrap(), + cond, + self.file_params.progress, + ); + Ok((sent_packets, ctx)) } + + // Internal helper function. + fn tstate_ref(&self) -> &TransferState { + self.transfer_state + .as_ref() + .expect("transfer state should be set in busy state") + } + + fn handle_keep_alive_pdu(&mut self) {} } #[cfg(test)] mod tests { - use core::time::Duration; - use std::{fs::OpenOptions, io::Write, path::PathBuf, thread, vec::Vec}; + use std::{fs::OpenOptions, io::Write, path::PathBuf, vec::Vec}; use alloc::string::String; use rand::Rng; use spacepackets::{ cfdp::{ + ChecksumType, CrcFlag, pdu::{ file_data::FileDataPdu, finished::FinishedPduCreator, metadata::MetadataPduReader, + nak::NakPduCreator, }, - ChecksumType, CrcFlag, }, util::UnsignedByteFieldU16, }; @@ -1099,14 +1203,16 @@ mod tests { use super::*; use crate::{ + CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, filestore::NativeFilestore, request::PutRequestOwned, source::TransactionStep, - tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler}, - FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown, - StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32, + tests::{ + SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, TestCheckTimerCreator, + TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table, + }, }; - use spacepackets::seq_count::SeqCountProviderSimple; + use spacepackets::seq_count::SequenceCounterSimple; const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); @@ -1124,13 +1230,15 @@ mod tests { TestFaultHandler, NativeFilestore, StdRemoteEntityConfigProvider, - StdTimerCreator, - StdCountdown, - SeqCountProviderSimple, + TestCheckTimerCreator, + TestCheckTimer, + SequenceCounterSimple, >; struct SourceHandlerTestbench { handler: TestSourceHandler, + expiry_control: TimerExpiryControl, + transmission_mode: TransmissionMode, #[allow(dead_code)] srcfile_handle: TempPath, srcfile: String, @@ -1142,29 +1250,48 @@ mod tests { #[allow(dead_code)] struct TransferInfo { id: TransactionId, + file_size: u64, closure_requested: bool, pdu_header: PduHeader, } + #[derive(Debug, Clone, Copy)] + struct EofParams { + file_size: u64, + file_checksum: u32, + condition_code: ConditionCode, + } + + impl EofParams { + pub const fn new_success(file_size: u64, file_checksum: u32) -> Self { + Self { + file_size, + file_checksum, + condition_code: ConditionCode::NoError, + } + } + } + impl SourceHandlerTestbench { fn new( + transmission_mode: TransmissionMode, crc_on_transmission_by_default: bool, - test_fault_handler: TestFaultHandler, - test_packet_sender: TestCfdpSender, max_packet_len: usize, ) -> Self { let local_entity_cfg = LocalEntityConfig { id: LOCAL_ID.into(), indication_cfg: IndicationConfig::default(), - fault_handler: FaultHandler::new(test_fault_handler), + fault_handler: FaultHandler::new(TestFaultHandler::default()), }; let static_put_request_cacher = StaticPutRequestCacher::new(2048); let (srcfile_handle, destfile) = init_full_filepaths_textfile(); let srcfile = String::from(srcfile_handle.to_path_buf().to_str().unwrap()); + let expiry_control = TimerExpiryControl::default(); + let sender = TestCfdpSender::default(); Self { handler: SourceHandler::new( local_entity_cfg, - test_packet_sender, + sender, NativeFilestore::default(), static_put_request_cacher, 1024, @@ -1173,9 +1300,11 @@ mod tests { max_packet_len, crc_on_transmission_by_default, ), - StdTimerCreator::new(core::time::Duration::from_millis(100)), - SeqCountProviderSimple::default(), + TestCheckTimerCreator::new(&expiry_control), + SequenceCounterSimple::default(), ), + transmission_mode, + expiry_control, srcfile_handle, srcfile, destfile: String::from(destfile.to_path_buf().to_str().unwrap()), @@ -1193,10 +1322,6 @@ mod tests { ) } - fn set_check_limit_timeout(&mut self, timeout: Duration) { - self.handler.timer_creator.check_limit_timeout = timeout; - } - fn put_request( &mut self, put_request: &impl ReadablePutRequest, @@ -1243,7 +1368,7 @@ mod tests { assert_eq!(pdu_header.common_pdu_conf().crc_flag, crc_flag); assert_eq!( pdu_header.common_pdu_conf().trans_mode, - TransmissionMode::Unacknowledged + self.transmission_mode ); assert_eq!( pdu_header.common_pdu_conf().direction, @@ -1256,12 +1381,32 @@ mod tests { assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2); } + fn nak_for_file_segments( + &mut self, + cfdp_user: &mut TestCfdpUser, + transfer_info: &TransferInfo, + seg_reqs: &[(u32, u32)], + ) { + let nak_pdu = NakPduCreator::new( + transfer_info.pdu_header, + 0, + transfer_info.file_size as u32, + seg_reqs, + ) + .unwrap(); + let nak_pdu_vec = nak_pdu.to_vec().unwrap(); + let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap(); + self.handler + .state_machine(cfdp_user, Some(&packet_info)) + .unwrap(); + } + fn generic_file_transfer( &mut self, cfdp_user: &mut TestCfdpUser, with_closure: bool, file_data: Vec, - ) -> (PduHeader, u32) { + ) -> (TransferInfo, u32) { let mut digest = CRC_32.digest(); digest.update(&file_data); let checksum = digest.finalize(); @@ -1272,11 +1417,11 @@ mod tests { REMOTE_ID.into(), &self.srcfile, &self.destfile, - Some(TransmissionMode::Unacknowledged), + Some(self.transmission_mode), Some(with_closure), ) .expect("creating put request failed"); - let transaction_info = self.common_no_acked_file_transfer( + let transaction_info = self.common_file_transfer_init_with_metadata_check( cfdp_user, put_request, cfdp_user.expected_file_size, @@ -1299,17 +1444,21 @@ mod tests { self.common_eof_pdu_check( cfdp_user, transaction_info.closure_requested, - cfdp_user.expected_file_size, - checksum, + EofParams { + file_size: cfdp_user.expected_file_size, + file_checksum: checksum, + condition_code: ConditionCode::NoError, + }, + 1, ); - (transaction_info.pdu_header, fd_pdus) + (transaction_info, fd_pdus) } - fn common_no_acked_file_transfer( + fn common_file_transfer_init_with_metadata_check( &mut self, cfdp_user: &mut TestCfdpUser, put_request: PutRequestOwned, - filesize: u64, + file_size: u64, ) -> TransferInfo { assert_eq!(cfdp_user.transaction_indication_call_count, 0); assert_eq!(cfdp_user.eof_sent_call_count, 0); @@ -1318,7 +1467,7 @@ mod tests { .expect("put_request call failed"); assert_eq!(self.handler.state(), State::Busy); assert_eq!(self.handler.step(), TransactionStep::Idle); - let id = self.handler.transaction_id().unwrap(); + let transaction_id = self.handler.transaction_id().unwrap(); let sent_packets = self .handler .state_machine_no_packet(cfdp_user) @@ -1327,6 +1476,30 @@ mod tests { assert!(!self.pdu_queue_empty()); let next_pdu = self.get_next_sent_pdu().unwrap(); assert!(!self.pdu_queue_empty()); + let metadata_pdu_reader = self.metadata_check(&next_pdu, file_size); + let closure_requested = if let Some(closure_requested) = put_request.closure_requested { + assert_eq!( + metadata_pdu_reader.metadata_params().closure_requested, + closure_requested + ); + closure_requested + } else { + assert!(metadata_pdu_reader.metadata_params().closure_requested); + metadata_pdu_reader.metadata_params().closure_requested + }; + TransferInfo { + pdu_header: *metadata_pdu_reader.pdu_header(), + closure_requested, + file_size, + id: transaction_id, + } + } + + fn metadata_check<'a>( + &self, + next_pdu: &'a SentPdu, + file_size: u64, + ) -> MetadataPduReader<'a> { assert_eq!(next_pdu.pdu_type, PduType::FileDirective); assert_eq!( next_pdu.file_directive_type, @@ -1334,7 +1507,6 @@ mod tests { ); let metadata_pdu = MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format"); - let pdu_header = metadata_pdu.pdu_header(); self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc); assert_eq!( metadata_pdu @@ -1352,27 +1524,14 @@ mod tests { .unwrap(), self.destfile ); - assert_eq!(metadata_pdu.metadata_params().file_size, filesize); + assert_eq!(metadata_pdu.metadata_params().file_size, file_size); assert_eq!( metadata_pdu.metadata_params().checksum_type, ChecksumType::Crc32 ); - let closure_requested = if let Some(closure_requested) = put_request.closure_requested { - assert_eq!( - metadata_pdu.metadata_params().closure_requested, - closure_requested - ); - closure_requested - } else { - assert!(metadata_pdu.metadata_params().closure_requested); - metadata_pdu.metadata_params().closure_requested - }; + assert_eq!(metadata_pdu.transmission_mode(), self.transmission_mode); assert_eq!(metadata_pdu.options(), &[]); - TransferInfo { - pdu_header: *pdu_header, - closure_requested, - id, - } + metadata_pdu } fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) { @@ -1386,12 +1545,50 @@ mod tests { assert!(fd_pdu.segment_metadata().is_none()); } + fn acknowledge_eof_pdu( + &mut self, + cfdp_user: &mut impl CfdpUser, + transaction_info: &TransferInfo, + ) { + let ack_pdu = AckPdu::new( + transaction_info.pdu_header, + FileDirectiveType::EofPdu, + ConditionCode::NoError, + TransactionStatus::Active, + ) + .expect("creating ACK PDU failed"); + let ack_pdu_vec = ack_pdu.to_vec().unwrap(); + let packet_info = PduRawWithInfo::new(&ack_pdu_vec).unwrap(); + self.handler + .state_machine(cfdp_user, Some(&packet_info)) + .expect("state machine failed"); + } + + fn common_finished_pdu_ack_check(&mut self) { + assert!(!self.pdu_queue_empty()); + let next_pdu = self.get_next_sent_pdu().unwrap(); + assert!(self.pdu_queue_empty()); + assert_eq!(next_pdu.pdu_type, PduType::FileDirective); + assert_eq!( + next_pdu.file_directive_type, + Some(FileDirectiveType::AckPdu) + ); + let ack_pdu = AckPdu::from_bytes(&next_pdu.raw_pdu).unwrap(); + self.common_pdu_check_for_file_transfer(ack_pdu.pdu_header(), CrcFlag::NoCrc); + assert_eq!(ack_pdu.condition_code(), ConditionCode::NoError); + assert_eq!( + ack_pdu.directive_code_of_acked_pdu(), + FileDirectiveType::FinishedPdu + ); + assert_eq!(ack_pdu.transaction_status(), TransactionStatus::Active); + } + fn common_eof_pdu_check( &mut self, cfdp_user: &mut TestCfdpUser, closure_requested: bool, - filesize: u64, - checksum: u32, + eof_params: EofParams, + eof_sent_call_count: u32, ) { let next_pdu = self.get_next_sent_pdu().unwrap(); assert_eq!(next_pdu.pdu_type, PduType::FileDirective); @@ -1401,9 +1598,9 @@ mod tests { ); let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format"); self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc); - assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError); - assert_eq!(eof_pdu.file_size(), filesize); - assert_eq!(eof_pdu.file_checksum(), checksum); + assert_eq!(eof_pdu.condition_code(), eof_params.condition_code); + assert_eq!(eof_pdu.file_size(), eof_params.file_size); + assert_eq!(eof_pdu.file_checksum(), eof_params.file_checksum); assert_eq!( eof_pdu .pdu_header() @@ -1412,15 +1609,21 @@ mod tests { .value_const(), 0 ); - if !closure_requested { - assert_eq!(self.handler.state(), State::Idle); - assert_eq!(self.handler.step(), TransactionStep::Idle); + if self.transmission_mode == TransmissionMode::Unacknowledged { + if !closure_requested { + assert_eq!(self.handler.state(), State::Idle); + assert_eq!(self.handler.step(), TransactionStep::Idle); + } else { + assert_eq!(self.handler.state(), State::Busy); + assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished); + } } else { assert_eq!(self.handler.state(), State::Busy); - assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished); + assert_eq!(self.handler.step(), TransactionStep::WaitingForEofAck); } + assert_eq!(cfdp_user.transaction_indication_call_count, 1); - assert_eq!(cfdp_user.eof_sent_call_count, 1); + assert_eq!(cfdp_user.eof_sent_call_count, eof_sent_call_count); self.all_fault_queues_empty(); } @@ -1428,7 +1631,7 @@ mod tests { &mut self, cfdp_user: &mut TestCfdpUser, with_closure: bool, - ) -> PduHeader { + ) -> (&'static str, TransferInfo) { let mut file = OpenOptions::new() .write(true) .open(&self.srcfile) @@ -1437,18 +1640,19 @@ mod tests { file.write_all(content_str.as_bytes()) .expect("writing file content failed"); drop(file); - let (pdu_header, fd_pdus) = self.generic_file_transfer( + let (transfer_info, fd_pdus) = self.generic_file_transfer( cfdp_user, with_closure, content_str.as_bytes().to_vec(), ); assert_eq!(fd_pdus, 1); - pdu_header + (content_str, transfer_info) } - fn finish_handling(&mut self, user: &mut TestCfdpUser, pdu_header: PduHeader) { + // Finish handling: Simulate completion from the destination side by insert finished PDU. + fn finish_handling(&mut self, user: &mut TestCfdpUser, transfer_info: &TransferInfo) { let finished_pdu = FinishedPduCreator::new_default( - pdu_header, + transfer_info.pdu_header, DeliveryCode::Complete, FileStatus::Retained, ); @@ -1472,19 +1676,15 @@ mod tests { #[test] fn test_basic() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + let tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); assert!(tb.handler.transmission_mode().is_none()); assert!(tb.pdu_queue_empty()); } #[test] fn test_empty_file_transfer_not_acked_no_closure() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); - let filesize = 0; + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1493,41 +1693,78 @@ mod tests { Some(false), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), + 1, ) } + #[test] + fn test_empty_file_transfer_acked() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transaction_info.closure_requested, + EofParams::new_success(file_size, CRC_32.digest().finalize()), + 1, + ); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transaction_info); + tb.finish_handling(&mut cfdp_user, &transaction_info); + tb.common_finished_pdu_ack_check(); + } + #[test] fn test_tiny_file_transfer_not_acked_no_closure() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); let mut cfdp_user = TestCfdpUser::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); tb.common_tiny_file_transfer(&mut cfdp_user, false); } #[test] - fn test_tiny_file_transfer_not_acked_with_closure() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + fn test_tiny_file_transfer_acked() { let mut cfdp_user = TestCfdpUser::default(); - let pdu_header = tb.common_tiny_file_transfer(&mut cfdp_user, true); - tb.finish_handling(&mut cfdp_user, pdu_header) + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, false); + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_tiny_file_transfer_not_acked_with_closure() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); + let mut cfdp_user = TestCfdpUser::default(); + let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); + tb.finish_handling(&mut cfdp_user, &transfer_info) } #[test] fn test_two_segment_file_transfer_not_acked_no_closure() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); let mut cfdp_user = TestCfdpUser::default(); let mut file = OpenOptions::new() .write(true) @@ -1544,9 +1781,7 @@ mod tests { #[test] fn test_two_segment_file_transfer_not_acked_with_closure() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); let mut cfdp_user = TestCfdpUser::default(); let mut file = OpenOptions::new() .write(true) @@ -1557,18 +1792,37 @@ mod tests { file.write_all(&rand_data) .expect("writing file content failed"); drop(file); - let (pdu_header, fd_pdus) = + let (transfer_info, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec()); assert_eq!(fd_pdus, 2); - tb.finish_handling(&mut cfdp_user, pdu_header) + tb.finish_handling(&mut cfdp_user, &transfer_info) + } + + #[test] + fn test_two_segment_file_transfer_acked() { + let mut cfdp_user = TestCfdpUser::default(); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); + let mut file = OpenOptions::new() + .write(true) + .open(&tb.srcfile) + .expect("opening file failed"); + let mut rand_data = [0u8; 140]; + rand::rng().fill(&mut rand_data[..]); + file.write_all(&rand_data) + .expect("writing file content failed"); + drop(file); + let (transfer_info, fd_pdus) = + tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec()); + assert_eq!(fd_pdus, 2); + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); } #[test] fn test_empty_file_transfer_not_acked_with_closure() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); - let filesize = 0; + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1577,23 +1831,24 @@ mod tests { Some(true), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), + 1, ); - tb.finish_handling(&mut cfdp_user, transaction_info.pdu_header) + tb.finish_handling(&mut cfdp_user, &transaction_info) } #[test] fn test_put_request_no_remote_cfg() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); let (srcfile, destfile) = init_full_filepaths_textfile(); let srcfile_str = String::from(srcfile.to_str().unwrap()); @@ -1618,9 +1873,7 @@ mod tests { #[test] fn test_put_request_file_does_not_exist() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); let file_which_does_not_exist = "/tmp/this_file_does_not_exist.txt"; let destfile = "/tmp/tmp.txt"; @@ -1642,11 +1895,8 @@ mod tests { #[test] fn test_finished_pdu_check_timeout() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); - tb.set_check_limit_timeout(Duration::from_millis(45)); - let filesize = 0; + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); + let file_size = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), &tb.srcfile, @@ -1655,24 +1905,30 @@ mod tests { Some(true), ) .expect("creating put request failed"); - let mut cfdp_user = tb.create_user(0, filesize); - let transaction_info = - tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); let expected_id = tb.handler.transaction_id().unwrap(); tb.common_eof_pdu_check( &mut cfdp_user, transaction_info.closure_requested, - filesize, - CRC_32.digest().finalize(), + EofParams::new_success(file_size, CRC_32.digest().finalize()), + 1, ); - // After 50 ms delay, we run into a timeout, which leads to a check limit error - // declaration -> leads to a notice of cancellation -> leads to an EOF PDU with the - // appropriate error code. - thread::sleep(Duration::from_millis(50)); + assert!(tb.pdu_queue_empty()); + + // Enforce a check limit error by expiring the check limit timer -> leads to a notice of + // cancellation -> leads to an EOF PDU with the appropriate error code. + tb.expiry_control.set_check_limit_expired(); + assert_eq!( tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap(), - 0 + 1 ); + assert!(!tb.pdu_queue_empty()); let next_pdu = tb.get_next_sent_pdu().unwrap(); let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format"); tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc); @@ -1694,9 +1950,7 @@ mod tests { #[test] fn test_cancelled_transfer_empty_file() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512); let filesize = 0; let put_request = PutRequestOwned::new_regular_request( REMOTE_ID.into(), @@ -1741,9 +1995,7 @@ mod tests { #[test] fn test_cancelled_transfer_mid_transfer() { - let fault_handler = TestFaultHandler::default(); - let test_sender = TestCfdpSender::default(); - let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128); let mut file = OpenOptions::new() .write(true) .open(&tb.srcfile) @@ -1763,8 +2015,11 @@ mod tests { .expect("creating put request failed"); let file_size = rand_data.len() as u64; let mut cfdp_user = tb.create_user(0, file_size); - let transaction_info = - tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, file_size); + let transaction_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); let mut chunks = rand_data.chunks( calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( &transaction_info.pdu_header, @@ -1781,10 +2036,11 @@ mod tests { let fd_pdu = FileDataPdu::from_bytes(&next_packet.raw_pdu).unwrap(); assert_eq!(fd_pdu.file_data(), &rand_data[0..first_chunk.len()]); let expected_id = tb.handler.transaction_id().unwrap(); - assert!(tb - .handler - .cancel_request(&mut cfdp_user, &expected_id) - .expect("cancellation failed")); + assert!( + tb.handler + .cancel_request(&mut cfdp_user, &expected_id) + .expect("cancellation failed") + ); assert_eq!(tb.handler.state(), State::Idle); assert_eq!(tb.handler.step(), TransactionStep::Idle); let next_packet = tb.get_next_sent_pdu().unwrap(); @@ -1803,4 +2059,307 @@ mod tests { ConditionCode::CancelRequestReceived ); } + + #[test] + fn test_positive_ack_procedure() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let eof_params = EofParams { + file_size, + file_checksum: CRC_32.digest().finalize(), + condition_code: ConditionCode::NoError, + }; + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 1, + ); + + assert!(tb.pdu_queue_empty()); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 2, + ); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_positive_ack_procedure_ack_limit_reached() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize()); + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 1, + ); + + assert!(tb.pdu_queue_empty()); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 2, + ); + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + eof_params.condition_code = ConditionCode::PositiveAckLimitReached; + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 3, + ); + // This boilerplate handling is still expected. In a real-life use-case I would expect + // this to fail as well, leading to a transaction abandonment. This is tested separately. + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_positive_ack_procedure_ack_limit_reached_abandonment() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize()); + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 1, + ); + + assert!(tb.pdu_queue_empty()); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 2, + ); + // Enforce a postive ack timer expiry -> positive ACK limit reached -> Cancel EOF sent. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + eof_params.condition_code = ConditionCode::PositiveAckLimitReached; + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 3, + ); + // Cancellation fault should have been triggered. + let fault_handler = tb.test_fault_handler_mut(); + let fh_ref_mut = fault_handler.get_mut(); + assert!(!fh_ref_mut.cancellation_queue_empty()); + assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1); + let (id, cond_code, progress) = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap(); + assert_eq!(id, transfer_info.id); + assert_eq!(cond_code, ConditionCode::PositiveAckLimitReached); + assert_eq!(progress, file_size); + fh_ref_mut.all_queues_empty(); + + // Enforce a postive ack timer expiry -> leads to a re-send of the EOF Cancel PDU. + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 1); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + eof_params, + 4, + ); + + // Enforce a postive ack timer expiry -> positive ACK limit reached -> Transaction + // abandoned + tb.expiry_control.set_positive_ack_expired(); + let sent_packets = tb + .handler + .state_machine_no_packet(&mut cfdp_user) + .expect("source handler FSM failure"); + assert_eq!(sent_packets, 0); + // Abandonment fault should have been triggered. + let fault_handler = tb.test_fault_handler_mut(); + let fh_ref_mut = fault_handler.get_mut(); + assert!(!fh_ref_mut.abandoned_queue_empty()); + assert_eq!(fh_ref_mut.abandoned_queue.len(), 1); + let (id, cond_code, progress) = fh_ref_mut.abandoned_queue.pop_back().unwrap(); + assert_eq!(id, transfer_info.id); + assert_eq!(cond_code, ConditionCode::PositiveAckLimitReached); + assert_eq!(progress, file_size); + fh_ref_mut.all_queues_empty(); + } + + #[test] + fn test_nak_for_whole_file() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let mut cfdp_user = TestCfdpUser::default(); + let (data, transfer_info) = tb.common_tiny_file_transfer(&mut cfdp_user, true); + let seg_reqs = &[(0, transfer_info.file_size as u32)]; + tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, seg_reqs); + tb.check_next_file_pdu(0, data.as_bytes()); + tb.all_fault_queues_empty(); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_nak_for_file_segment() { + let mut cfdp_user = TestCfdpUser::default(); + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128); + let mut file = OpenOptions::new() + .write(true) + .open(&tb.srcfile) + .expect("opening file failed"); + let mut rand_data = [0u8; 140]; + rand::rng().fill(&mut rand_data[..]); + file.write_all(&rand_data) + .expect("writing file content failed"); + drop(file); + let (transfer_info, fd_pdus) = + tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec()); + assert_eq!(fd_pdus, 2); + tb.nak_for_file_segments(&mut cfdp_user, &transfer_info, &[(0, 90)]); + tb.check_next_file_pdu(0, &rand_data[0..90]); + tb.all_fault_queues_empty(); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } + + #[test] + fn test_nak_for_metadata() { + let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512); + let file_size = 0; + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Acknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, file_size); + let transfer_info = tb.common_file_transfer_init_with_metadata_check( + &mut cfdp_user, + put_request, + file_size, + ); + tb.common_eof_pdu_check( + &mut cfdp_user, + transfer_info.closure_requested, + EofParams::new_success(file_size, CRC_32.digest().finalize()), + 1, + ); + + // NAK to cause re-transmission of metadata PDU. + let nak_pdu = NakPduCreator::new( + transfer_info.pdu_header, + 0, + transfer_info.file_size as u32, + &[(0, 0)], + ) + .unwrap(); + let nak_pdu_vec = nak_pdu.to_vec().unwrap(); + let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap(); + let sent_packets = tb + .handler + .state_machine(&mut cfdp_user, Some(&packet_info)) + .unwrap(); + assert_eq!(sent_packets, 1); + let next_pdu = tb.get_next_sent_pdu().unwrap(); + // Check the metadata PDU. + tb.metadata_check(&next_pdu, file_size); + tb.all_fault_queues_empty(); + + tb.acknowledge_eof_pdu(&mut cfdp_user, &transfer_info); + tb.finish_handling(&mut cfdp_user, &transfer_info); + tb.common_finished_pdu_ack_check(); + } } diff --git a/src/time.rs b/src/time.rs index abd3fac..23d69d2 100644 --- a/src/time.rs +++ b/src/time.rs @@ -1,6 +1,6 @@ use core::fmt::Debug; -/// Generic abstraction for a check/countdown timer. +/// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone. pub trait CountdownProvider: Debug { fn has_expired(&self) -> bool; fn reset(&mut self); diff --git a/src/user.rs b/src/user.rs index f0a0931..6d058f4 100644 --- a/src/user.rs +++ b/src/user.rs @@ -2,12 +2,12 @@ use spacepackets::cfdp::tlv::WritableTlv; use spacepackets::{ cfdp::{ + ConditionCode, pdu::{ file_data::SegmentMetadata, finished::{DeliveryCode, FileStatus}, }, tlv::msg_to_user::MsgToUserTlv, - ConditionCode, }, util::UnsignedByteField, }; diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 0f5bbca..2f7ba87 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -2,23 +2,26 @@ use std::{ fs::OpenOptions, io::Write, - sync::{atomic::AtomicBool, mpsc, Arc}, + sync::{ + Arc, + atomic::{AtomicBool, AtomicU16}, + mpsc, + }, thread, time::Duration, }; use cfdp::{ + EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig, + StdTimerCreator, TransactionId, UserFaultHookProvider, dest::DestinationHandler, filestore::NativeFilestore, request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, - EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig, - StdTimerCreator, TransactionId, UserFaultHookProvider, }; use spacepackets::{ cfdp::{ChecksumType, ConditionCode, TransmissionMode}, - seq_count::SeqCountProviderSyncU16, util::UnsignedByteFieldU16, }; @@ -194,7 +197,7 @@ fn end_to_end_test(with_closure: bool) { spacepackets::cfdp::TransmissionMode::Unacknowledged, ChecksumType::Crc32, ); - let seq_count_provider = SeqCountProviderSyncU16::default(); + let seq_count_provider = AtomicU16::default(); let mut source_handler = SourceHandler::new( local_cfg_source, source_tx,