Add acknowledged destination handler

This commit is contained in:
Robin Mueller
2025-09-12 15:56:17 +02:00
parent 8e755fd7b2
commit c3554774f4
13 changed files with 4824 additions and 1224 deletions

View File

@@ -29,7 +29,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.85.0
- uses: dtolnay/rust-toolchain@1.86.0
- run: cargo check --release
cross-check:

View File

@@ -8,9 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
- Bumped `spacepackets` to v0.15
- Bumped `spacepackets` to v0.16
- Bumped `defmt` to v1
## Added
- Acknowledged mode support for both source and destination handler.
- `FaultInfo` structure which is passed to user fault callbacks.
# [v0.2.0] 2024-11-26
- Bumped `thiserror` to v2

View File

@@ -2,7 +2,7 @@
name = "cfdp-rs"
version = "0.2.0"
edition = "2024"
rust-version = "1.85.0"
rust-version = "1.86.0"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
description = "High level CCSDS File Delivery Protocol components"
homepage = "https://egit.irs.uni-stuttgart.de/rust/cfdp"
@@ -22,6 +22,7 @@ derive-new = ">=0.6, <=0.7"
hashbrown = { version = ">=0.14, <=0.15", optional = true }
spacepackets = { git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git", version = "0.16", default-features = false }
thiserror = { version = "2", default-features = false }
heapless = "0.9"
serde = { version = "1", optional = true }
defmt = { version = "1", optional = true }
@@ -36,7 +37,7 @@ alloc = [
"hashbrown",
"spacepackets/alloc"
]
serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde"]
serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde", "heapless/serde"]
defmt = ["dep:defmt", "spacepackets/defmt"]
[dev-dependencies]

View File

@@ -17,11 +17,10 @@ The underlying base packet library used to generate the packets to be sent is th
`cfdp-rs` currently supports following high-level features:
- Unacknowledged (class 1) file transfers for both source and destination side.
- Acknowledged (class 2) file transfers for the source side.
- Acknowledged (class 2) file transfers for both source side and destination side.
The following features have not been implemented yet. PRs or notifications for demand are welcome!
- Acknowledged (class 2) file transfers for the destination side.
- Suspending transfers
- Inactivity handling
- Start and end of transmission and reception opportunity handling

View File

@@ -12,13 +12,13 @@ You can run both applications with `-h` to get more information about the availa
## Running the Python App
It is recommended to run the Python App in a dedicated virtual environment. For example, on a
Unix system you can use `python3 -m venv venv` and then `source venv/bin/activate` to create
Unix system you can use `uv venv` and then `source .venv/bin/activate` to create
and activate a virtual environment.
After that, you can install the required dependencies using
```sh
pip install -r requirements.txt
uv pip install -r requirements.txt
```
and then run the application using `./main.py` or `python3 main.py`.

View File

@@ -16,11 +16,11 @@ from typing import Any, Dict, List, Tuple, Optional
from multiprocessing import Queue
from queue import Empty
from cfdppy.handler import DestHandler, RemoteEntityCfgTable, SourceHandler
from cfdppy.handler import DestHandler, RemoteEntityConfigTable, SourceHandler
from cfdppy.exceptions import InvalidDestinationId, SourceFileDoesNotExist
from cfdppy import (
CfdpUserBase,
LocalEntityCfg,
LocalEntityConfig,
PacketDestination,
PutRequest,
TransactionId,
@@ -31,8 +31,8 @@ from cfdppy.mib import (
CheckTimerProvider,
DefaultFaultHandlerBase,
EntityType,
IndicationCfg,
RemoteEntityCfg,
IndicationConfig,
RemoteEntityConfig,
)
from cfdppy.user import (
FileSegmentRecvdParams,
@@ -58,7 +58,7 @@ from spacepackets.util import ByteFieldU16, UnsignedByteField
PYTHON_ENTITY_ID = ByteFieldU16(1)
RUST_ENTITY_ID = ByteFieldU16(2)
# Enable all indications for both local and remote entity.
INDICATION_CFG = IndicationCfg()
INDICATION_CFG = IndicationConfig()
BASE_STR_SRC = "PY SRC"
BASE_STR_DEST = "PY DEST"
@@ -79,7 +79,7 @@ DEST_ENTITY_QUEUE = Queue()
# be sent by the UDP server.
TM_QUEUE = Queue()
REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg(
REMOTE_CFG_OF_PY_ENTITY = RemoteEntityConfig(
entity_id=PYTHON_ENTITY_ID,
max_packet_len=MAX_PACKET_LEN,
max_file_segment_len=FILE_SEGMENT_SIZE,
@@ -585,7 +585,7 @@ def main():
logging.basicConfig(level=logging_level)
remote_cfg_table = RemoteEntityCfgTable()
remote_cfg_table = RemoteEntityConfigTable()
remote_cfg_table.add_config(REMOTE_CFG_OF_REMOTE_ENTITY)
src_fault_handler = CfdpFaultHandler(BASE_STR_SRC)
@@ -594,7 +594,7 @@ def main():
src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE)
check_timer_provider = CustomCheckTimerProvider()
source_handler = SourceHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
cfg=LocalEntityConfig(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
seq_num_provider=src_seq_count_provider,
remote_cfg_table=remote_cfg_table,
user=src_user,
@@ -614,7 +614,7 @@ def main():
dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST)
dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE)
dest_handler = DestHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
cfg=LocalEntityConfig(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
user=dest_user,
remote_cfg_table=remote_cfg_table,
check_timer_provider=check_timer_provider,

View File

@@ -12,10 +12,11 @@ use std::{
};
use cfdp::{
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider,
EntityType, FaultInfo, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHook,
dest::DestinationHandler,
filestore::NativeFilestore,
lost_segments::LostSegmentsList,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
@@ -62,43 +63,21 @@ pub struct Cli {
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
impl UserFaultHook for ExampleFaultHandler {
fn notice_of_suspension_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected suspension, {:?}", fault_info);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn notice_of_cancellation_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected cancellation, {:?}", fault_info);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn abandoned_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected abandonment, {:?}", fault_info);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn ignore_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected ignore, {:?}", fault_info);
}
}
@@ -261,7 +240,7 @@ impl UdpServer {
while let Ok(tm) = receiver.try_recv() {
debug!("Sending PDU: {:?}", tm);
pdu_printout(&tm);
let result = self.socket.send_to(tm.pdu(), self.remote_addr());
let result = self.socket.send_to(tm.raw_pdu(), self.remote_addr());
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
@@ -284,7 +263,7 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) {
spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
let meta_pdu =
MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed");
MetadataPduReader::new(pdu.raw_pdu()).expect("creating metadata pdu failed");
debug!("Metadata PDU: {:?}", meta_pdu)
}
spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
@@ -292,7 +271,8 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) {
spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
},
spacepackets::cfdp::PduType::FileData => {
let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed");
let fd_pdu =
FileDataPdu::from_bytes(pdu.raw_pdu()).expect("creating file data pdu failed");
debug!("File data PDU: {:?}", fd_pdu);
}
}
@@ -367,6 +347,7 @@ fn main() {
NativeFilestore::default(),
remote_cfg_python,
StdTimerCreator::default(),
LostSegmentsList::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);

File diff suppressed because it is too large Load Diff

View File

@@ -8,11 +8,10 @@
//! `cfdp-rs` currently supports following high-level features:
//!
//! - Unacknowledged (class 1) file transfers for both source and destination side.
//! - Acknowledged (class 2) file transfers for the source side.
//! - Acknowledged (class 2) file transfers for both source side and destination side.
//!
//! The following features have not been implemented yet. PRs or notifications for demand are welcome!
//!
//! - Acknowledged (class 2) file transfers for the destination side.
//! - Suspending transfers
//! - Inactivity handling
//! - Start and end of transmission and reception opportunity handling
@@ -72,7 +71,7 @@
//!
//! # Notes on the user hooks and scheduling
//!
//! Both examples feature implementations of the [UserFaultHookProvider] and the [user::CfdpUser]
//! Both examples feature implementations of the [UserFaultHook] and the [user::CfdpUser]
//! trait which simply print some information to the console to monitor the progress of a file
//! copy operation. These implementations could be adapted for other handler integrations. For
//! example, they could signal a GUI application to display some information for the user.
@@ -93,13 +92,14 @@ extern crate std;
#[cfg(feature = "alloc")]
pub mod dest;
pub mod filestore;
pub mod lost_segments;
pub mod request;
#[cfg(feature = "alloc")]
pub mod source;
pub mod time;
pub mod user;
use crate::time::CountdownProvider;
use crate::time::Countdown;
use core::{cell::RefCell, fmt::Debug, hash::Hash};
use crc::{CRC_32_ISCSI, CRC_32_ISO_HDLC, Crc};
@@ -180,8 +180,8 @@ pub enum TimerContext {
/// The timer will be used to perform the Positive Acknowledgement Procedures as specified in
/// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer
/// interval of the remote entity configuration.
pub trait TimerCreatorProvider {
type Countdown: CountdownProvider;
pub trait TimerCreator {
type Countdown: Countdown;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown;
}
@@ -256,12 +256,12 @@ pub struct RemoteEntityConfig {
pub crc_on_transmission_by_default: bool,
pub default_transmission_mode: TransmissionMode,
pub default_crc_type: ChecksumType,
pub positive_ack_timer_interval_seconds: f32,
pub positive_ack_timer_interval: Duration,
pub positive_ack_timer_expiration_limit: u32,
pub check_limit: u32,
pub disposition_on_cancellation: bool,
pub immediate_nak_mode: bool,
pub nak_timer_interval_seconds: f32,
pub nak_timer_interval: Duration,
pub nak_timer_expiration_limit: u32,
}
@@ -283,61 +283,74 @@ impl RemoteEntityConfig {
default_transmission_mode,
default_crc_type,
check_limit: 2,
positive_ack_timer_interval_seconds: 10.0,
positive_ack_timer_interval: Duration::from_secs(10),
positive_ack_timer_expiration_limit: 2,
disposition_on_cancellation: false,
immediate_nak_mode: true,
nak_timer_interval_seconds: 10.0,
nak_timer_interval: Duration::from_secs(10),
nak_timer_expiration_limit: 2,
}
}
}
pub trait RemoteEntityConfigProvider {
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum RemoteConfigStoreError {
#[error("store is full")]
Full,
}
pub trait RemoteConfigStore {
/// Retrieve the remote entity configuration for the given remote ID.
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
/// Add a new remote configuration. Return [true] if the configuration was
/// inserted successfully, and [false] if a configuration already exists.
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool;
/// Remote a configuration. Returns [true] if the configuration was removed successfully,
/// and [false] if no configuration exists for the given remote ID.
fn remove_config(&mut self, remote_id: u64) -> bool;
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError>;
}
/// This is a thin wrapper around a [hashbrown::HashMap] to store remote entity configurations.
/// It implements the full [RemoteEntityConfigProvider] trait.
/// It implements the full [RemoteEntityConfig] trait.
#[cfg(feature = "alloc")]
#[derive(Default, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct StdRemoteEntityConfigProvider(pub hashbrown::HashMap<u64, RemoteEntityConfig>);
pub struct RemoteConfigStoreStd(pub hashbrown::HashMap<u64, RemoteEntityConfig>);
#[cfg(feature = "std")]
impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider {
impl RemoteConfigStore for RemoteConfigStoreStd {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.0.get(&remote_id)
}
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.0.get_mut(&remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool {
self.0.insert(cfg.entity_id.value(), *cfg).is_some()
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
Ok(self.0.insert(cfg.entity_id.value(), *cfg).is_some())
}
fn remove_config(&mut self, remote_id: u64) -> bool {
}
#[cfg(feature = "std")]
impl RemoteConfigStoreStd {
pub fn remove_config(&mut self, remote_id: u64) -> bool {
self.0.remove(&remote_id).is_some()
}
}
/// This is a thin wrapper around a [alloc::vec::Vec] to store remote entity configurations.
/// It implements the full [RemoteEntityConfigProvider] trait.
/// It implements the full [RemoteEntityConfig] trait.
#[cfg(feature = "alloc")]
#[derive(Default, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct VecRemoteEntityConfigProvider(pub alloc::vec::Vec<RemoteEntityConfig>);
pub struct RemoteConfigList(pub alloc::vec::Vec<RemoteEntityConfig>);
#[cfg(feature = "alloc")]
impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider {
impl RemoteConfigStore for RemoteConfigList {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.0
.iter()
@@ -350,12 +363,19 @@ impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider {
.find(|cfg| cfg.entity_id.value() == remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool {
self.0.push(*cfg);
true
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
for other_cfg in self.0.iter() {
if cfg.entity_id.value() == other_cfg.entity_id.value() {
return Ok(false);
}
}
self.0.push(*cfg);
Ok(true)
}
}
fn remove_config(&mut self, remote_id: u64) -> bool {
impl RemoteConfigList {
pub fn remove_config(&mut self, remote_id: u64) -> bool {
for (idx, cfg) in self.0.iter().enumerate() {
if cfg.entity_id.value() == remote_id {
self.0.remove(idx);
@@ -366,10 +386,55 @@ impl RemoteEntityConfigProvider for VecRemoteEntityConfigProvider {
}
}
/// A remote entity configurations also implements the [RemoteEntityConfigProvider], but the
/// [RemoteEntityConfigProvider::add_config] and [RemoteEntityConfigProvider::remove_config]
/// are no-ops and always returns [false].
impl RemoteEntityConfigProvider for RemoteEntityConfig {
/// This is a thin wrapper around a [alloc::vec::Vec] to store remote entity configurations.
/// It implements the full [RemoteEntityConfig] trait.
#[derive(Default, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RemoteConfigListHeapless<const N: usize>(pub heapless::vec::Vec<RemoteEntityConfig, N>);
impl<const N: usize> RemoteConfigStore for RemoteConfigListHeapless<N> {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.0
.iter()
.find(|&cfg| cfg.entity_id.value() == remote_id)
}
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.0
.iter_mut()
.find(|cfg| cfg.entity_id.value() == remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
if self.0.is_full() {
return Err(RemoteConfigStoreError::Full);
}
for other_cfg in self.0.iter() {
if cfg.entity_id.value() == other_cfg.entity_id.value() {
return Ok(false);
}
}
self.0.push(*cfg).unwrap();
Ok(true)
}
}
impl<const N: usize> RemoteConfigListHeapless<N> {
pub fn remove_config(&mut self, remote_id: u64) -> bool {
for (idx, cfg) in self.0.iter().enumerate() {
if cfg.entity_id.value() == remote_id {
self.0.remove(idx);
return true;
}
}
false
}
}
/// A remote entity configurations also implements the [RemoteConfigStore], but the
/// [RemoteConfigStore::add_config] always returns [RemoteConfigStoreError::Full].
impl RemoteConfigStore for RemoteEntityConfig {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
if remote_id == self.entity_id.value() {
return Some(self);
@@ -384,76 +449,44 @@ impl RemoteEntityConfigProvider for RemoteEntityConfig {
None
}
fn add_config(&mut self, _cfg: &RemoteEntityConfig) -> bool {
false
}
fn remove_config(&mut self, _remote_id: u64) -> bool {
false
fn add_config(&mut self, _cfg: &RemoteEntityConfig) -> Result<bool, RemoteConfigStoreError> {
Err(RemoteConfigStoreError::Full)
}
}
/// This trait introduces some callbacks which will be called when a particular CFDP fault
/// handler is called.
///
/// It is passed into the CFDP handlers as part of the [UserFaultHookProvider] and the local entity
/// It is passed into the CFDP handlers as part of the [UserFaultHook] and the local entity
/// configuration and provides a way to specify custom user error handlers. This allows to
/// implement some CFDP features like fault handler logging, which would not be possible
/// generically otherwise.
///
/// For each error reported by the [FaultHandler], the appropriate fault handler callback
/// will be called depending on the [FaultHandlerCode].
pub trait UserFaultHookProvider {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
);
pub trait UserFaultHook {
fn notice_of_suspension_cb(&mut self, fault_info: FaultInfo);
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
);
fn notice_of_cancellation_cb(&mut self, fault_info: FaultInfo);
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64);
fn abandoned_cb(&mut self, fault_info: FaultInfo);
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64);
fn ignore_cb(&mut self, fault_info: FaultInfo);
}
/// Dummy fault hook which implements [UserFaultHookProvider] but only provides empty
/// Dummy fault hook which implements [UserFaultHook] but only provides empty
/// implementations.
#[derive(Default, Debug, PartialEq, Eq, Copy, Clone)]
pub struct DummyFaultHook {}
impl UserFaultHookProvider for DummyFaultHook {
fn notice_of_suspension_cb(
&mut self,
_transaction_id: TransactionId,
_cond: ConditionCode,
_progress: u64,
) {
}
impl UserFaultHook for DummyFaultHook {
fn notice_of_suspension_cb(&mut self, _fault_info: FaultInfo) {}
fn notice_of_cancellation_cb(
&mut self,
_transaction_id: TransactionId,
_cond: ConditionCode,
_progress: u64,
) {
}
fn notice_of_cancellation_cb(&mut self, _fault_info: FaultInfo) {}
fn abandoned_cb(
&mut self,
_transaction_id: TransactionId,
_cond: ConditionCode,
_progress: u64,
) {
}
fn abandoned_cb(&mut self, _fault_info: FaultInfo) {}
fn ignore_cb(&mut self, _transaction_id: TransactionId, _cond: ConditionCode, _progress: u64) {}
fn ignore_cb(&mut self, _fault_info: FaultInfo) {}
}
/// This structure is used to implement the fault handling as specified in chapter 4.8 of the CFDP
@@ -462,7 +495,7 @@ impl UserFaultHookProvider for DummyFaultHook {
/// It does so by mapping each applicable [spacepackets::cfdp::ConditionCode] to a fault handler
/// which is denoted by the four [spacepackets::cfdp::FaultHandlerCode]s. This code is used
/// to select the error handling inside the CFDP handler itself in addition to dispatching to a
/// user-provided callback function provided by the [UserFaultHookProvider].
/// user-provided callback function provided by the [UserFaultHook].
///
/// Some note on the provided default settings:
///
@@ -476,14 +509,51 @@ impl UserFaultHookProvider for DummyFaultHook {
/// These defaults can be overriden by using the [Self::set_fault_handler] method.
/// Please note that in any case, fault handler overrides can be specified by the sending CFDP
/// entity.
pub struct FaultHandler<UserHandler: UserFaultHookProvider> {
pub struct FaultHandler<UserHandler: UserFaultHook> {
handler_array: [FaultHandlerCode; 10],
// Could also change the user fault handler trait to have non mutable methods, but that limits
// flexbility on the user side..
pub user_hook: RefCell<UserHandler>,
}
impl<UserHandler: UserFaultHookProvider> FaultHandler<UserHandler> {
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FaultInfo {
transaction_id: TransactionId,
condition_code: ConditionCode,
progress: u64,
}
impl FaultInfo {
pub const fn new(
transaction_id: TransactionId,
condition_code: ConditionCode,
progress: u64,
) -> Self {
Self {
transaction_id,
condition_code,
progress,
}
}
#[inline]
pub const fn transaction_id(&self) -> TransactionId {
self.transaction_id
}
#[inline]
pub const fn condition_code(&self) -> ConditionCode {
self.condition_code
}
#[inline]
pub const fn progress(&self) -> u64 {
self.progress
}
}
impl<UserHandler: UserFaultHook> FaultHandler<UserHandler> {
fn condition_code_to_array_index(conditon_code: ConditionCode) -> Option<usize> {
Some(match conditon_code {
ConditionCode::PositiveAckLimitReached => 0,
@@ -533,33 +603,23 @@ impl<UserHandler: UserFaultHookProvider> FaultHandler<UserHandler> {
self.handler_array[array_idx.unwrap()]
}
pub fn report_fault(
&self,
transaction_id: TransactionId,
condition: ConditionCode,
progress: u64,
) -> FaultHandlerCode {
let array_idx = Self::condition_code_to_array_index(condition);
if array_idx.is_none() {
return FaultHandlerCode::IgnoreError;
}
let fh_code = self.handler_array[array_idx.unwrap()];
pub fn report_fault(&self, code: FaultHandlerCode, fault_info: FaultInfo) -> FaultHandlerCode {
let mut handler_mut = self.user_hook.borrow_mut();
match fh_code {
match code {
FaultHandlerCode::NoticeOfCancellation => {
handler_mut.notice_of_cancellation_cb(transaction_id, condition, progress);
handler_mut.notice_of_cancellation_cb(fault_info);
}
FaultHandlerCode::NoticeOfSuspension => {
handler_mut.notice_of_suspension_cb(transaction_id, condition, progress);
handler_mut.notice_of_suspension_cb(fault_info);
}
FaultHandlerCode::IgnoreError => {
handler_mut.ignore_cb(transaction_id, condition, progress);
handler_mut.ignore_cb(fault_info);
}
FaultHandlerCode::AbandonTransaction => {
handler_mut.abandoned_cb(transaction_id, condition, progress);
handler_mut.abandoned_cb(fault_info);
}
}
fh_code
code
}
}
@@ -589,17 +649,17 @@ impl Default for IndicationConfig {
}
/// Each CFDP entity handler has a [LocalEntityConfig]uration.
pub struct LocalEntityConfig<UserFaultHook: UserFaultHookProvider> {
pub struct LocalEntityConfig<UserFaultHookInstance: UserFaultHook> {
pub id: UnsignedByteField,
pub indication_cfg: IndicationConfig,
pub fault_handler: FaultHandler<UserFaultHook>,
pub fault_handler: FaultHandler<UserFaultHookInstance>,
}
impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> {
impl<UserFaultHookInstance: UserFaultHook> LocalEntityConfig<UserFaultHookInstance> {
pub fn new(
id: UnsignedByteField,
indication_cfg: IndicationConfig,
hook: UserFaultHook,
hook: UserFaultHookInstance,
) -> Self {
Self {
id,
@@ -609,12 +669,12 @@ impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> {
}
}
impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> {
pub fn user_fault_hook_mut(&mut self) -> &mut RefCell<UserFaultHook> {
impl<UserFaultHookInstance: UserFaultHook> LocalEntityConfig<UserFaultHookInstance> {
pub fn user_fault_hook_mut(&mut self) -> &mut RefCell<UserFaultHookInstance> {
&mut self.fault_handler.user_hook
}
pub fn user_fault_hook(&self) -> &RefCell<UserFaultHook> {
pub fn user_fault_hook(&self) -> &RefCell<UserFaultHookInstance> {
&self.fault_handler.user_hook
}
}
@@ -631,13 +691,21 @@ pub enum GenericSendError {
Other,
}
pub trait PduSendProvider {
pub trait PduSender {
fn send_pdu(
&self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), GenericSendError>;
fn send_file_directive_pdu(
&self,
file_directive_type: FileDirectiveType,
raw_pdu: &[u8],
) -> Result<(), GenericSendError> {
self.send_pdu(PduType::FileDirective, Some(file_directive_type), raw_pdu)
}
}
#[cfg(feature = "std")]
@@ -646,7 +714,7 @@ pub mod std_mod {
use super::*;
impl PduSendProvider for mpsc::Sender<PduOwnedWithInfo> {
impl PduSender for mpsc::Sender<PduOwnedWithInfo> {
fn send_pdu(
&self,
pdu_type: PduType,
@@ -663,7 +731,7 @@ pub mod std_mod {
}
}
/// Simple implementation of the [CountdownProvider] trait assuming a standard runtime.
/// Simple implementation of the [Countdown] trait assuming a standard runtime.
#[derive(Debug)]
pub struct StdCountdown {
expiry_time: Duration,
@@ -683,7 +751,7 @@ pub mod std_mod {
}
}
impl CountdownProvider for StdCountdown {
impl Countdown for StdCountdown {
fn has_expired(&self) -> bool {
if self.start_time.elapsed() > self.expiry_time {
return true;
@@ -714,7 +782,7 @@ pub mod std_mod {
}
}
impl TimerCreatorProvider for StdTimerCreator {
impl TimerCreator for StdTimerCreator {
type Countdown = StdCountdown;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown {
@@ -800,7 +868,7 @@ pub enum PacketTarget {
pub trait PduProvider {
fn pdu_type(&self) -> PduType;
fn file_directive_type(&self) -> Option<FileDirectiveType>;
fn pdu(&self) -> &[u8];
fn raw_pdu(&self) -> &[u8];
fn packet_target(&self) -> Result<PacketTarget, PduError>;
}
@@ -815,7 +883,7 @@ impl PduProvider for DummyPduProvider {
None
}
fn pdu(&self) -> &[u8] {
fn raw_pdu(&self) -> &[u8] {
&[]
}
@@ -927,7 +995,7 @@ impl PduProvider for PduRawWithInfo<'_> {
self.file_directive_type
}
fn pdu(&self) -> &[u8] {
fn raw_pdu(&self) -> &[u8] {
self.raw_packet
}
@@ -989,7 +1057,7 @@ pub mod alloc_mod {
self.file_directive_type
}
fn pdu(&self) -> &[u8] {
fn raw_pdu(&self) -> &[u8] {
&self.pdu
}
@@ -999,6 +1067,12 @@ pub mod alloc_mod {
}
}
#[derive(Debug, Clone, Copy)]
struct PositiveAckParams {
ack_counter: u32,
positive_ack_of_cancellation: bool,
}
#[cfg(test)]
pub(crate) mod tests {
use core::{
@@ -1016,6 +1090,7 @@ pub(crate) mod tests {
CommonPduConfig, FileDirectiveType, PduHeader,
eof::EofPdu,
file_data::FileDataPdu,
finished::{DeliveryCode, FileStatus},
metadata::{MetadataGenericParams, MetadataPduCreator},
},
},
@@ -1035,6 +1110,7 @@ pub(crate) mod tests {
pub(crate) struct TimerExpiryControl {
pub(crate) check_limit: Arc<AtomicBool>,
pub(crate) positive_ack: Arc<AtomicBool>,
pub(crate) nak_activity: Arc<AtomicBool>,
}
impl TimerExpiryControl {
@@ -1043,11 +1119,15 @@ pub(crate) mod tests {
.store(true, core::sync::atomic::Ordering::Release);
}
#[allow(dead_code)]
pub fn set_positive_ack_expired(&mut self) {
self.positive_ack
.store(true, core::sync::atomic::Ordering::Release);
}
pub fn set_nak_activity_expired(&mut self) {
self.nak_activity
.store(true, core::sync::atomic::Ordering::Release);
}
}
#[derive(Debug)]
@@ -1057,7 +1137,7 @@ pub(crate) mod tests {
expiry_control: TimerExpiryControl,
}
impl CountdownProvider for TestCheckTimer {
impl Countdown for TestCheckTimer {
fn has_expired(&self) -> bool {
match self.context {
TimerContext::CheckLimit {
@@ -1072,7 +1152,10 @@ pub(crate) mod tests {
.expiry_control
.positive_ack
.load(core::sync::atomic::Ordering::Acquire),
TimerContext::NakActivity { expiry_time: _ } => todo!(),
TimerContext::NakActivity { expiry_time: _ } => self
.expiry_control
.nak_activity
.load(core::sync::atomic::Ordering::Acquire),
}
}
fn reset(&mut self) {
@@ -1085,7 +1168,10 @@ pub(crate) mod tests {
.expiry_control
.check_limit
.store(false, core::sync::atomic::Ordering::Release),
TimerContext::NakActivity { expiry_time: _ } => todo!(),
TimerContext::NakActivity { expiry_time: _ } => self
.expiry_control
.nak_activity
.store(false, core::sync::atomic::Ordering::Release),
TimerContext::PositiveAck { expiry_time: _ } => self
.expiry_control
.positive_ack
@@ -1117,7 +1203,7 @@ pub(crate) mod tests {
}
}
impl TimerCreatorProvider for TestCheckTimerCreator {
impl TimerCreator for TestCheckTimerCreator {
type Countdown = TestCheckTimer;
fn create_countdown(&self, timer_context: TimerContext) -> Self::Countdown {
@@ -1128,13 +1214,14 @@ pub(crate) mod tests {
TimerContext::PositiveAck { expiry_time: _ } => {
TestCheckTimer::new(timer_context, &self.expiry_control)
}
_ => {
panic!("invalid check timer creator, can only be used for check limit handling")
TimerContext::NakActivity { expiry_time: _ } => {
TestCheckTimer::new(timer_context, &self.expiry_control)
}
}
}
}
#[derive(Debug)]
pub struct FileSegmentRecvdParamsNoSegMetadata {
#[allow(dead_code)]
pub id: TransactionId,
@@ -1142,8 +1229,9 @@ pub(crate) mod tests {
pub length: usize,
}
#[derive(Default)]
#[derive(Default, Debug)]
pub struct TestCfdpUser {
pub check_queues_empty_on_drop: bool,
pub next_expected_seq_num: u64,
pub expected_full_src_name: String,
pub expected_full_dest_name: String,
@@ -1164,6 +1252,7 @@ pub(crate) mod tests {
expected_file_size: u64,
) -> Self {
Self {
check_queues_empty_on_drop: true,
next_expected_seq_num,
expected_full_src_name,
expected_full_dest_name,
@@ -1181,6 +1270,36 @@ pub(crate) mod tests {
assert_eq!(id.source_id, LOCAL_ID.into());
assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
}
pub fn indication_queues_empty(&self) -> bool {
self.finished_indic_queue.is_empty()
&& self.metadata_recv_queue.is_empty()
&& self.file_seg_recvd_queue.is_empty()
}
pub fn verify_finished_indication_retained(
&mut self,
delivery_code: DeliveryCode,
cond_code: ConditionCode,
id: TransactionId,
) {
self.verify_finished_indication(delivery_code, cond_code, id, FileStatus::Retained);
}
pub fn verify_finished_indication(
&mut self,
delivery_code: DeliveryCode,
cond_code: ConditionCode,
id: TransactionId,
file_status: FileStatus,
) {
assert_eq!(self.finished_indic_queue.len(), 1);
let finished_indication = self.finished_indic_queue.pop_front().unwrap();
assert_eq!(finished_indication.id, id);
assert_eq!(finished_indication.condition_code, cond_code);
assert_eq!(finished_indication.delivery_code, delivery_code);
assert_eq!(finished_indication.file_status, file_status);
}
}
impl CfdpUser for TestCfdpUser {
@@ -1270,48 +1389,43 @@ pub(crate) mod tests {
}
}
impl Drop for TestCfdpUser {
fn drop(&mut self) {
if self.check_queues_empty_on_drop {
assert!(
self.indication_queues_empty(),
"indication queues not empty on drop: finished: {}, metadata: {}, file seg: {}",
self.finished_indic_queue.len(),
self.metadata_recv_queue.len(),
self.file_seg_recvd_queue.len()
);
}
}
}
#[derive(Default, Debug)]
pub(crate) struct TestFaultHandler {
pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub abandoned_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub notice_of_suspension_queue: VecDeque<FaultInfo>,
pub notice_of_cancellation_queue: VecDeque<FaultInfo>,
pub abandoned_queue: VecDeque<FaultInfo>,
pub ignored_queue: VecDeque<FaultInfo>,
}
impl UserFaultHookProvider for TestFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.notice_of_suspension_queue
.push_back((transaction_id, cond, progress))
impl UserFaultHook for TestFaultHandler {
fn notice_of_suspension_cb(&mut self, fault_info: FaultInfo) {
self.notice_of_suspension_queue.push_back(fault_info)
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.notice_of_cancellation_queue
.push_back((transaction_id, cond, progress))
fn notice_of_cancellation_cb(&mut self, fault_info: FaultInfo) {
self.notice_of_cancellation_queue.push_back(fault_info)
}
fn abandoned_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.abandoned_queue
.push_back((transaction_id, cond, progress))
fn abandoned_cb(&mut self, fault_info: FaultInfo) {
self.abandoned_queue.push_back(fault_info)
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
self.ignored_queue
.push_back((transaction_id, cond, progress))
fn ignore_cb(&mut self, fault_info: FaultInfo) {
self.ignored_queue.push_back(fault_info)
}
}
@@ -1347,7 +1461,7 @@ pub(crate) mod tests {
pub packet_queue: RefCell<VecDeque<SentPdu>>,
}
impl PduSendProvider for TestCfdpSender {
impl PduSender for TestCfdpSender {
fn send_pdu(
&self,
pdu_type: PduType,
@@ -1370,6 +1484,10 @@ pub(crate) mod tests {
}
impl TestCfdpSender {
pub fn queue_len(&self) -> usize {
self.packet_queue.borrow_mut().len()
}
pub fn retrieve_next_pdu(&self) -> Option<SentPdu> {
self.packet_queue.borrow_mut().pop_front()
}
@@ -1383,8 +1501,8 @@ pub(crate) mod tests {
dest_id: impl Into<UnsignedByteField>,
max_packet_len: usize,
crc_on_transmission_by_default: bool,
) -> StdRemoteEntityConfigProvider {
let mut table = StdRemoteEntityConfigProvider::default();
) -> RemoteConfigStoreStd {
let mut table = RemoteConfigStoreStd::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
dest_id.into(),
max_packet_len,
@@ -1393,13 +1511,13 @@ pub(crate) mod tests {
TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
table.add_config(&remote_entity_cfg);
table.add_config(&remote_entity_cfg).unwrap();
table
}
fn generic_pdu_header() -> PduHeader {
let pdu_conf = CommonPduConfig::default();
PduHeader::new_no_file_data(pdu_conf, 0)
PduHeader::new_for_file_directive(pdu_conf, 0)
}
#[test]
@@ -1534,9 +1652,10 @@ pub(crate) mod tests {
TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
assert!(!remote_entity_cfg.add_config(&dummy));
// Removal is no-op.
assert!(!remote_entity_cfg.remove_config(REMOTE_ID.value()));
assert_eq!(
remote_entity_cfg.add_config(&dummy).unwrap_err(),
RemoteConfigStoreError::Full
);
let remote_entity_retrieved = remote_entity_cfg.get(REMOTE_ID.value()).unwrap();
assert_eq!(remote_entity_retrieved.entity_id, REMOTE_ID.into());
// Does not exist.
@@ -1554,9 +1673,9 @@ pub(crate) mod tests {
TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
let mut remote_cfg_provider = StdRemoteEntityConfigProvider::default();
let mut remote_cfg_provider = RemoteConfigStoreStd::default();
assert!(remote_cfg_provider.0.is_empty());
remote_cfg_provider.add_config(&remote_entity_cfg);
remote_cfg_provider.add_config(&remote_entity_cfg).unwrap();
assert_eq!(remote_cfg_provider.0.len(), 1);
let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(),
@@ -1568,7 +1687,9 @@ pub(crate) mod tests {
);
let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap();
assert_eq!(cfg_0.entity_id, REMOTE_ID.into());
remote_cfg_provider.add_config(&remote_entity_cfg_2);
remote_cfg_provider
.add_config(&remote_entity_cfg_2)
.unwrap();
assert_eq!(remote_cfg_provider.0.len(), 2);
let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap();
assert_eq!(cfg_1.entity_id, LOCAL_ID.into());
@@ -1582,7 +1703,7 @@ pub(crate) mod tests {
#[test]
fn test_remote_cfg_provider_vector() {
let mut remote_cfg_provider = VecRemoteEntityConfigProvider::default();
let mut remote_cfg_provider = RemoteConfigList::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(),
1024,
@@ -1592,7 +1713,7 @@ pub(crate) mod tests {
ChecksumType::Crc32,
);
assert!(remote_cfg_provider.0.is_empty());
remote_cfg_provider.add_config(&remote_entity_cfg);
remote_cfg_provider.add_config(&remote_entity_cfg).unwrap();
assert_eq!(remote_cfg_provider.0.len(), 1);
let remote_entity_cfg_2 = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(),
@@ -1604,7 +1725,11 @@ pub(crate) mod tests {
);
let cfg_0 = remote_cfg_provider.get(REMOTE_ID.value()).unwrap();
assert_eq!(cfg_0.entity_id, REMOTE_ID.into());
remote_cfg_provider.add_config(&remote_entity_cfg_2);
assert!(
remote_cfg_provider
.add_config(&remote_entity_cfg_2)
.unwrap()
);
assert_eq!(remote_cfg_provider.0.len(), 2);
let cfg_1 = remote_cfg_provider.get(LOCAL_ID.value()).unwrap();
assert_eq!(cfg_1.entity_id, LOCAL_ID.into());
@@ -1623,10 +1748,18 @@ pub(crate) mod tests {
UnsignedByteFieldU8::new(0).into(),
UnsignedByteFieldU8::new(0).into(),
);
user_hook_dummy.notice_of_cancellation_cb(transaction_id, ConditionCode::NoError, 0);
user_hook_dummy.notice_of_suspension_cb(transaction_id, ConditionCode::NoError, 0);
user_hook_dummy.abandoned_cb(transaction_id, ConditionCode::NoError, 0);
user_hook_dummy.ignore_cb(transaction_id, ConditionCode::NoError, 0);
user_hook_dummy.notice_of_cancellation_cb(FaultInfo::new(
transaction_id,
ConditionCode::NoError,
0,
));
user_hook_dummy.notice_of_suspension_cb(FaultInfo::new(
transaction_id,
ConditionCode::NoError,
0,
));
user_hook_dummy.abandoned_cb(FaultInfo::new(transaction_id, ConditionCode::NoError, 0));
user_hook_dummy.ignore_cb(FaultInfo::new(transaction_id, ConditionCode::NoError, 0));
}
#[test]
@@ -1634,7 +1767,7 @@ pub(crate) mod tests {
let dummy_pdu_provider = DummyPduProvider(());
assert_eq!(dummy_pdu_provider.pdu_type(), PduType::FileData);
assert!(dummy_pdu_provider.file_directive_type().is_none());
assert_eq!(dummy_pdu_provider.pdu(), &[]);
assert_eq!(dummy_pdu_provider.raw_pdu(), &[]);
assert_eq!(
dummy_pdu_provider.packet_target(),
Ok(PacketTarget::SourceEntity)

1361
src/lost_segments.rs Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
use core::fmt::Debug;
/// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone.
pub trait CountdownProvider: Debug {
pub trait Countdown: Debug {
fn has_expired(&self) -> bool;
fn reset(&mut self);
}

View File

@@ -12,10 +12,11 @@ use std::{
};
use cfdp::{
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
StdTimerCreator, TransactionId, UserFaultHookProvider,
EntityType, FaultInfo, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHook,
dest::DestinationHandler,
filestore::NativeFilestore,
lost_segments::LostSegmentsList,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
@@ -33,43 +34,21 @@ const FILE_DATA: &str = "Hello World!";
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
impl UserFaultHook for ExampleFaultHandler {
fn notice_of_suspension_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected suspension, {:?}", fault_info);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn notice_of_cancellation_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected cancellation, {:?}", fault_info);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn abandoned_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected abandonment, {:?}", fault_info);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn ignore_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected ignore, {:?}", fault_info);
}
}
@@ -159,7 +138,7 @@ impl CfdpUser for ExampleCfdpUser {
}
}
fn end_to_end_test(with_closure: bool) {
fn end_to_end_test(transmission_mode: TransmissionMode, with_closure: bool) {
// Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone();
@@ -194,7 +173,7 @@ fn end_to_end_test(with_closure: bool) {
1024,
with_closure,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
transmission_mode,
ChecksumType::Crc32,
);
let seq_count_provider = AtomicU16::default();
@@ -220,7 +199,7 @@ fn end_to_end_test(with_closure: bool) {
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
transmission_mode,
ChecksumType::Crc32,
);
let mut dest_handler = DestinationHandler::new(
@@ -230,6 +209,7 @@ fn end_to_end_test(with_closure: bool) {
NativeFilestore::default(),
remote_cfg_of_source,
StdTimerCreator::default(),
LostSegmentsList::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);
@@ -237,7 +217,7 @@ fn end_to_end_test(with_closure: bool) {
REMOTE_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged),
Some(transmission_mode),
Some(with_closure),
)
.expect("put request creation failed");
@@ -346,11 +326,16 @@ fn end_to_end_test(with_closure: bool) {
}
#[test]
fn end_to_end_test_no_closure() {
end_to_end_test(false);
fn end_to_end_unacknowledged_no_closure() {
end_to_end_test(TransmissionMode::Unacknowledged, false);
}
#[test]
fn end_to_end_test_with_closure() {
end_to_end_test(true);
fn end_to_end_unacknowledged_with_closure() {
end_to_end_test(TransmissionMode::Unacknowledged, true);
}
#[test]
fn end_to_end_acknowledged() {
end_to_end_test(TransmissionMode::Acknowledged, true);
}