Add acknowledged source handler

This commit is contained in:
Robin Mueller
2025-08-14 16:29:40 +02:00
parent fa4657274f
commit ada26f626e
15 changed files with 1404 additions and 731 deletions

View File

@@ -29,7 +29,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.81.0 - uses: dtolnay/rust-toolchain@1.85.0
- run: cargo check --release - run: cargo check --release
cross-check: cross-check:
@@ -45,7 +45,7 @@ jobs:
- uses: dtolnay/rust-toolchain@stable - uses: dtolnay/rust-toolchain@stable
with: with:
targets: "armv7-unknown-linux-gnueabihf, thumbv7em-none-eabihf" targets: "armv7-unknown-linux-gnueabihf, thumbv7em-none-eabihf"
- run: cargo check --release --target=${{matrix.target}} --no-default-features - run: cargo check --release --target=${{matrix.target}} --no-default-features --features "alloc"
fmt: fmt:
name: Check formatting name: Check formatting

View File

@@ -1,8 +1,8 @@
[package] [package]
name = "cfdp-rs" name = "cfdp-rs"
version = "0.2.0" version = "0.2.0"
edition = "2021" edition = "2024"
rust-version = "1.81.0" rust-version = "1.85.0"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"] authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
description = "High level CCSDS File Delivery Protocol components" description = "High level CCSDS File Delivery Protocol components"
homepage = "https://egit.irs.uni-stuttgart.de/rust/cfdp" homepage = "https://egit.irs.uni-stuttgart.de/rust/cfdp"
@@ -20,7 +20,7 @@ crc = "3"
smallvec = "1" smallvec = "1"
derive-new = ">=0.6, <=0.7" derive-new = ">=0.6, <=0.7"
hashbrown = { version = ">=0.14, <=0.15", optional = true } hashbrown = { version = ">=0.14, <=0.15", optional = true }
spacepackets = { version = "0.15", default-features = false } spacepackets = { git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git", version = "0.16", default-features = false }
thiserror = { version = "2", default-features = false } thiserror = { version = "2", default-features = false }
serde = { version = "1", optional = true } serde = { version = "1", optional = true }
defmt = { version = "1", optional = true } defmt = { version = "1", optional = true }

View File

@@ -17,10 +17,11 @@ The underlying base packet library used to generate the packets to be sent is th
`cfdp-rs` currently supports following high-level features: `cfdp-rs` currently supports following high-level features:
- Unacknowledged (class 1) file transfers for both source and destination side. - Unacknowledged (class 1) file transfers for both source and destination side.
- Acknowledged (class 2) file transfers for the source side.
The following features have not been implemented yet. PRs or notifications for demand are welcome! The following features have not been implemented yet. PRs or notifications for demand are welcome!
- Acknowledged (class 2) file transfers for both source and destination side. - Acknowledged (class 2) file transfers for the destination side.
- Suspending transfers - Suspending transfers
- Inactivity handling - Inactivity handling
- Start and end of transmission and reception opportunity handling - Start and end of transmission and reception opportunity handling

View File

@@ -1,3 +0,0 @@
#!/bin/sh
export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options"
cargo +nightly doc --all-features --open

View File

@@ -3,31 +3,35 @@ use std::{
fs::OpenOptions, fs::OpenOptions,
io::{self, ErrorKind, Write}, io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::mpsc, sync::{
atomic::{AtomicBool, AtomicU16},
mpsc,
},
thread, thread,
time::Duration, time::Duration,
}; };
use cfdp::{ use cfdp::{
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider,
dest::DestinationHandler, dest::DestinationHandler,
filestore::NativeFilestore, filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher}, request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler, source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider,
}; };
use clap::Parser; use clap::Parser;
use log::{debug, info, warn}; use log::{debug, info, warn};
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError},
ChecksumType, ConditionCode, TransmissionMode, ChecksumType, ConditionCode, TransmissionMode,
pdu::{PduError, file_data::FileDataPdu, metadata::MetadataPduReader},
}, },
seq_count::SeqCountProviderSyncU16,
util::{UnsignedByteFieldU16, UnsignedEnum}, util::{UnsignedByteFieldU16, UnsignedEnum},
}; };
static KILL_APP: AtomicBool = AtomicBool::new(false);
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
@@ -231,7 +235,7 @@ impl UdpServer {
Ok(None) Ok(None)
} else { } else {
Err(e.into()) Err(e.into())
} };
} }
}; };
let (_, from) = res; let (_, from) = res;
@@ -338,7 +342,7 @@ fn main() {
spacepackets::cfdp::TransmissionMode::Unacknowledged, spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32C, ChecksumType::Crc32C,
); );
let seq_count_provider = SeqCountProviderSyncU16::default(); let seq_count_provider = AtomicU16::default();
let mut source_handler = SourceHandler::new( let mut source_handler = SourceHandler::new(
local_cfg_source, local_cfg_source,
source_tm_tx, source_tm_tx,
@@ -411,6 +415,9 @@ fn main() {
.expect("put request failed"); .expect("put request failed");
} }
loop { loop {
if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let mut next_delay = None; let mut next_delay = None;
let mut undelayed_call_count = 0; let mut undelayed_call_count = 0;
let packet_info = match source_tc_rx.try_recv() { let packet_info = match source_tc_rx.try_recv() {
@@ -453,6 +460,9 @@ fn main() {
loop { loop {
let mut next_delay = None; let mut next_delay = None;
let mut undelayed_call_count = 0; let mut undelayed_call_count = 0;
if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let packet_info = match dest_tc_rx.try_recv() { let packet_info = match dest_tc_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info), Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e { Err(e) => match e {
@@ -494,6 +504,9 @@ fn main() {
info!("Starting UDP server on {}", remote_addr); info!("Starting UDP server on {}", remote_addr);
loop { loop {
loop { loop {
if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
match udp_server.try_recv_tc() { match udp_server.try_recv_tc() {
Ok(result) => match result { Ok(result) => match result {
Some((pdu, _addr)) => { Some((pdu, _addr)) => {

34
justfile Normal file
View File

@@ -0,0 +1,34 @@
all: check build clippy fmt docs test coverage
clippy:
cargo clippy -- -D warnings
fmt:
cargo fmt --all -- --check
check:
cargo check --all-features
test:
cargo nextest r --all-features
cargo test --doc
build:
cargo build --all-features
embedded:
cargo build --target thumbv7em-none-eabihf --no-default-features --features "alloc"
docs:
export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options"
cargo +nightly doc --all-features
docs-html:
export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options"
cargo +nightly doc --all-features --open
coverage:
cargo llvm-cov nextest
coverage-html:
cargo llvm-cov nextest --html --open

View File

@@ -29,29 +29,28 @@
//! 3. An EOF ACK PDU has been sent back to the remote side. //! 3. An EOF ACK PDU has been sent back to the remote side.
//! 4. A Finished PDU has been sent back to the remote side. //! 4. A Finished PDU has been sent back to the remote side.
//! 5. A Finished PDU ACK was received. //! 5. A Finished PDU ACK was received.
use crate::{user::TransactionFinishedParams, DummyPduProvider, GenericSendError, PduProvider}; use crate::{DummyPduProvider, GenericSendError, PduProvider, user::TransactionFinishedParams};
use core::str::{from_utf8, from_utf8_unchecked, Utf8Error}; use core::str::{Utf8Error, from_utf8, from_utf8_unchecked};
use super::{ use super::{
filestore::{FilestoreError, NativeFilestore, VirtualFilestore},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider, CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, PduSendProvider,
RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCountdown, RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TimerCreatorProvider,
StdRemoteEntityConfigProvider, StdTimerCreator, TimerContext, TimerCreatorProvider,
TransactionId, UserFaultHookProvider, TransactionId, UserFaultHookProvider,
filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
}; };
use smallvec::SmallVec; use smallvec::SmallVec;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode,
pdu::{ pdu::{
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader,
eof::EofPdu, eof::EofPdu,
file_data::FileDataPdu, file_data::FileDataPdu,
finished::{DeliveryCode, FileStatus, FinishedPduCreator}, finished::{DeliveryCode, FileStatus, FinishedPduCreator},
metadata::{MetadataGenericParams, MetadataPduReader}, metadata::{MetadataGenericParams, MetadataPduReader},
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
}, },
tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, GenericTlv, ReadableTlv, TlvType}, tlv::{EntityIdTlv, GenericTlv, ReadableTlv, TlvType, msg_to_user::MsgToUserTlv},
ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode,
}, },
util::{UnsignedByteField, UnsignedEnum}, util::{UnsignedByteField, UnsignedEnum},
}; };
@@ -278,20 +277,20 @@ pub struct DestinationHandler<
pub type StdDestinationHandler<PduSender, UserFaultHook> = DestinationHandler< pub type StdDestinationHandler<PduSender, UserFaultHook> = DestinationHandler<
PduSender, PduSender,
UserFaultHook, UserFaultHook,
NativeFilestore, crate::filestore::NativeFilestore,
StdRemoteEntityConfigProvider, crate::StdRemoteEntityConfigProvider,
StdTimerCreator, crate::StdTimerCreator,
StdCountdown, crate::StdCountdown,
>; >;
impl< impl<
PduSender: PduSendProvider, PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider, UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore, Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider, RemoteCfgTable: RemoteEntityConfigProvider,
TimerCreator: TimerCreatorProvider<Countdown = Countdown>, TimerCreator: TimerCreatorProvider<Countdown = Countdown>,
Countdown: CountdownProvider, Countdown: CountdownProvider,
> DestinationHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable, TimerCreator, Countdown> > DestinationHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable, TimerCreator, Countdown>
{ {
/// Constructs a new destination handler. /// Constructs a new destination handler.
/// ///
@@ -990,7 +989,6 @@ impl<
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use core::{cell::Cell, sync::atomic::AtomicBool};
#[allow(unused_imports)] #[allow(unused_imports)]
use std::println; use std::println;
use std::{ use std::{
@@ -999,80 +997,28 @@ mod tests {
string::String, string::String,
}; };
use alloc::{sync::Arc, vec::Vec}; use alloc::vec::Vec;
use rand::Rng; use rand::Rng;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
lv::Lv,
pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket},
ChecksumType, TransmissionMode, ChecksumType, TransmissionMode,
lv::Lv,
pdu::{WritablePduPacket, finished::FinishedPduReader, metadata::MetadataPduCreator},
}, },
util::{UbfU16, UnsignedByteFieldU8}, util::{UbfU16, UnsignedByteFieldU8},
}; };
use crate::{ use crate::{
CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider,
filestore::NativeFilestore, filestore::NativeFilestore,
tests::{ tests::{
basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler, LOCAL_ID, REMOTE_ID, SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer,
LOCAL_ID, REMOTE_ID, TestCheckTimerCreator, TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table,
}, },
CountdownProvider, FaultHandler, IndicationConfig, PduRawWithInfo,
StdRemoteEntityConfigProvider, TimerCreatorProvider, CRC_32,
}; };
use super::*; use super::*;
#[derive(Debug)]
struct TestCheckTimer {
counter: Cell<u32>,
expired: Arc<AtomicBool>,
}
impl CountdownProvider for TestCheckTimer {
fn has_expired(&self) -> bool {
self.expired.load(core::sync::atomic::Ordering::Relaxed)
}
fn reset(&mut self) {
self.counter.set(0);
}
}
impl TestCheckTimer {
pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
Self {
counter: Cell::new(0),
expired: expired_flag,
}
}
}
struct TestCheckTimerCreator {
check_limit_expired_flag: Arc<AtomicBool>,
}
impl TestCheckTimerCreator {
pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
Self {
check_limit_expired_flag: expired_flag,
}
}
}
impl TimerCreatorProvider for TestCheckTimerCreator {
type Countdown = TestCheckTimer;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown {
match timer_context {
TimerContext::CheckLimit { .. } => {
TestCheckTimer::new(self.check_limit_expired_flag.clone())
}
_ => {
panic!("invalid check timer creator, can only be used for check limit handling")
}
}
}
}
type TestDestHandler = DestinationHandler< type TestDestHandler = DestinationHandler<
TestCfdpSender, TestCfdpSender,
TestFaultHandler, TestFaultHandler,
@@ -1083,7 +1029,7 @@ mod tests {
>; >;
struct DestHandlerTestbench { struct DestHandlerTestbench {
check_timer_expired: Arc<AtomicBool>, expiry_control: TimerExpiryControl,
handler: TestDestHandler, handler: TestDestHandler,
src_path: PathBuf, src_path: PathBuf,
dest_path: PathBuf, dest_path: PathBuf,
@@ -1110,12 +1056,11 @@ mod tests {
src_path: PathBuf, src_path: PathBuf,
dest_path: PathBuf, dest_path: PathBuf,
) -> Self { ) -> Self {
let check_timer_expired = Arc::new(AtomicBool::new(false)); let expiry_control = TimerExpiryControl::default();
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let dest_handler = let dest_handler = default_dest_handler(fault_handler, test_sender, &expiry_control);
default_dest_handler(fault_handler, test_sender, check_timer_expired.clone());
let handler = Self { let handler = Self {
check_timer_expired, expiry_control,
handler: dest_handler, handler: dest_handler,
src_path, src_path,
closure_requested, closure_requested,
@@ -1157,8 +1102,9 @@ mod tests {
} }
fn set_check_timer_expired(&mut self) { fn set_check_timer_expired(&mut self) {
self.check_timer_expired self.expiry_control
.store(true, core::sync::atomic::Ordering::Relaxed); .check_limit
.store(true, core::sync::atomic::Ordering::Release);
} }
fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser { fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser {
@@ -1301,7 +1247,7 @@ mod tests {
fn default_dest_handler( fn default_dest_handler(
test_fault_handler: TestFaultHandler, test_fault_handler: TestFaultHandler,
test_packet_sender: TestCfdpSender, test_packet_sender: TestCfdpSender,
check_timer_expired: Arc<AtomicBool>, expiry_control: &TimerExpiryControl,
) -> TestDestHandler { ) -> TestDestHandler {
let local_entity_cfg = LocalEntityConfig { let local_entity_cfg = LocalEntityConfig {
id: REMOTE_ID.into(), id: REMOTE_ID.into(),
@@ -1314,7 +1260,7 @@ mod tests {
test_packet_sender, test_packet_sender,
NativeFilestore::default(), NativeFilestore::default(),
basic_remote_cfg_table(LOCAL_ID, 1024, true), basic_remote_cfg_table(LOCAL_ID, 1024, true),
TestCheckTimerCreator::new(check_timer_expired), TestCheckTimerCreator::new(expiry_control),
) )
} }
@@ -1372,14 +1318,17 @@ mod tests {
fn test_basic() { fn test_basic() {
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); let dest_handler =
default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default());
assert!(dest_handler.transmission_mode().is_none()); assert!(dest_handler.transmission_mode().is_none());
assert!(dest_handler assert!(
.local_cfg dest_handler
.fault_handler .local_cfg
.user_hook .fault_handler
.borrow() .user_hook
.all_queues_empty()); .borrow()
.all_queues_empty()
);
assert!(dest_handler.pdu_sender.queue_empty()); assert!(dest_handler.pdu_sender.queue_empty());
assert_eq!(dest_handler.state(), State::Idle); assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle); assert_eq!(dest_handler.step(), TransactionStep::Idle);
@@ -1389,7 +1338,8 @@ mod tests {
fn test_cancelling_idle_fsm() { fn test_cancelling_idle_fsm() {
let fault_handler = TestFaultHandler::default(); let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default(); let test_sender = TestCfdpSender::default();
let mut dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default()); let mut dest_handler =
default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default());
assert!(!dest_handler.cancel_request(&TransactionId::new( assert!(!dest_handler.cancel_request(&TransactionId::new(
UnsignedByteFieldU8::new(0).into(), UnsignedByteFieldU8::new(0).into(),
UnsignedByteFieldU8::new(0).into() UnsignedByteFieldU8::new(0).into()

View File

@@ -1,5 +1,5 @@
use spacepackets::cfdp::ChecksumType;
use spacepackets::ByteConversionError; use spacepackets::ByteConversionError;
use spacepackets::cfdp::ChecksumType;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use std_mod::*; pub use std_mod::*;
@@ -375,9 +375,11 @@ mod tests {
.create_dir(dir_path.to_str().expect("getting str for file failed")) .create_dir(dir_path.to_str().expect("getting str for file failed"))
.unwrap(); .unwrap();
assert!(NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap()); assert!(NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS assert!(
.is_dir(dir_path.as_path().to_str().unwrap()) NATIVE_FS
.unwrap()); .is_dir(dir_path.as_path().to_str().unwrap())
.unwrap()
);
} }
#[test] #[test]

View File

@@ -5,13 +5,14 @@
//! //!
//! # Features //! # Features
//! //!
//! The crate currently supports following features: //! `cfdp-rs` currently supports following high-level features:
//! //!
//! - Unacknowledged (class 1) file transfers for both source and destination side. //! - Unacknowledged (class 1) file transfers for both source and destination side.
//! - Acknowledged (class 2) file transfers for the source side.
//! //!
//! The following features have not been implemented yet. PRs or notifications for demand are welcome! //! The following features have not been implemented yet. PRs or notifications for demand are welcome!
//! //!
//! - Acknowledged (class 2) file transfers for both source and destination side. //! - Acknowledged (class 2) file transfers for the destination side.
//! - Suspending transfers //! - Suspending transfers
//! - Inactivity handling //! - Inactivity handling
//! - Start and end of transmission and reception opportunity handling //! - Start and end of transmission and reception opportunity handling
@@ -100,7 +101,7 @@ pub mod user;
use crate::time::CountdownProvider; use crate::time::CountdownProvider;
use core::{cell::RefCell, fmt::Debug, hash::Hash}; use core::{cell::RefCell, fmt::Debug, hash::Hash};
use crc::{Crc, CRC_32_ISCSI, CRC_32_ISO_HDLC}; use crc::{CRC_32_ISCSI, CRC_32_ISO_HDLC, Crc};
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
@@ -109,8 +110,8 @@ use core::time::Duration;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
pdu::{FileDirectiveType, PduError, PduHeader},
ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode, ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode,
pdu::{FileDirectiveType, PduError, PduHeader},
}, },
util::{UnsignedByteField, UnsignedEnum}, util::{UnsignedByteField, UnsignedEnum},
}; };
@@ -619,7 +620,6 @@ impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> {
} }
/// Generic error type for sending a PDU via a message queue. /// Generic error type for sending a PDU via a message queue.
#[cfg(feature = "std")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)] #[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive] #[non_exhaustive]
pub enum GenericSendError { pub enum GenericSendError {
@@ -631,7 +631,6 @@ pub enum GenericSendError {
Other, Other,
} }
#[cfg(feature = "std")]
pub trait PduSendProvider { pub trait PduSendProvider {
fn send_pdu( fn send_pdu(
&self, &self,
@@ -846,40 +845,39 @@ pub fn determine_packet_target(raw_pdu: &[u8]) -> Result<PacketTarget, PduError>
expected: None, expected: None,
} }
})?; })?;
let packet_target = let packet_target = match file_directive_type {
match file_directive_type { // Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a.
// Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a. // the source handler
// the source handler FileDirectiveType::NakPdu
FileDirectiveType::NakPdu | FileDirectiveType::FinishedPdu
| FileDirectiveType::FinishedPdu | FileDirectiveType::KeepAlivePdu => PacketTarget::SourceEntity,
| FileDirectiveType::KeepAlivePdu => PacketTarget::SourceEntity, // Section b) of 4.5.3: These PDUs should always be targeted towards the file receiver a.k.a.
// Section b) of 4.5.3: These PDUs should always be targeted towards the file receiver a.k.a. // the destination handler
// the destination handler FileDirectiveType::MetadataPdu
FileDirectiveType::MetadataPdu | FileDirectiveType::EofPdu
| FileDirectiveType::EofPdu | FileDirectiveType::PromptPdu => PacketTarget::DestEntity,
| FileDirectiveType::PromptPdu => PacketTarget::DestEntity, // Section a): Recipient depends of the type of PDU that is being acknowledged. We can simply
// Section a): Recipient depends of the type of PDU that is being acknowledged. We can simply // extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to
// extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to // the source handler, for a Finished PDU, it is passed to the destination handler.
// the source handler, for a Finished PDU, it is passed to the destination handler. FileDirectiveType::AckPdu => {
FileDirectiveType::AckPdu => { let acked_directive = FileDirectiveType::try_from(raw_pdu[header_len + 1] >> 4)
let acked_directive = FileDirectiveType::try_from(raw_pdu[header_len + 1]) .map_err(|_| PduError::InvalidDirectiveType {
.map_err(|_| PduError::InvalidDirectiveType { found: (raw_pdu[header_len + 1] >> 4),
found: raw_pdu[header_len], expected: None,
expected: None, })?;
})?; if acked_directive == FileDirectiveType::EofPdu {
if acked_directive == FileDirectiveType::EofPdu { PacketTarget::SourceEntity
PacketTarget::SourceEntity } else if acked_directive == FileDirectiveType::FinishedPdu {
} else if acked_directive == FileDirectiveType::FinishedPdu { PacketTarget::DestEntity
PacketTarget::DestEntity } else {
} else { // TODO: Maybe a better error? This might be confusing..
// TODO: Maybe a better error? This might be confusing.. return Err(PduError::InvalidDirectiveType {
return Err(PduError::InvalidDirectiveType { found: raw_pdu[header_len + 1],
found: raw_pdu[header_len + 1], expected: None,
expected: None, });
});
}
} }
}; }
};
Ok(packet_target) Ok(packet_target)
} }
@@ -941,11 +939,11 @@ impl PduProvider for PduRawWithInfo<'_> {
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub mod alloc_mod { pub mod alloc_mod {
use spacepackets::cfdp::{ use spacepackets::cfdp::{
pdu::{FileDirectiveType, PduError},
PduType, PduType,
pdu::{FileDirectiveType, PduError},
}; };
use crate::{determine_packet_target, PacketTarget, PduProvider, PduRawWithInfo}; use crate::{PacketTarget, PduProvider, PduRawWithInfo, determine_packet_target};
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub struct PduOwnedWithInfo { pub struct PduOwnedWithInfo {
@@ -1003,21 +1001,25 @@ pub mod alloc_mod {
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use core::cell::RefCell; use core::{
cell::{Cell, RefCell},
sync::atomic::AtomicBool,
};
use std::{println, sync::Arc};
use alloc::{collections::VecDeque, string::String, vec::Vec}; use alloc::{collections::VecDeque, string::String, vec::Vec};
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
ChecksumType, ConditionCode, PduType, TransmissionMode,
lv::Lv, lv::Lv,
pdu::{ pdu::{
CommonPduConfig, FileDirectiveType, PduHeader,
eof::EofPdu, eof::EofPdu,
file_data::FileDataPdu, file_data::FileDataPdu,
metadata::{MetadataGenericParams, MetadataPduCreator}, metadata::{MetadataGenericParams, MetadataPduCreator},
CommonPduConfig, FileDirectiveType, PduHeader, WritablePduPacket,
}, },
ChecksumType, ConditionCode, PduType, TransmissionMode,
}, },
util::{UnsignedByteField, UnsignedByteFieldU16, UnsignedByteFieldU8, UnsignedEnum}, util::{UnsignedByteField, UnsignedByteFieldU8, UnsignedByteFieldU16, UnsignedEnum},
}; };
use user::{CfdpUser, OwnedMetadataRecvdParams, TransactionFinishedParams}; use user::{CfdpUser, OwnedMetadataRecvdParams, TransactionFinishedParams};
@@ -1028,6 +1030,111 @@ pub(crate) mod tests {
pub const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); pub const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
pub const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); pub const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
// This test structure allows to precisely control the expiry of CFDP timers.
#[derive(Debug, Default, Clone)]
pub(crate) struct TimerExpiryControl {
pub(crate) check_limit: Arc<AtomicBool>,
pub(crate) positive_ack: Arc<AtomicBool>,
}
impl TimerExpiryControl {
pub fn set_check_limit_expired(&mut self) {
self.check_limit
.store(true, core::sync::atomic::Ordering::Release);
}
#[allow(dead_code)]
pub fn set_positive_ack_expired(&mut self) {
self.positive_ack
.store(true, core::sync::atomic::Ordering::Release);
}
}
#[derive(Debug)]
pub(crate) struct TestCheckTimer {
counter: Cell<u32>,
context: TimerContext,
expiry_control: TimerExpiryControl,
}
impl CountdownProvider for TestCheckTimer {
fn has_expired(&self) -> bool {
match self.context {
TimerContext::CheckLimit {
local_id: _,
remote_id: _,
entity_type: _,
} => self
.expiry_control
.check_limit
.load(core::sync::atomic::Ordering::Acquire),
TimerContext::PositiveAck { expiry_time: _ } => self
.expiry_control
.positive_ack
.load(core::sync::atomic::Ordering::Acquire),
TimerContext::NakActivity { expiry_time: _ } => todo!(),
}
}
fn reset(&mut self) {
match self.context {
TimerContext::CheckLimit {
local_id: _,
remote_id: _,
entity_type: _,
} => self
.expiry_control
.check_limit
.store(false, core::sync::atomic::Ordering::Release),
TimerContext::NakActivity { expiry_time: _ } => todo!(),
TimerContext::PositiveAck { expiry_time: _ } => self
.expiry_control
.positive_ack
.store(false, core::sync::atomic::Ordering::Release),
}
self.counter.set(0);
}
}
impl TestCheckTimer {
pub fn new(context: TimerContext, expiry_control: &TimerExpiryControl) -> Self {
Self {
counter: Cell::new(0),
context,
expiry_control: expiry_control.clone(),
}
}
}
pub(crate) struct TestCheckTimerCreator {
expiry_control: TimerExpiryControl,
}
impl TestCheckTimerCreator {
pub fn new(expiry_control: &TimerExpiryControl) -> Self {
Self {
expiry_control: expiry_control.clone(),
}
}
}
impl TimerCreatorProvider for TestCheckTimerCreator {
type Countdown = TestCheckTimer;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown {
match timer_context {
TimerContext::CheckLimit { .. } => {
TestCheckTimer::new(timer_context, &self.expiry_control)
}
TimerContext::PositiveAck { expiry_time: _ } => {
TestCheckTimer::new(timer_context, &self.expiry_control)
}
_ => {
panic!("invalid check timer creator, can only be used for check limit handling")
}
}
}
}
pub struct FileSegmentRecvdParamsNoSegMetadata { pub struct FileSegmentRecvdParamsNoSegMetadata {
#[allow(dead_code)] #[allow(dead_code)]
pub id: TransactionId, pub id: TransactionId,
@@ -1247,6 +1354,12 @@ pub(crate) mod tests {
file_directive_type: Option<FileDirectiveType>, file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8], raw_pdu: &[u8],
) -> Result<(), GenericSendError> { ) -> Result<(), GenericSendError> {
println!(
"sent pdu: {:?}, directive: {:?}, len: {}",
pdu_type,
file_directive_type,
raw_pdu.len()
);
self.packet_queue.borrow_mut().push_back(SentPdu { self.packet_queue.borrow_mut().push_back(SentPdu {
pdu_type, pdu_type,
file_directive_type, file_directive_type,
@@ -1260,6 +1373,7 @@ pub(crate) mod tests {
pub fn retrieve_next_pdu(&self) -> Option<SentPdu> { pub fn retrieve_next_pdu(&self) -> Option<SentPdu> {
self.packet_queue.borrow_mut().pop_front() self.packet_queue.borrow_mut().pop_front()
} }
pub fn queue_empty(&self) -> bool { pub fn queue_empty(&self) -> bool {
self.packet_queue.borrow_mut().is_empty() self.packet_queue.borrow_mut().is_empty()
} }

View File

@@ -1,7 +1,7 @@
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
tlv::{GenericTlv, Tlv, TlvType},
SegmentationControl, TransmissionMode, SegmentationControl, TransmissionMode,
tlv::{GenericTlv, Tlv, TlvType},
}, },
util::UnsignedByteField, util::UnsignedByteField,
}; };
@@ -233,8 +233,8 @@ pub mod alloc_mod {
use super::*; use super::*;
use alloc::string::ToString; use alloc::string::ToString;
use spacepackets::{ use spacepackets::{
cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv, TlvOwned, WritableTlv},
ByteConversionError, ByteConversionError,
cfdp::tlv::{ReadableTlv, TlvOwned, WritableTlv, msg_to_user::MsgToUserTlv},
}; };
/// Owned variant of [PutRequest] with no lifetimes which is also [Clone]able. /// Owned variant of [PutRequest] with no lifetimes which is also [Clone]able.
@@ -557,7 +557,7 @@ mod tests {
use std::string::String; use std::string::String;
use spacepackets::{ use spacepackets::{
cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv}, cfdp::tlv::{ReadableTlv, msg_to_user::MsgToUserTlv},
util::UbfU16, util::UbfU16,
}; };

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
use core::fmt::Debug; use core::fmt::Debug;
/// Generic abstraction for a check/countdown timer. /// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone.
pub trait CountdownProvider: Debug { pub trait CountdownProvider: Debug {
fn has_expired(&self) -> bool; fn has_expired(&self) -> bool;
fn reset(&mut self); fn reset(&mut self);

View File

@@ -2,12 +2,12 @@
use spacepackets::cfdp::tlv::WritableTlv; use spacepackets::cfdp::tlv::WritableTlv;
use spacepackets::{ use spacepackets::{
cfdp::{ cfdp::{
ConditionCode,
pdu::{ pdu::{
file_data::SegmentMetadata, file_data::SegmentMetadata,
finished::{DeliveryCode, FileStatus}, finished::{DeliveryCode, FileStatus},
}, },
tlv::msg_to_user::MsgToUserTlv, tlv::msg_to_user::MsgToUserTlv,
ConditionCode,
}, },
util::UnsignedByteField, util::UnsignedByteField,
}; };

View File

@@ -2,23 +2,26 @@
use std::{ use std::{
fs::OpenOptions, fs::OpenOptions,
io::Write, io::Write,
sync::{atomic::AtomicBool, mpsc, Arc}, sync::{
Arc,
atomic::{AtomicBool, AtomicU16},
mpsc,
},
thread, thread,
time::Duration, time::Duration,
}; };
use cfdp::{ use cfdp::{
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
StdTimerCreator, TransactionId, UserFaultHookProvider,
dest::DestinationHandler, dest::DestinationHandler,
filestore::NativeFilestore, filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher}, request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler, source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
StdTimerCreator, TransactionId, UserFaultHookProvider,
}; };
use spacepackets::{ use spacepackets::{
cfdp::{ChecksumType, ConditionCode, TransmissionMode}, cfdp::{ChecksumType, ConditionCode, TransmissionMode},
seq_count::SeqCountProviderSyncU16,
util::UnsignedByteFieldU16, util::UnsignedByteFieldU16,
}; };
@@ -194,7 +197,7 @@ fn end_to_end_test(with_closure: bool) {
spacepackets::cfdp::TransmissionMode::Unacknowledged, spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32, ChecksumType::Crc32,
); );
let seq_count_provider = SeqCountProviderSyncU16::default(); let seq_count_provider = AtomicU16::default();
let mut source_handler = SourceHandler::new( let mut source_handler = SourceHandler::new(
local_cfg_source, local_cfg_source,
source_tx, source_tx,