18 Commits

Author SHA1 Message Date
1bb840d3c2 minor docs fixes 2025-09-25 13:51:52 +02:00
17a79dcbf1 prepare v0.3.0 2025-09-25 13:30:34 +02:00
2e8134dd08 changelog 2025-09-25 13:28:56 +02:00
5b5856d551 Merge pull request 'start removing alloc dependency' (#13) from remove-alloc-dep into main
Reviewed-on: #13
2025-09-25 13:27:01 +02:00
Robin Mueller
b79d5d7de8 start removing alloc dependency 2025-09-25 13:26:46 +02:00
f48267692c Merge pull request 'released spacepackets' (#12) from use-released-spacepackets into main
Reviewed-on: #12
2025-09-24 20:06:04 +02:00
Robin Mueller
7ded3316e2 update changelog 2025-09-24 20:04:53 +02:00
Robin Mueller
09c48abd18 use spacepackets v0.16.0 2025-09-24 20:03:33 +02:00
6352a94cd7 Merge pull request 'add acknowledged destination handler' (#11) from add-acknowledged-dest-handler into main
Reviewed-on: #11
2025-09-24 20:02:30 +02:00
Robin Mueller
c3554774f4 Add acknowledged destination handler 2025-09-24 19:44:08 +02:00
8e755fd7b2 Merge pull request 'Add acknowledged source handler' (#10) from add-acknowledged-source-handler into main
Reviewed-on: #10
2025-09-12 13:02:47 +02:00
Robin Mueller
ada26f626e Add acknowledged source handler 2025-09-12 12:00:25 +02:00
fa4657274f Merge pull request 'small improvement' (#9) from add-chat-badge into main
Reviewed-on: #9
2025-08-14 14:26:06 +02:00
Robin Mueller
b78a8c4eee small improvement 2025-08-14 14:25:55 +02:00
ca30d523c3 Merge pull request 'add chat badge' (#8) from add-chat-badge into main
Reviewed-on: #8
2025-08-14 14:25:06 +02:00
Robin Mueller
8b5617ffd4 add chat badge 2025-08-14 14:24:57 +02:00
732fac1d1c Merge pull request 'Dependency updates and doc improvements' (#7) from dep-update-docs-improvements into main
Reviewed-on: #7
2025-08-13 18:17:17 +02:00
226a7494a0 Dependency updates and Doc improvements 2025-08-13 18:12:33 +02:00
20 changed files with 6201 additions and 1873 deletions

View File

@@ -21,7 +21,7 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
- name: Install nextest
uses: taiki-e/install-action@nextest
- run: cargo nextest run --all-features
- run: cargo nextest run --features "serde, defmt"
- run: cargo test --doc
msrv:
@@ -29,7 +29,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.81.0
- uses: dtolnay/rust-toolchain@1.86.0
- run: cargo check --release
cross-check:
@@ -45,7 +45,7 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
with:
targets: "armv7-unknown-linux-gnueabihf, thumbv7em-none-eabihf"
- run: cargo check --release --target=${{matrix.target}} --no-default-features
- run: cargo check --release --target=${{matrix.target}} --no-default-features --features "packet-buf-1k, defmt"
fmt:
name: Check formatting
@@ -53,6 +53,8 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt
- run: cargo fmt --all -- --check
docs:
@@ -61,7 +63,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
- run: RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" cargo +nightly doc --all-features
- run: RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" cargo +nightly doc --features "serde, defmt"
clippy:
name: Clippy
@@ -69,4 +71,6 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- run: cargo clippy -- -D warnings

View File

@@ -8,6 +8,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.3.0] 2025-09-25
- Bumped `spacepackets` to v0.16
- Bumped `defmt` to v1
## Added
- Acknowledged mode support for both source and destination handler.
- `FaultInfo` structure which is passed to user fault callbacks.
# [v0.2.0] 2024-11-26
- Bumped `thiserror` to v2
@@ -18,3 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [v0.1.0] 2024-09-11
Initial release
[unreleased]: https://egit.irs.uni-stuttgart.de/rust/cfdp/compare/v0.3.0...HEAD
[v0.3.0]: https://egit.irs.uni-stuttgart.de/rust/cfdp/compare/v0.2.0...v0.3.0
[v0.2.0]: https://egit.irs.uni-stuttgart.de/rust/cfdp/compare/v0.1.0...v0.2.0

View File

@@ -1,8 +1,8 @@
[package]
name = "cfdp-rs"
version = "0.2.0"
edition = "2021"
rust-version = "1.81.0"
version = "0.3.0"
edition = "2024"
rust-version = "1.86.0"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
description = "High level CCSDS File Delivery Protocol components"
homepage = "https://egit.irs.uni-stuttgart.de/rust/cfdp"
@@ -19,29 +19,15 @@ name = "cfdp"
crc = "3"
smallvec = "1"
derive-new = ">=0.6, <=0.7"
[dependencies.spacepackets]
version = "0.13"
default-features = false
[dependencies.thiserror]
version = "2"
default-features = false
[dependencies.hashbrown]
version = ">=0.14, <=0.15"
optional = true
[dependencies.serde]
version = "1"
optional = true
[dependencies.defmt]
version = "0.3"
optional = true
hashbrown = { version = ">=0.14, <=0.15", optional = true }
spacepackets = { version = "0.16", default-features = false }
thiserror = { version = "2", default-features = false }
heapless = "0.9"
serde = { version = "1", optional = true }
defmt = { version = "1", optional = true }
[features]
default = ["std"]
default = ["std", "packet-buf-2k"]
std = [
"alloc",
"thiserror/std",
@@ -51,17 +37,29 @@ alloc = [
"hashbrown",
"spacepackets/alloc"
]
serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde"]
serde = ["dep:serde", "spacepackets/serde", "hashbrown/serde", "heapless/serde"]
defmt = ["dep:defmt", "spacepackets/defmt"]
# Available packet buffer sizes. Only one should be enabled.
# 256 bytes
packet-buf-256 = []
# 512 bytes
packet-buf-512 = []
# 1024 bytes
packet-buf-1k = []
# 2048 bytes
packet-buf-2k = []
# 4096 bytes
packet-buf-4k = []
[dev-dependencies]
tempfile = "3"
rand = "0.8"
rand = "0.9"
log = "0.4"
fern = "0.7"
chrono = "0.4"
clap = { version = "4", features = ["derive"] }
[package.metadata.docs.rs]
all-features = true
features = ["serde", "defmt"]
rustdoc-args = ["--generate-link-to-definition"]

View File

@@ -1,7 +1,8 @@
[![Crates.io](https://img.shields.io/crates/v/cfdp-rs)](https://crates.io/crates/cfdp-rs)
[![docs.rs](https://img.shields.io/docsrs/cfdp-rs)](https://docs.rs/cfdp-rs)
[![ci](https://github.com/us-irs/cfdp-rs/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/us-irs/cfdp-rs/actions/workflows/ci.yml)
[![coverage](https://shields.io/endpoint?url=https://absatsw.irs.uni-stuttgart.de/projects/cfdp/coverage-rs/latest/coverage.json)](https://absatsw.irs.uni-stuttgart.de/projects/cfdp/coverage-rs/latest/index.html)
[![matrix chat](https://img.shields.io/matrix/sat-rs%3Amatrix.org)](https://matrix.to/#/#sat-rs:matrix.org)
<!-- Does not work right now, I'd need to host that myself. [![coverage](https://shields.io/endpoint?url=https://absatsw.irs.uni-stuttgart.de/projects/cfdp/coverage-rs/latest/coverage.json)](https://absatsw.irs.uni-stuttgart.de/projects/cfdp/coverage-rs/latest/index.html) -->
cfdp-rs - High level Rust crate for CFDP components
======================
@@ -13,26 +14,20 @@ The underlying base packet library used to generate the packets to be sent is th
# Features
The goal of this library is to be flexible enough to support the use-cases of both on-board
software and of ground software. It has support to make integration on `std` systems as simple
as possible, but also has sufficient abstraction to allow for integration on`no_std` environments
and can be used on these systems as well as long as the `alloc` feature is activated.
`cfdp-rs` currently supports following high-level features:
Please note even though the `alloc` feature is required for the core handlers, these components
will only allocate memory at initialization time and thus are still viable for systems where
run-time allocation is prohibited.
- Unacknowledged (class 1) file transfers for both source and destination side.
- Acknowledged (class 2) file transfers for both source and destination side.
## Default features
The following features have not been implemented yet. PRs or notifications for demand are welcome!
- [`std`](https://doc.rust-lang.org/std/): Enables functionality relying on the standard library.
- [`alloc`](https://doc.rust-lang.org/alloc/): Enables features which require allocation support.
Enabled by the `std` feature.
- Suspending transfers
- Inactivity handling
- Start and end of transmission and reception opportunity handling
- Keep Alive and Prompt PDU handling
## Optional Features
- [`serde`](https://serde.rs/): Adds `serde` support for most types by adding `Serialize` and `Deserialize` `derive`s
- [`defmt`](https://defmt.ferrous-systems.com/): Add support for the `defmt` by adding the
[`defmt::Format`](https://defmt.ferrous-systems.com/format) derive on many types.
Check out the [documentation](https://docs.rs/cfdp-rs) for more information on available
Rust features.
# Examples
@@ -41,13 +36,11 @@ examples.
# Coverage
Coverage was generated using [`grcov`](https://github.com/mozilla/grcov). If you have not done so
already, install the `llvm-tools-preview`:
Coverage can be generated using [`llvm-cov`](https://github.com/taiki-e/cargo-llvm-cov). If you have not done so
already, install the tool:
```sh
rustup component add llvm-tools-preview
cargo install grcov --locked
cargo +stable install cargo-llvm-cov --locked
```
After that, you can simply run `coverage.py` to test the project with coverage. You can optionally
supply the `--open` flag to open the coverage report in your webbrowser.
After this, you can run `cargo llvm-cov nextest` to run all the tests and display coverage.

View File

@@ -1,3 +0,0 @@
#!/bin/sh
export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options"
cargo +nightly doc --all-features --open

View File

@@ -12,13 +12,13 @@ You can run both applications with `-h` to get more information about the availa
## Running the Python App
It is recommended to run the Python App in a dedicated virtual environment. For example, on a
Unix system you can use `python3 -m venv venv` and then `source venv/bin/activate` to create
Unix system you can use `uv venv` and then `source .venv/bin/activate` to create
and activate a virtual environment.
After that, you can install the required dependencies using
```sh
pip install -r requirements.txt
uv pip install -r requirements.txt
```
and then run the application using `./main.py` or `python3 main.py`.

View File

@@ -16,11 +16,11 @@ from typing import Any, Dict, List, Tuple, Optional
from multiprocessing import Queue
from queue import Empty
from cfdppy.handler import DestHandler, RemoteEntityCfgTable, SourceHandler
from cfdppy.handler import DestHandler, RemoteEntityConfigTable, SourceHandler
from cfdppy.exceptions import InvalidDestinationId, SourceFileDoesNotExist
from cfdppy import (
CfdpUserBase,
LocalEntityCfg,
LocalEntityConfig,
PacketDestination,
PutRequest,
TransactionId,
@@ -31,8 +31,8 @@ from cfdppy.mib import (
CheckTimerProvider,
DefaultFaultHandlerBase,
EntityType,
IndicationCfg,
RemoteEntityCfg,
IndicationConfig,
RemoteEntityConfig,
)
from cfdppy.user import (
FileSegmentRecvdParams,
@@ -58,7 +58,7 @@ from spacepackets.util import ByteFieldU16, UnsignedByteField
PYTHON_ENTITY_ID = ByteFieldU16(1)
RUST_ENTITY_ID = ByteFieldU16(2)
# Enable all indications for both local and remote entity.
INDICATION_CFG = IndicationCfg()
INDICATION_CFG = IndicationConfig()
BASE_STR_SRC = "PY SRC"
BASE_STR_DEST = "PY DEST"
@@ -79,7 +79,7 @@ DEST_ENTITY_QUEUE = Queue()
# be sent by the UDP server.
TM_QUEUE = Queue()
REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg(
REMOTE_CFG_OF_PY_ENTITY = RemoteEntityConfig(
entity_id=PYTHON_ENTITY_ID,
max_packet_len=MAX_PACKET_LEN,
max_file_segment_len=FILE_SEGMENT_SIZE,
@@ -585,7 +585,7 @@ def main():
logging.basicConfig(level=logging_level)
remote_cfg_table = RemoteEntityCfgTable()
remote_cfg_table = RemoteEntityConfigTable()
remote_cfg_table.add_config(REMOTE_CFG_OF_REMOTE_ENTITY)
src_fault_handler = CfdpFaultHandler(BASE_STR_SRC)
@@ -594,7 +594,7 @@ def main():
src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE)
check_timer_provider = CustomCheckTimerProvider()
source_handler = SourceHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
cfg=LocalEntityConfig(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
seq_num_provider=src_seq_count_provider,
remote_cfg_table=remote_cfg_table,
user=src_user,
@@ -614,7 +614,7 @@ def main():
dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST)
dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE)
dest_handler = DestHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
cfg=LocalEntityConfig(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
user=dest_user,
remote_cfg_table=remote_cfg_table,
check_timer_provider=check_timer_provider,

View File

@@ -3,31 +3,36 @@ use std::{
fs::OpenOptions,
io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::mpsc,
sync::{
atomic::{AtomicBool, AtomicU16},
mpsc,
},
thread,
time::Duration,
};
use cfdp::{
EntityType, FaultInfo, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHook,
dest::DestinationHandler,
filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher},
lost_segments::LostSegmentsList,
request::PutRequestOwned,
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider,
};
use clap::Parser;
use log::{debug, info, warn};
use spacepackets::{
cfdp::{
pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError},
ChecksumType, ConditionCode, TransmissionMode,
pdu::{PduError, file_data::FileDataPdu, metadata::MetadataPduReader},
},
seq_count::SeqCountProviderSyncU16,
util::{UnsignedByteFieldU16, UnsignedEnum},
};
static KILL_APP: AtomicBool = AtomicBool::new(false);
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
@@ -58,43 +63,21 @@ pub struct Cli {
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
impl UserFaultHook for ExampleFaultHandler {
fn notice_of_suspension_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected suspension, {:?}", fault_info);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn notice_of_cancellation_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected cancellation, {:?}", fault_info);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn abandoned_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected abandonment, {:?}", fault_info);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn ignore_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected ignore, {:?}", fault_info);
}
}
@@ -231,7 +214,7 @@ impl UdpServer {
Ok(None)
} else {
Err(e.into())
}
};
}
};
let (_, from) = res;
@@ -257,7 +240,7 @@ impl UdpServer {
while let Ok(tm) = receiver.try_recv() {
debug!("Sending PDU: {:?}", tm);
pdu_printout(&tm);
let result = self.socket.send_to(tm.pdu(), self.remote_addr());
let result = self.socket.send_to(tm.raw_pdu(), self.remote_addr());
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
@@ -280,7 +263,7 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) {
spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
let meta_pdu =
MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed");
MetadataPduReader::new(pdu.raw_pdu()).expect("creating metadata pdu failed");
debug!("Metadata PDU: {:?}", meta_pdu)
}
spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
@@ -288,7 +271,8 @@ fn pdu_printout(pdu: &PduOwnedWithInfo) {
spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
},
spacepackets::cfdp::PduType::FileData => {
let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed");
let fd_pdu =
FileDataPdu::from_bytes(pdu.raw_pdu()).expect("creating file data pdu failed");
debug!("File data PDU: {:?}", fd_pdu);
}
}
@@ -329,7 +313,6 @@ fn main() {
);
let (source_tm_tx, source_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
let (dest_tm_tx, dest_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg_python = RemoteEntityConfig::new_with_default_values(
PYTHON_ID.into(),
1024,
@@ -338,13 +321,11 @@ fn main() {
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32C,
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let seq_count_provider = AtomicU16::default();
let mut source_handler = SourceHandler::new(
local_cfg_source,
source_tm_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg_python,
StdTimerCreator::default(),
seq_count_provider,
@@ -358,11 +339,11 @@ fn main() {
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tm_tx,
NativeFilestore::default(),
remote_cfg_python,
StdTimerCreator::default(),
LostSegmentsList::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);
@@ -411,6 +392,9 @@ fn main() {
.expect("put request failed");
}
loop {
if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match source_tc_rx.try_recv() {
@@ -453,6 +437,9 @@ fn main() {
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let packet_info = match dest_tc_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
@@ -494,6 +481,9 @@ fn main() {
info!("Starting UDP server on {}", remote_addr);
loop {
loop {
if KILL_APP.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
match udp_server.try_recv_tc() {
Ok(result) => match result {
Some((pdu, _addr)) => {

34
justfile Normal file
View File

@@ -0,0 +1,34 @@
all: check build embedded clippy fmt docs test coverage
clippy:
cargo clippy -- -D warnings
fmt:
cargo fmt --all -- --check
check:
cargo check --features "serde, defmt"
test:
cargo nextest r --features "serde, defmt"
cargo test --doc
build:
cargo build --features "serde, defmt"
embedded:
cargo build --target thumbv7em-none-eabihf --no-default-features --features "defmt, packet-buf-1k"
docs:
export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options"
cargo +nightly doc --features "serde, defmt"
docs-html:
export RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options"
cargo +nightly doc --features "serde, defmt" --open
coverage:
cargo llvm-cov nextest
coverage-html:
cargo llvm-cov nextest --html --open

21
src/buf_len.rs Normal file
View File

@@ -0,0 +1,21 @@
#[cfg(not(any(
feature = "packet-buf-256",
feature = "packet-buf-512",
feature = "packet-buf-1k",
feature = "packet-buf-2k",
feature = "packet-buf-4k"
)))]
compile_error!(
"One of the features `packet-buf-256`, `packet-buf-512`, `packet-buf-1k`, `packet-buf-2k`, or `packet-buf-4k` must be enabled."
);
#[cfg(feature = "packet-buf-256")]
pub const PACKET_BUF_LEN: usize = 256;
#[cfg(feature = "packet-buf-512")]
pub const PACKET_BUF_LEN: usize = 512;
#[cfg(feature = "packet-buf-1k")]
pub const PACKET_BUF_LEN: usize = 1024;
#[cfg(feature = "packet-buf-2k")]
pub const PACKET_BUF_LEN: usize = 2048;
#[cfg(feature = "packet-buf-4k")]
pub const PACKET_BUF_LEN: usize = 4096;

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
use spacepackets::cfdp::ChecksumType;
use spacepackets::ByteConversionError;
use spacepackets::cfdp::ChecksumType;
#[cfg(feature = "std")]
pub use std_mod::*;
@@ -375,9 +375,11 @@ mod tests {
.create_dir(dir_path.to_str().expect("getting str for file failed"))
.unwrap();
assert!(NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS
.is_dir(dir_path.as_path().to_str().unwrap())
.unwrap());
assert!(
NATIVE_FS
.is_dir(dir_path.as_path().to_str().unwrap())
.unwrap()
);
}
#[test]

File diff suppressed because it is too large Load Diff

1361
src/lost_segments.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,10 @@
use core::str::Utf8Error;
use spacepackets::{
ByteConversionError,
cfdp::{
tlv::{GenericTlv, Tlv, TlvType},
SegmentationControl, TransmissionMode,
tlv::{GenericTlv, ReadableTlv as _, Tlv, TlvType, WritableTlv as _},
},
util::UnsignedByteField,
};
@@ -24,10 +27,10 @@ pub trait ReadablePutRequest {
fn closure_requested(&self) -> Option<bool>;
fn seg_ctrl(&self) -> Option<SegmentationControl>;
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv>>;
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv>>;
fn flow_label(&self) -> Option<Tlv>;
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv>>;
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv<'_>>>;
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv<'_>>>;
fn flow_label(&self) -> Option<Tlv<'_>>;
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv<'_>>>;
}
#[derive(Debug, PartialEq, Eq)]
@@ -101,25 +104,25 @@ impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> {
self.seg_ctrl
}
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv>> {
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv<'_>>> {
if let Some(msgs_to_user) = self.msgs_to_user {
return Some(msgs_to_user.iter().copied());
}
None
}
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv>> {
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv<'_>>> {
if let Some(fh_overrides) = self.fault_handler_overrides {
return Some(fh_overrides.iter().copied());
}
None
}
fn flow_label(&self) -> Option<Tlv> {
fn flow_label(&self) -> Option<Tlv<'_>> {
self.flow_label
}
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv>> {
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv<'_>>> {
if let Some(fs_requests) = self.msgs_to_user {
return Some(fs_requests.iter().copied());
}
@@ -226,16 +229,171 @@ pub fn generic_tlv_list_type_check<TlvProvider: GenericTlv>(
true
}
pub struct StaticPutRequestFields {
pub destination_id: UnsignedByteField,
/// Static buffer to store source file path.
pub source_file_buf: [u8; u8::MAX as usize],
/// Current source path length.
pub source_file_len: usize,
/// Static buffer to store dest file path.
pub dest_file_buf: [u8; u8::MAX as usize],
/// Current destination path length.
pub dest_file_len: usize,
pub trans_mode: Option<TransmissionMode>,
pub closure_requested: Option<bool>,
pub seg_ctrl: Option<SegmentationControl>,
}
impl Default for StaticPutRequestFields {
fn default() -> Self {
Self {
destination_id: UnsignedByteField::new(0, 0),
source_file_buf: [0; u8::MAX as usize],
source_file_len: Default::default(),
dest_file_buf: [0; u8::MAX as usize],
dest_file_len: Default::default(),
trans_mode: Default::default(),
closure_requested: Default::default(),
seg_ctrl: Default::default(),
}
}
}
impl StaticPutRequestFields {
pub fn clear(&mut self) {
self.destination_id = UnsignedByteField::new(0, 0);
self.source_file_len = 0;
self.dest_file_len = 0;
self.trans_mode = None;
self.closure_requested = None;
self.seg_ctrl = None;
}
}
/// This is a put request cache structure which can be used to cache [ReadablePutRequest]s
/// without requiring run-time allocation. The user must specify the static buffer sizes used
/// to store TLVs or list of TLVs.
pub struct StaticPutRequestCacher<const BUF_SIZE: usize> {
pub static_fields: StaticPutRequestFields,
opts_buf: [u8; BUF_SIZE],
opts_len: usize,
}
impl<const BUF_SIZE: usize> Default for StaticPutRequestCacher<BUF_SIZE> {
fn default() -> Self {
Self::new()
}
}
impl<const BUF_SIZE: usize> StaticPutRequestCacher<BUF_SIZE> {
pub fn new() -> Self {
Self {
static_fields: StaticPutRequestFields::default(),
opts_buf: [0; BUF_SIZE],
opts_len: 0,
}
}
pub fn set(
&mut self,
put_request: &impl ReadablePutRequest,
) -> Result<(), ByteConversionError> {
self.static_fields.destination_id = put_request.destination_id();
if let Some(source_file) = put_request.source_file() {
if source_file.len() > u8::MAX as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.static_fields.source_file_buf.len(),
expected: source_file.len(),
});
}
self.static_fields.source_file_buf[..source_file.len()]
.copy_from_slice(source_file.as_bytes());
self.static_fields.source_file_len = source_file.len();
}
if let Some(dest_file) = put_request.dest_file() {
if dest_file.len() > u8::MAX as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.static_fields.source_file_buf.len(),
expected: dest_file.len(),
});
}
self.static_fields.dest_file_buf[..dest_file.len()]
.copy_from_slice(dest_file.as_bytes());
self.static_fields.dest_file_len = dest_file.len();
}
self.static_fields.trans_mode = put_request.trans_mode();
self.static_fields.closure_requested = put_request.closure_requested();
self.static_fields.seg_ctrl = put_request.seg_ctrl();
let mut current_idx = 0;
let mut store_tlv = |tlv: &Tlv| {
if current_idx + tlv.len_full() > self.opts_buf.len() {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.opts_buf.len(),
expected: current_idx + tlv.len_full(),
});
}
// We checked the buffer lengths, so this should never fail.
tlv.write_to_bytes(&mut self.opts_buf[current_idx..current_idx + tlv.len_full()])
.unwrap();
current_idx += tlv.len_full();
Ok(())
};
if let Some(fs_req) = put_request.fs_requests() {
for fs_req in fs_req {
store_tlv(&fs_req)?;
}
}
if let Some(msgs_to_user) = put_request.msgs_to_user() {
for msg_to_user in msgs_to_user {
store_tlv(&msg_to_user)?;
}
}
self.opts_len = current_idx;
Ok(())
}
pub fn has_source_file(&self) -> bool {
self.static_fields.source_file_len > 0
}
pub fn has_dest_file(&self) -> bool {
self.static_fields.dest_file_len > 0
}
pub fn source_file(&self) -> Result<&str, Utf8Error> {
core::str::from_utf8(
&self.static_fields.source_file_buf[0..self.static_fields.source_file_len],
)
}
pub fn dest_file(&self) -> Result<&str, Utf8Error> {
core::str::from_utf8(&self.static_fields.dest_file_buf[0..self.static_fields.dest_file_len])
}
pub fn opts_len(&self) -> usize {
self.opts_len
}
pub fn opts_slice(&self) -> &[u8] {
&self.opts_buf[0..self.opts_len]
}
/// This clears the cacher structure. This is a cheap operation because it only
/// sets [Option]al values to [None] and the length of stores TLVs to 0.
///
/// Please note that this method will not set the values in the buffer to 0.
pub fn clear(&mut self) {
self.static_fields.clear();
self.opts_len = 0;
}
}
#[cfg(feature = "alloc")]
pub mod alloc_mod {
use core::str::Utf8Error;
use super::*;
use alloc::string::ToString;
use spacepackets::{
cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv, TlvOwned, WritableTlv},
ByteConversionError,
};
use spacepackets::cfdp::tlv::{TlvOwned, msg_to_user::MsgToUserTlv};
/// Owned variant of [PutRequest] with no lifetimes which is also [Clone]able.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -370,186 +528,31 @@ pub mod alloc_mod {
self.seg_ctrl
}
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv>> {
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv<'_>>> {
if let Some(msgs_to_user) = &self.msgs_to_user {
return Some(msgs_to_user.iter().map(|tlv_owned| tlv_owned.as_tlv()));
}
None
}
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv>> {
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv<'_>>> {
if let Some(fh_overrides) = &self.fault_handler_overrides {
return Some(fh_overrides.iter().map(|tlv_owned| tlv_owned.as_tlv()));
}
None
}
fn flow_label(&self) -> Option<Tlv> {
fn flow_label(&self) -> Option<Tlv<'_>> {
self.flow_label.as_ref().map(|tlv| tlv.as_tlv())
}
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv>> {
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv<'_>>> {
if let Some(requests) = &self.fs_requests {
return Some(requests.iter().map(|tlv_owned| tlv_owned.as_tlv()));
}
None
}
}
pub struct StaticPutRequestFields {
pub destination_id: UnsignedByteField,
/// Static buffer to store source file path.
pub source_file_buf: [u8; u8::MAX as usize],
/// Current source path length.
pub source_file_len: usize,
/// Static buffer to store dest file path.
pub dest_file_buf: [u8; u8::MAX as usize],
/// Current destination path length.
pub dest_file_len: usize,
pub trans_mode: Option<TransmissionMode>,
pub closure_requested: Option<bool>,
pub seg_ctrl: Option<SegmentationControl>,
}
impl Default for StaticPutRequestFields {
fn default() -> Self {
Self {
destination_id: UnsignedByteField::new(0, 0),
source_file_buf: [0; u8::MAX as usize],
source_file_len: Default::default(),
dest_file_buf: [0; u8::MAX as usize],
dest_file_len: Default::default(),
trans_mode: Default::default(),
closure_requested: Default::default(),
seg_ctrl: Default::default(),
}
}
}
impl StaticPutRequestFields {
pub fn clear(&mut self) {
self.destination_id = UnsignedByteField::new(0, 0);
self.source_file_len = 0;
self.dest_file_len = 0;
self.trans_mode = None;
self.closure_requested = None;
self.seg_ctrl = None;
}
}
/// This is a put request cache structure which can be used to cache [ReadablePutRequest]s
/// without requiring run-time allocation. The user must specify the static buffer sizes used
/// to store TLVs or list of TLVs.
pub struct StaticPutRequestCacher {
pub static_fields: StaticPutRequestFields,
opts_buf: alloc::vec::Vec<u8>,
opts_len: usize, // fs_request_start_end_pos: Option<(usize, usize)>
}
impl StaticPutRequestCacher {
pub fn new(max_len_opts_buf: usize) -> Self {
Self {
static_fields: StaticPutRequestFields::default(),
opts_buf: alloc::vec![0; max_len_opts_buf],
opts_len: 0,
}
}
pub fn set(
&mut self,
put_request: &impl ReadablePutRequest,
) -> Result<(), ByteConversionError> {
self.static_fields.destination_id = put_request.destination_id();
if let Some(source_file) = put_request.source_file() {
if source_file.len() > u8::MAX as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.static_fields.source_file_buf.len(),
expected: source_file.len(),
});
}
self.static_fields.source_file_buf[..source_file.len()]
.copy_from_slice(source_file.as_bytes());
self.static_fields.source_file_len = source_file.len();
}
if let Some(dest_file) = put_request.dest_file() {
if dest_file.len() > u8::MAX as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.static_fields.source_file_buf.len(),
expected: dest_file.len(),
});
}
self.static_fields.dest_file_buf[..dest_file.len()]
.copy_from_slice(dest_file.as_bytes());
self.static_fields.dest_file_len = dest_file.len();
}
self.static_fields.trans_mode = put_request.trans_mode();
self.static_fields.closure_requested = put_request.closure_requested();
self.static_fields.seg_ctrl = put_request.seg_ctrl();
let mut current_idx = 0;
let mut store_tlv = |tlv: &Tlv| {
if current_idx + tlv.len_full() > self.opts_buf.len() {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.opts_buf.len(),
expected: current_idx + tlv.len_full(),
});
}
// We checked the buffer lengths, so this should never fail.
tlv.write_to_bytes(&mut self.opts_buf[current_idx..current_idx + tlv.len_full()])
.unwrap();
current_idx += tlv.len_full();
Ok(())
};
if let Some(fs_req) = put_request.fs_requests() {
for fs_req in fs_req {
store_tlv(&fs_req)?;
}
}
if let Some(msgs_to_user) = put_request.msgs_to_user() {
for msg_to_user in msgs_to_user {
store_tlv(&msg_to_user)?;
}
}
self.opts_len = current_idx;
Ok(())
}
pub fn has_source_file(&self) -> bool {
self.static_fields.source_file_len > 0
}
pub fn has_dest_file(&self) -> bool {
self.static_fields.dest_file_len > 0
}
pub fn source_file(&self) -> Result<&str, Utf8Error> {
core::str::from_utf8(
&self.static_fields.source_file_buf[0..self.static_fields.source_file_len],
)
}
pub fn dest_file(&self) -> Result<&str, Utf8Error> {
core::str::from_utf8(
&self.static_fields.dest_file_buf[0..self.static_fields.dest_file_len],
)
}
pub fn opts_len(&self) -> usize {
self.opts_len
}
pub fn opts_slice(&self) -> &[u8] {
&self.opts_buf[0..self.opts_len]
}
/// This clears the cacher structure. This is a cheap operation because it only
/// sets [Option]al values to [None] and the length of stores TLVs to 0.
///
/// Please note that this method will not set the values in the buffer to 0.
pub fn clear(&mut self) {
self.static_fields.clear();
self.opts_len = 0;
}
}
}
#[cfg(test)]
@@ -557,7 +560,7 @@ mod tests {
use std::string::String;
use spacepackets::{
cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv},
cfdp::tlv::{ReadableTlv, msg_to_user::MsgToUserTlv},
util::UbfU16,
};
@@ -689,7 +692,7 @@ mod tests {
#[test]
fn test_put_request_cacher_basic() {
let put_request_cached = StaticPutRequestCacher::new(128);
let put_request_cached = StaticPutRequestCacher::<128>::new();
assert_eq!(put_request_cached.static_fields.source_file_len, 0);
assert_eq!(put_request_cached.static_fields.dest_file_len, 0);
assert_eq!(put_request_cached.opts_len(), 0);
@@ -698,7 +701,7 @@ mod tests {
#[test]
fn test_put_request_cacher_set() {
let mut put_request_cached = StaticPutRequestCacher::new(128);
let mut put_request_cached = StaticPutRequestCacher::<128>::new();
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =
@@ -720,7 +723,7 @@ mod tests {
#[test]
fn test_put_request_cacher_set_and_clear() {
let mut put_request_cached = StaticPutRequestCacher::new(128);
let mut put_request_cached = StaticPutRequestCacher::<128>::new();
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
use core::fmt::Debug;
/// Generic abstraction for a check/countdown timer.
pub trait CountdownProvider: Debug {
/// Generic abstraction for a check/countdown timer. Should also be cheap to copy and clone.
pub trait Countdown: Debug {
fn has_expired(&self) -> bool;
fn reset(&mut self);
}

View File

@@ -2,12 +2,12 @@
use spacepackets::cfdp::tlv::WritableTlv;
use spacepackets::{
cfdp::{
ConditionCode,
pdu::{
file_data::SegmentMetadata,
finished::{DeliveryCode, FileStatus},
},
tlv::msg_to_user::MsgToUserTlv,
ConditionCode,
},
util::UnsignedByteField,
};

View File

@@ -2,23 +2,27 @@
use std::{
fs::OpenOptions,
io::Write,
sync::{atomic::AtomicBool, mpsc, Arc},
sync::{
Arc,
atomic::{AtomicBool, AtomicU16},
mpsc,
},
thread,
time::Duration,
};
use cfdp::{
EntityType, FaultInfo, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHook,
dest::DestinationHandler,
filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher},
lost_segments::LostSegmentsList,
request::PutRequestOwned,
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
StdTimerCreator, TransactionId, UserFaultHookProvider,
};
use spacepackets::{
cfdp::{ChecksumType, ConditionCode, TransmissionMode},
seq_count::SeqCountProviderSyncU16,
util::UnsignedByteFieldU16,
};
@@ -30,43 +34,21 @@ const FILE_DATA: &str = "Hello World!";
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
impl UserFaultHook for ExampleFaultHandler {
fn notice_of_suspension_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected suspension, {:?}", fault_info);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn notice_of_cancellation_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected cancellation, {:?}", fault_info);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn abandoned_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected abandonment, {:?}", fault_info);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
fn ignore_cb(&mut self, fault_info: FaultInfo) {
panic!("unexpected ignore, {:?}", fault_info);
}
}
@@ -156,7 +138,7 @@ impl CfdpUser for ExampleCfdpUser {
}
}
fn end_to_end_test(with_closure: bool) {
fn end_to_end_test(transmission_mode: TransmissionMode, with_closure: bool) {
// Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone();
@@ -185,22 +167,19 @@ fn end_to_end_test(with_closure: bool) {
);
let (source_tx, source_rx) = mpsc::channel::<PduOwnedWithInfo>();
let (dest_tx, dest_rx) = mpsc::channel::<PduOwnedWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(),
1024,
with_closure,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
transmission_mode,
ChecksumType::Crc32,
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let seq_count_provider = AtomicU16::default();
let mut source_handler = SourceHandler::new(
local_cfg_source,
source_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg_of_dest,
StdTimerCreator::default(),
seq_count_provider,
@@ -217,16 +196,16 @@ fn end_to_end_test(with_closure: bool) {
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
transmission_mode,
ChecksumType::Crc32,
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tx,
NativeFilestore::default(),
remote_cfg_of_source,
StdTimerCreator::default(),
LostSegmentsList::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);
@@ -234,7 +213,7 @@ fn end_to_end_test(with_closure: bool) {
REMOTE_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged),
Some(transmission_mode),
Some(with_closure),
)
.expect("put request creation failed");
@@ -343,11 +322,16 @@ fn end_to_end_test(with_closure: bool) {
}
#[test]
fn end_to_end_test_no_closure() {
end_to_end_test(false);
fn end_to_end_unacknowledged_no_closure() {
end_to_end_test(TransmissionMode::Unacknowledged, false);
}
#[test]
fn end_to_end_test_with_closure() {
end_to_end_test(true);
fn end_to_end_unacknowledged_with_closure() {
end_to_end_test(TransmissionMode::Unacknowledged, true);
}
#[test]
fn end_to_end_acknowledged() {
end_to_end_test(TransmissionMode::Acknowledged, true);
}