init commit
All checks were successful
Rust/cfdp/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2024-08-20 11:50:13 +02:00
commit 23774c95b1
23 changed files with 8161 additions and 0 deletions

72
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,72 @@
name: ci
on: [push, pull_request]
jobs:
check:
name: Check build
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo check --release
test:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- name: Install nextest
uses: taiki-e/install-action@nextest
- run: cargo nextest run --all-features
- run: cargo test --doc
msrv:
name: Check MSRV
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.75.0
- run: cargo check --release
cross-check:
name: Check Cross-Compilation
runs-on: ubuntu-latest
strategy:
matrix:
target:
- armv7-unknown-linux-gnueabihf
- thumbv7em-none-eabihf
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
targets: "armv7-unknown-linux-gnueabihf, thumbv7em-none-eabihf"
- run: cargo check --release --target=${{matrix.target}} --no-default-features
fmt:
name: Check formatting
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo fmt --all -- --check
docs:
name: Check Documentation Build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
- run: RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" cargo +nightly doc --all-features
clippy:
name: Clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo clippy -- -D warnings

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
# Rust
/target
/Cargo.lock
# CLion
/.idea/*
!/.idea/runConfigurations

9
CHANGELOG.md Normal file
View File

@ -0,0 +1,9 @@
Change Log
=======
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]

63
Cargo.toml Normal file
View File

@ -0,0 +1,63 @@
[package]
name = "cfdp-rs"
version = "0.1.0"
edition = "2021"
rust-version = "1.75.0"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
description = "High level CCSDS File Delivery Protocol components"
homepage = "https://egit.irs.uni-stuttgart.de/rust/cfdp"
repository = "https://egit.irs.uni-stuttgart.de/rust/cfdp"
license = "Apache-2.0"
keywords = ["no-std", "space", "packets", "ccsds", "ecss"]
categories = ["aerospace", "aerospace::space-protocols", "no-std", "filesystem"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "cfdp"
[dependencies]
crc = "3"
smallvec = "1"
derive-new = "0.6"
[dependencies.thiserror]
version = "1"
optional = true
[dependencies.hashbrown]
version = "0.14"
optional = true
[dependencies.serde]
version = "1"
optional = true
[dependencies.spacepackets]
version = "0.12"
default-features = false
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets"
branch = "main"
[features]
default = ["std"]
std = [
"alloc",
"thiserror",
"spacepackets/std"
]
alloc = [
"hashbrown",
"spacepackets/alloc"
]
serde = ["dep:serde", "spacepackets/serde"]
[dev-dependencies]
tempfile = "3"
rand = "0.8"
log = "0.4"
fern = "0.6"
chrono = "0.4"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--generate-link-to-definition"]

201
LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

1
NOTICE Normal file
View File

@ -0,0 +1 @@
This software contains code developed at the University of Stuttgart's Institute of Space Systems.

49
README.md Normal file
View File

@ -0,0 +1,49 @@
[![Crates.io](https://img.shields.io/crates/v/cfdp-rs)](https://crates.io/crates/cfdp-rs)
[![docs.rs](https://img.shields.io/docsrs/cfdp-rs)](https://docs.rs/cfdp-rs)
[![ci](https://github.com/us-irs/cfdp-rs/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/us-irs/cfdp-rs/actions/workflows/ci.yml)
[![coverage](https://shields.io/endpoint?url=https://absatsw.irs.uni-stuttgart.de/projects/cfdp/coverage-rs/latest/coverage.json)](https://absatsw.irs.uni-stuttgart.de/projects/cfdp/coverage-rs/latest/index.html)
cfdp-rs - High level Rust crate for CFDP components
======================
The `cfdp-rs` Rust crate offers some high-level CCSDS File Delivery Protocol (CFDP) components to
perform file transfers according to the [CCSDS Blue Book 727.0-B-5](https://public.ccsds.org/Pubs/727x0b5.pdf).
The underlying base packet library used to generate the packets to be sent is the
[spacepackets](https://egit.irs.uni-stuttgart.de/rust/spacepackets) library.
# Features
`cfdp-rs` supports various runtime environments and is also suitable for `no_std` environments.
It is recommended to activate the `alloc` feature at the very least to allow using the primary
components provided by this crate. These components will only allocate memory at initialization
time and thus are still viable for systems where run-time allocation is prohibited.
## Default features
- [`std`](https://doc.rust-lang.org/std/): Enables functionality relying on the standard library.
- [`alloc`](https://doc.rust-lang.org/alloc/): Enables features which require allocation support.
Enabled by the `std` feature.
## Optional Features
- [`serde`](https://serde.rs/): Adds `serde` support for most types by adding `Serialize` and `Deserialize` `derive`s
- [`defmt`](https://defmt.ferrous-systems.com/): Add support for the `defmt` by adding the
[`defmt::Format`](https://defmt.ferrous-systems.com/format) derive on many types.
# Examples
You can check the [documentation](https://docs.rs/cfdp-rs) of individual modules for various usage
examples.
# Coverage
Coverage was generated using [`grcov`](https://github.com/mozilla/grcov). If you have not done so
already, install the `llvm-tools-preview`:
```sh
rustup component add llvm-tools-preview
cargo install grcov --locked
```
After that, you can simply run `coverage.py` to test the project with coverage. You can optionally
supply the `--open` flag to open the coverage report in your webbrowser.

28
automation/Dockerfile Normal file
View File

@ -0,0 +1,28 @@
# Run the following commands from root directory to build and run locally
# docker build -f automation/Dockerfile -t <NAME> .
# docker run -it <NAME>
FROM rust:latest
RUN apt-get update
RUN apt-get --yes upgrade
# tzdata is a dependency, won't install otherwise
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get --yes install rsync curl
# set CROSS_CONTAINER_IN_CONTAINER to inform `cross` that it is executed from within a container
ENV CROSS_CONTAINER_IN_CONTAINER=true
RUN rustup install nightly && \
rustup target add thumbv7em-none-eabihf armv7-unknown-linux-gnueabihf && \
rustup component add rustfmt clippy llvm-tools-preview
# Get grcov
RUN curl -sSL https://github.com/mozilla/grcov/releases/download/v0.8.19/grcov-x86_64-unknown-linux-gnu.tar.bz2 | tar -xj --directory /usr/local/bin
# Get nextest
RUN curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin
# SSH stuff to allow deployment to doc server
RUN adduser --uid 114 jenkins
# Add documentation server to known hosts
RUN echo "|1|/LzCV4BuTmTb2wKnD146l9fTKgQ=|NJJtVjvWbtRt8OYqFgcYRnMQyVw= ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBNL8ssTonYtgiR/6RRlSIK9WU1ywOcJmxFTLcEblAwH7oifZzmYq3XRfwXrgfMpylEfMFYfCU8JRqtmi19xc21A=" >> /etc/ssh/ssh_known_hosts
RUN echo "|1|CcBvBc3EG03G+XM5rqRHs6gK/Gg=|oGeJQ+1I8NGI2THIkJsW92DpTzs= ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBNL8ssTonYtgiR/6RRlSIK9WU1ywOcJmxFTLcEblAwH7oifZzmYq3XRfwXrgfMpylEfMFYfCU8JRqtmi19xc21A=" >> /etc/ssh/ssh_known_hosts

81
automation/Jenkinsfile vendored Normal file
View File

@ -0,0 +1,81 @@
pipeline {
agent {
dockerfile {
dir 'automation'
reuseNode true
args '--network host'
}
}
stages {
stage('Rust Toolchain Info') {
steps {
sh 'rustc --version'
}
}
stage('Clippy') {
steps {
sh 'cargo clippy'
}
}
stage('Docs') {
steps {
sh """
RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" cargo +nightly doc --all-features
"""
}
}
stage('Rustfmt') {
steps {
sh 'cargo fmt --all --check'
}
}
stage('Test') {
steps {
sh 'cargo nextest r --all-features'
sh 'cargo test --doc'
}
}
stage('Check with all features') {
steps {
sh 'cargo check --all-features'
}
}
stage('Check with no features') {
steps {
sh 'cargo check --no-default-features'
}
}
stage('Check Cross Embedded Bare Metal') {
steps {
sh 'cargo check --target thumbv7em-none-eabihf --no-default-features'
}
}
stage('Check Cross Embedded Linux') {
steps {
sh 'cargo check --target armv7-unknown-linux-gnueabihf'
}
}
stage('Run test with Coverage') {
when {
anyOf {
branch 'main';
branch pattern: 'cov-deployment*'
}
}
steps {
withEnv(['RUSTFLAGS=-Cinstrument-coverage', 'LLVM_PROFILE_FILE=target/coverage/%p-%m.profraw']) {
echo "Executing tests with coverage"
sh 'cargo clean'
sh 'cargo test --all-features'
sh 'grcov . -s . --binary-path ./target/debug -t html --branch --ignore-not-existing -o ./target/debug/coverage/'
sshagent(credentials: ['documentation-buildfix']) {
// Deploy to Apache webserver
sh 'rsync --mkpath -r --delete ./target/debug/coverage/ buildfix@documentation.irs.uni-stuttgart.de:/projects/cfdp/coverage-rs/latest/'
}
}
}
}
}
}

54
coverage.py Executable file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
import os
import logging
import argparse
import webbrowser
_LOGGER = logging.getLogger()
def generate_cov_report(open_report: bool, format: str):
logging.basicConfig(level=logging.INFO)
os.environ["RUSTFLAGS"] = "-Cinstrument-coverage"
os.environ["LLVM_PROFILE_FILE"] = "target/coverage/%p-%m.profraw"
_LOGGER.info("Executing tests with coverage")
os.system("cargo test --all-features")
out_path = "./target/debug/coverage"
if format == "lcov":
out_path = "./target/debug/lcov.info"
os.system(
f"grcov . -s . --binary-path ./target/debug/ -t {format} --branch --ignore-not-existing "
f"-o {out_path}"
)
if format == "lcov":
os.system(
"genhtml -o ./target/debug/coverage/ --show-details --highlight --ignore-errors source "
"--legend ./target/debug/lcov.info"
)
if open_report:
coverage_report_path = os.path.abspath("./target/debug/coverage/index.html")
webbrowser.open_new_tab(coverage_report_path)
_LOGGER.info("Done")
def main():
parser = argparse.ArgumentParser(
description="Generate coverage report and optionally open it in a browser"
)
parser.add_argument(
"--open", action="store_true", help="Open the coverage report in a browser"
)
parser.add_argument(
"--format",
choices=["html", "lcov"],
default="html",
help="Choose report format (html or lcov)",
)
args = parser.parse_args()
generate_cov_report(args.open, args.format)
if __name__ == "__main__":
main()

1
examples/python-interop/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/venv

650
examples/python-interop/main.py Executable file
View File

@ -0,0 +1,650 @@
#!/usr/bin/env python3
from datetime import timedelta
from pathlib import Path
import ipaddress
import socket
import select
import threading
import argparse
import logging
import time
import copy
from threading import Thread, Event
from typing import Any, Dict, List, Tuple, Optional
from multiprocessing import Queue
from queue import Empty
from cfdppy.handler import DestHandler, RemoteEntityCfgTable, SourceHandler
from cfdppy.exceptions import InvalidDestinationId, SourceFileDoesNotExist
from cfdppy import (
CfdpUserBase,
LocalEntityCfg,
PacketDestination,
PutRequest,
TransactionId,
get_packet_destination,
CfdpState,
)
from cfdppy.mib import (
CheckTimerProvider,
DefaultFaultHandlerBase,
EntityType,
IndicationCfg,
RemoteEntityCfg,
)
from cfdppy.user import (
FileSegmentRecvdParams,
MetadataRecvParams,
TransactionFinishedParams,
TransactionParams,
)
from spacepackets.cfdp import ChecksumType, ConditionCode, TransmissionMode
from spacepackets.cfdp.pdu import AbstractFileDirectiveBase, PduFactory, PduHolder
from spacepackets.cfdp.tlv import (
MessageToUserTlv,
OriginatingTransactionId,
ProxyMessageType,
ProxyPutResponse,
ReservedCfdpMessage,
)
from spacepackets.cfdp.tlv.msg_to_user import ProxyPutResponseParams
from spacepackets.countdown import Countdown
from spacepackets.seqcount import SeqCountProvider
from spacepackets.util import ByteFieldU16, UnsignedByteField
from tmtccmd.config.cfdp import CfdpParams, generic_cfdp_params_to_put_request
from tmtccmd.config.args import add_cfdp_procedure_arguments, cfdp_args_to_cfdp_params
PYTHON_ENTITY_ID = ByteFieldU16(1)
RUST_ENTITY_ID = ByteFieldU16(2)
# Enable all indications for both local and remote entity.
INDICATION_CFG = IndicationCfg()
BASE_STR_SRC = "PY SRC"
BASE_STR_DEST = "PY DEST"
FILE_CONTENT = "Hello World!\n"
FILE_SEGMENT_SIZE = 256
MAX_PACKET_LEN = 512
# This queue is used to send put requests.
PUT_REQ_QUEUE = Queue()
# All telecommands which should go to the source handler should be put into this queue by
# the UDP server.
SOURCE_ENTITY_QUEUE = Queue()
# All telecommands which should go to the destination handler should be put into this queue by
# the UDP server.
DEST_ENTITY_QUEUE = Queue()
# All telemetry which should be sent to the remote entity is put into this queue and will then
# be sent by the UDP server.
TM_QUEUE = Queue()
REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg(
entity_id=PYTHON_ENTITY_ID,
max_packet_len=MAX_PACKET_LEN,
max_file_segment_len=FILE_SEGMENT_SIZE,
closure_requested=True,
crc_on_transmission=False,
default_transmission_mode=TransmissionMode.ACKNOWLEDGED,
crc_type=ChecksumType.CRC_32,
)
REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_PY_ENTITY)
REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = RUST_ENTITY_ID
RUST_PORT = 5111
PY_PORT = 5222
_LOGGER = logging.getLogger(__name__)
class UdpServer(Thread):
def __init__(
self,
sleep_time: float,
addr: Tuple[str, int],
explicit_remote_addr: Optional[Tuple[str, int]],
tx_queue: Queue,
source_entity_rx_queue: Queue,
dest_entity_rx_queue: Queue,
stop_signal: Event,
):
super().__init__()
self.sleep_time = sleep_time
self.udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
self.addr = addr
self.explicit_remote_addr = explicit_remote_addr
self.udp_socket.bind(addr)
self.tm_queue = tx_queue
self.last_sender = None
self.stop_signal = stop_signal
self.source_entity_queue = source_entity_rx_queue
self.dest_entity_queue = dest_entity_rx_queue
def run(self):
_LOGGER.info(f"Starting UDP server on {self.addr}")
while True:
if self.stop_signal.is_set():
break
self.periodic_operation()
time.sleep(self.sleep_time)
def periodic_operation(self):
while True:
next_packet = self.poll_next_udp_packet()
if next_packet is None or next_packet.pdu is None:
break
# Perform PDU routing.
packet_dest = get_packet_destination(next_packet.pdu)
_LOGGER.debug(f"UDP server: Routing {next_packet} to {packet_dest}")
if packet_dest == PacketDestination.DEST_HANDLER:
self.dest_entity_queue.put(next_packet.pdu)
elif packet_dest == PacketDestination.SOURCE_HANDLER:
self.source_entity_queue.put(next_packet.pdu)
self.send_packets()
def poll_next_udp_packet(self) -> Optional[PduHolder]:
ready = select.select([self.udp_socket], [], [], 0)
if ready[0]:
data, self.last_sender = self.udp_socket.recvfrom(4096)
return PduFactory.from_raw_to_holder(data)
return None
def send_packets(self):
while True:
try:
next_tm = self.tm_queue.get(False)
if not isinstance(next_tm, bytes) and not isinstance(
next_tm, bytearray
):
_LOGGER.error(
f"UDP server can only sent bytearray, received {next_tm}"
)
continue
if self.explicit_remote_addr is not None:
self.udp_socket.sendto(next_tm, self.explicit_remote_addr)
elif self.last_sender is not None:
self.udp_socket.sendto(next_tm, self.last_sender)
else:
_LOGGER.warning(
"UDP Server: No packet destination found, dropping TM"
)
except Empty:
break
class SourceEntityHandler(Thread):
def __init__(
self,
base_str: str,
verbose_level: int,
source_handler: SourceHandler,
put_req_queue: Queue,
source_entity_queue: Queue,
tm_queue: Queue,
stop_signal: Event,
):
super().__init__()
self.base_str = base_str
self.verbose_level = verbose_level
self.source_handler = source_handler
self.put_req_queue = put_req_queue
self.source_entity_queue = source_entity_queue
self.tm_queue = tm_queue
self.stop_signal = stop_signal
def _idle_handling(self) -> bool:
try:
put_req: PutRequest = self.put_req_queue.get(False)
_LOGGER.info(f"{self.base_str}: Handling Put Request: {put_req}")
if put_req.destination_id not in [PYTHON_ENTITY_ID, RUST_ENTITY_ID]:
_LOGGER.warning(
f"can only handle put requests target towards {RUST_ENTITY_ID} or "
f"{PYTHON_ENTITY_ID}"
)
else:
try:
self.source_handler.put_request(put_req)
except SourceFileDoesNotExist as e:
_LOGGER.warning(
f"can not handle put request, source file {e.file} does not exist"
)
return True
except Empty:
return False
def _busy_handling(self):
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
packet_received = False
packet = None
try:
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
packet = self.source_entity_queue.get(False)
packet_received = True
except Empty:
pass
try:
packet_sent = self._call_source_state_machine(packet)
# If there is no work to do, put the thread to sleep.
if not packet_received and not packet_sent:
return False
except SourceFileDoesNotExist:
_LOGGER.warning("Source file does not exist")
self.source_handler.reset()
def _call_source_state_machine(
self, packet: Optional[AbstractFileDirectiveBase]
) -> bool:
"""Returns whether a packet was sent."""
if packet is not None:
_LOGGER.debug(f"{self.base_str}: Inserting {packet}")
try:
fsm_result = self.source_handler.state_machine(packet)
except InvalidDestinationId as e:
_LOGGER.warning(
f"invalid destination ID {e.found_dest_id} on packet {packet}, expected "
f"{e.expected_dest_id}"
)
fsm_result = self.source_handler.state_machine(None)
packet_sent = False
if fsm_result.states.num_packets_ready > 0:
while fsm_result.states.num_packets_ready > 0:
next_pdu_wrapper = self.source_handler.get_next_packet()
assert next_pdu_wrapper is not None
if self.verbose_level >= 1:
_LOGGER.debug(
f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}"
)
# Send all packets which need to be sent.
self.tm_queue.put(next_pdu_wrapper.pack())
packet_sent = True
return packet_sent
def run(self):
_LOGGER.info(f"Starting {self.base_str}")
while True:
if self.stop_signal.is_set():
break
if self.source_handler.state == CfdpState.IDLE:
if not self._idle_handling():
time.sleep(0.2)
continue
if self.source_handler.state == CfdpState.BUSY:
if not self._busy_handling():
time.sleep(0.2)
class DestEntityHandler(Thread):
def __init__(
self,
base_str: str,
verbose_level: int,
dest_handler: DestHandler,
dest_entity_queue: Queue,
tm_queue: Queue,
stop_signal: Event,
):
super().__init__()
self.base_str = base_str
self.verbose_level = verbose_level
self.dest_handler = dest_handler
self.dest_entity_queue = dest_entity_queue
self.tm_queue = tm_queue
self.stop_signal = stop_signal
def run(self):
_LOGGER.info(
f"Starting {self.base_str}. Local ID {self.dest_handler.cfg.local_entity_id}"
)
while True:
packet_received = False
packet = None
if self.stop_signal.is_set():
break
try:
packet = self.dest_entity_queue.get(False)
packet_received = True
except Empty:
pass
if packet is not None:
_LOGGER.debug(f"{self.base_str}: Inserting {packet}")
fsm_result = self.dest_handler.state_machine(packet)
packet_sent = False
if fsm_result.states.num_packets_ready > 0:
while fsm_result.states.num_packets_ready > 0:
next_pdu_wrapper = self.dest_handler.get_next_packet()
assert next_pdu_wrapper is not None
if self.verbose_level >= 1:
_LOGGER.debug(
f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}"
)
self.tm_queue.put(next_pdu_wrapper.pack())
packet_sent = True
# If there is no work to do, put the thread to sleep.
if not packet_received and not packet_sent:
time.sleep(0.5)
class CfdpFaultHandler(DefaultFaultHandlerBase):
def __init__(self, base_str: str):
self.base_str = base_str
super().__init__()
def notice_of_suspension_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Received Notice of Suspension for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
def notice_of_cancellation_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Received Notice of Cancellation for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
def abandoned_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Abandoned fault for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
def ignore_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Ignored fault for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
class CfdpUser(CfdpUserBase):
def __init__(self, base_str: str, put_req_queue: Queue):
self.base_str = base_str
self.put_req_queue = put_req_queue
# This is a dictionary where the key is the current transaction ID for a transaction which
# was triggered by a proxy request with a originating ID.
self.active_proxy_put_reqs: Dict[TransactionId, TransactionId] = {}
super().__init__()
def transaction_indication(
self,
transaction_indication_params: TransactionParams,
):
"""This indication is used to report the transaction ID to the CFDP user"""
_LOGGER.info(
f"{self.base_str}: Transaction.indication for {transaction_indication_params.transaction_id}"
)
if transaction_indication_params.originating_transaction_id is not None:
_LOGGER.info(
f"Originating Transaction ID: {transaction_indication_params.originating_transaction_id}"
)
self.active_proxy_put_reqs.update(
{
transaction_indication_params.transaction_id: transaction_indication_params.originating_transaction_id
}
)
def eof_sent_indication(self, transaction_id: TransactionId):
_LOGGER.info(f"{self.base_str}: EOF-Sent.indication for {transaction_id}")
def transaction_finished_indication(self, params: TransactionFinishedParams):
_LOGGER.info(
f"{self.base_str}: Transaction-Finished.indication for {params.transaction_id}."
)
_LOGGER.info(f"Condition Code: {params.finished_params.condition_code!r}")
_LOGGER.info(f"Delivery Code: {params.finished_params.delivery_code!r}")
_LOGGER.info(f"File Status: {params.finished_params.file_status!r}")
if params.transaction_id in self.active_proxy_put_reqs:
proxy_put_response = ProxyPutResponse(
ProxyPutResponseParams.from_finished_params(params.finished_params)
).to_generic_msg_to_user_tlv()
originating_id = self.active_proxy_put_reqs.get(params.transaction_id)
assert originating_id is not None
put_req = PutRequest(
destination_id=originating_id.source_id,
source_file=None,
dest_file=None,
trans_mode=None,
closure_requested=None,
msgs_to_user=[
proxy_put_response,
OriginatingTransactionId(
originating_id
).to_generic_msg_to_user_tlv(),
],
)
_LOGGER.info(
f"Requesting Proxy Put Response concluding Proxy Put originating from "
f"{originating_id}"
)
self.put_req_queue.put(put_req)
self.active_proxy_put_reqs.pop(params.transaction_id)
def metadata_recv_indication(self, params: MetadataRecvParams):
_LOGGER.info(
f"{self.base_str}: Metadata-Recv.indication for {params.transaction_id}."
)
if params.msgs_to_user is not None:
self._handle_msgs_to_user(params.transaction_id, params.msgs_to_user)
def _handle_msgs_to_user(
self, transaction_id: TransactionId, msgs_to_user: List[MessageToUserTlv]
):
for msg_to_user in msgs_to_user:
if msg_to_user.is_reserved_cfdp_message():
reserved_msg_tlv = msg_to_user.to_reserved_msg_tlv()
assert reserved_msg_tlv is not None
self._handle_reserved_cfdp_message(transaction_id, reserved_msg_tlv)
else:
_LOGGER.info(f"Received custom message to user: {msg_to_user}")
def _handle_reserved_cfdp_message(
self, transaction_id: TransactionId, reserved_cfdp_msg: ReservedCfdpMessage
):
if reserved_cfdp_msg.is_cfdp_proxy_operation():
self._handle_cfdp_proxy_operation(transaction_id, reserved_cfdp_msg)
elif reserved_cfdp_msg.is_originating_transaction_id():
_LOGGER.info(
f"Received originating transaction ID: "
f"{reserved_cfdp_msg.get_originating_transaction_id()}"
)
def _handle_cfdp_proxy_operation(
self, transaction_id: TransactionId, reserved_cfdp_msg: ReservedCfdpMessage
):
if (
reserved_cfdp_msg.get_cfdp_proxy_message_type()
== ProxyMessageType.PUT_REQUEST
):
put_req_params = reserved_cfdp_msg.get_proxy_put_request_params()
_LOGGER.info(f"Received Proxy Put Request: {put_req_params}")
assert put_req_params is not None
put_req = PutRequest(
destination_id=put_req_params.dest_entity_id,
source_file=Path(put_req_params.source_file_as_path),
dest_file=Path(put_req_params.dest_file_as_path),
trans_mode=None,
closure_requested=None,
msgs_to_user=[
OriginatingTransactionId(
transaction_id
).to_generic_msg_to_user_tlv()
],
)
self.put_req_queue.put(put_req)
elif (
reserved_cfdp_msg.get_cfdp_proxy_message_type()
== ProxyMessageType.PUT_RESPONSE
):
put_response_params = reserved_cfdp_msg.get_proxy_put_response_params()
_LOGGER.info(f"Received Proxy Put Response: {put_response_params}")
def file_segment_recv_indication(self, params: FileSegmentRecvdParams):
_LOGGER.info(
f"{self.base_str}: File-Segment-Recv.indication for {params.transaction_id}."
)
def report_indication(self, transaction_id: TransactionId, status_report: Any):
# TODO: p.28 of the CFDP standard specifies what information the status report parameter
# could contain. I think it would be better to not hardcode the type of the status
# report here, but something like Union[any, CfdpStatusReport] with CfdpStatusReport
# being an implementation which supports all three information suggestions would be
# nice
pass
def suspended_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode
):
_LOGGER.info(
f"{self.base_str}: Suspended.indication for {transaction_id} | Condition Code: {cond_code}"
)
def resumed_indication(self, transaction_id: TransactionId, progress: int):
_LOGGER.info(
f"{self.base_str}: Resumed.indication for {transaction_id} | Progress: {progress} bytes"
)
def fault_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
_LOGGER.info(
f"{self.base_str}: Fault.indication for {transaction_id} | Condition Code: {cond_code} | "
f"Progress: {progress} bytes"
)
def abandoned_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
_LOGGER.info(
f"{self.base_str}: Abandoned.indication for {transaction_id} | Condition Code: {cond_code} |"
f" Progress: {progress} bytes"
)
def eof_recv_indication(self, transaction_id: TransactionId):
_LOGGER.info(f"{self.base_str}: EOF-Recv.indication for {transaction_id}")
class CustomCheckTimerProvider(CheckTimerProvider):
def provide_check_timer(
self,
local_entity_id: UnsignedByteField,
remote_entity_id: UnsignedByteField,
entity_type: EntityType,
) -> Countdown:
return Countdown(timedelta(seconds=5.0))
def main():
parser = argparse.ArgumentParser(prog="CFDP Local Entity Application")
parser.add_argument("-v", "--verbose", action="count", default=0)
add_cfdp_procedure_arguments(parser)
args = parser.parse_args()
stop_signal = threading.Event()
logging_level = logging.INFO
if args.verbose >= 1:
logging_level = logging.DEBUG
if args.source is not None and args.target is not None:
# Generate a put request from the CLI arguments.
cfdp_params = CfdpParams()
cfdp_args_to_cfdp_params(args, cfdp_params)
put_req = generic_cfdp_params_to_put_request(
cfdp_params, PYTHON_ENTITY_ID, RUST_ENTITY_ID, PYTHON_ENTITY_ID
)
PUT_REQ_QUEUE.put(put_req)
logging.basicConfig(level=logging_level)
remote_cfg_table = RemoteEntityCfgTable()
remote_cfg_table.add_config(REMOTE_CFG_OF_REMOTE_ENTITY)
src_fault_handler = CfdpFaultHandler(BASE_STR_SRC)
# 16 bit sequence count for transactions.
src_seq_count_provider = SeqCountProvider(16)
src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE)
check_timer_provider = CustomCheckTimerProvider()
source_handler = SourceHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
seq_num_provider=src_seq_count_provider,
remote_cfg_table=remote_cfg_table,
user=src_user,
check_timer_provider=check_timer_provider,
)
source_entity_task = SourceEntityHandler(
BASE_STR_SRC,
logging_level,
source_handler,
PUT_REQ_QUEUE,
SOURCE_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)
# Enable all indications.
dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST)
dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE)
dest_handler = DestHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
user=dest_user,
remote_cfg_table=remote_cfg_table,
check_timer_provider=check_timer_provider,
)
dest_entity_task = DestEntityHandler(
BASE_STR_DEST,
logging_level,
dest_handler,
DEST_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)
# Address Any to accept CFDP packets from other address than localhost.
local_addr = ipaddress.ip_address("0.0.0.0")
# Localhost as default.
remote_addr = ipaddress.ip_address("127.0.0.1")
"""
if Path(LOCAL_CFG_JSON_PATH).exists():
addr_from_cfg = parse_remote_addr_from_json(Path(LOCAL_CFG_JSON_PATH))
if addr_from_cfg is not None:
try:
remote_addr = ipaddress.ip_address(addr_from_cfg)
except ValueError:
_LOGGER.warning(f"invalid remote address {remote_addr} from JSON file")
"""
_LOGGER.info(f"Put request will be sent to remote destination {remote_addr}")
udp_server = UdpServer(
sleep_time=0.1,
addr=(str(local_addr), PY_PORT),
explicit_remote_addr=(str(remote_addr), RUST_PORT),
tx_queue=TM_QUEUE,
source_entity_rx_queue=SOURCE_ENTITY_QUEUE,
dest_entity_rx_queue=DEST_ENTITY_QUEUE,
stop_signal=stop_signal,
)
source_entity_task.start()
dest_entity_task.start()
udp_server.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_signal.set()
source_entity_task.join()
dest_entity_task.join()
udp_server.join()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,486 @@
use std::{
fmt::Debug,
fs::OpenOptions,
io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::mpsc,
thread,
time::Duration,
};
use cfdp::{
dest::DestinationHandler,
filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
};
use log::{debug, info, warn};
use spacepackets::{
cfdp::{
pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError},
ChecksumType, ConditionCode, TransmissionMode,
},
seq_count::SeqCountProviderSyncU16,
util::{UnsignedByteFieldU16, UnsignedEnum},
};
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
const RUST_PORT: u16 = 5111;
const PY_PORT: u16 = 5222;
const LOG_LEVEL: log::LevelFilter = log::LevelFilter::Info;
const FILE_DATA: &str = "Hello World!";
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
}
pub struct ExampleCfdpUser {
entity_type: EntityType,
}
impl ExampleCfdpUser {
pub fn new(entity_type: EntityType) -> Self {
Self { entity_type }
}
}
impl CfdpUser for ExampleCfdpUser {
fn transaction_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: Transaction indication for {:?}",
self.entity_type, id
);
}
fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF sent for transaction {:?}",
self.entity_type, id
);
}
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
println!(
"{:?} entity: Transaction finished: {:?}",
self.entity_type, finished_params
);
}
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
println!(
"{:?} entity: Metadata received: {:?}",
self.entity_type, md_recvd_params
);
}
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
println!(
"{:?} entity: File segment {:?} received",
self.entity_type, segment_recvd_params
);
}
fn report_indication(&mut self, _id: &crate::TransactionId) {}
fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF received for transaction {:?}",
self.entity_type, id
);
}
}
pub struct UdpServer {
pub socket: UdpSocket,
recv_buf: Vec<u8>,
remote_addr: SocketAddr,
source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
}
#[derive(Debug, thiserror::Error)]
pub enum UdpServerError {
#[error(transparent)]
Io(#[from] io::Error),
#[error("pdu error: {0}")]
Pdu(#[from] PduError),
#[error("send error")]
Send,
}
impl UdpServer {
pub fn new<A: ToSocketAddrs>(
addr: A,
remote_addr: SocketAddr,
max_recv_size: usize,
source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
) -> Result<Self, io::Error> {
let server = Self {
socket: UdpSocket::bind(addr)?,
recv_buf: vec![0; max_recv_size],
source_tc_tx,
dest_tc_tx,
remote_addr,
source_tm_rx,
dest_tm_rx,
};
server.socket.set_nonblocking(true)?;
Ok(server)
}
pub fn try_recv_tc(
&mut self,
) -> Result<Option<(PduOwnedWithInfo, SocketAddr)>, UdpServerError> {
let res = match self.socket.recv_from(&mut self.recv_buf) {
Ok(res) => res,
Err(e) => {
return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut {
Ok(None)
} else {
Err(e.into())
}
}
};
let (_, from) = res;
self.remote_addr = from;
let pdu_owned = PduOwnedWithInfo::new_from_raw_packet(&self.recv_buf)?;
match pdu_owned.packet_target()? {
cfdp::PacketTarget::SourceEntity => {
self.source_tc_tx
.send(pdu_owned.clone())
.map_err(|_| UdpServerError::Send)?;
}
cfdp::PacketTarget::DestEntity => {
self.dest_tc_tx
.send(pdu_owned.clone())
.map_err(|_| UdpServerError::Send)?;
}
}
Ok(Some((pdu_owned, from)))
}
pub fn recv_and_send_telemetry(&mut self) {
let tm_handler = |receiver: &mpsc::Receiver<PduOwnedWithInfo>| {
while let Ok(tm) = receiver.try_recv() {
debug!("Sending PDU: {:?}", tm);
pdu_printout(&tm);
let result = self.socket.send_to(tm.pdu(), self.remote_addr());
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
};
tm_handler(&self.source_tm_rx);
tm_handler(&self.dest_tm_rx);
}
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
}
fn pdu_printout(pdu: &PduOwnedWithInfo) {
match pdu.pdu_type() {
spacepackets::cfdp::PduType::FileDirective => match pdu.file_directive_type().unwrap() {
spacepackets::cfdp::pdu::FileDirectiveType::EofPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::FinishedPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
let meta_pdu =
MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed");
debug!("Metadata PDU: {:?}", meta_pdu)
}
spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::PromptPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
},
spacepackets::cfdp::PduType::FileData => {
let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed");
debug!("File data PDU: {:?}", fd_pdu);
}
}
}
fn main() {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"{}[{}][{}] {}",
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
std::thread::current().name().expect("thread is not named"),
record.level(),
message
))
})
.level(LOG_LEVEL)
.chain(std::io::stdout())
.apply()
.unwrap();
let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let mut file = OpenOptions::new()
.write(true)
.open(&srcfile)
.expect("opening file failed");
info!("created test source file {:?}", srcfile);
file.write_all(FILE_DATA.as_bytes())
.expect("writing file content failed");
let destdir = tempfile::tempdir().expect("creating temp directory failed");
let destfile = destdir.path().join("test.txt");
let local_cfg_source = LocalEntityConfig::new(
RUST_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let (source_tm_tx, source_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
let (dest_tm_tx, dest_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg_python = RemoteEntityConfig::new_with_default_values(
PYTHON_ID.into(),
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32C,
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let mut source_handler = SourceHandler::new(
local_cfg_source,
source_tm_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg_python,
seq_count_provider,
);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending);
let local_cfg_dest = LocalEntityConfig::new(
RUST_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tm_tx,
NativeFilestore::default(),
remote_cfg_python,
StdCheckTimerCreator::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);
let put_request = PutRequestOwned::new_regular_request(
PYTHON_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged),
Some(true),
)
.expect("put request creation failed");
let (source_tc_tx, source_tc_rx) = mpsc::channel();
let (dest_tc_tx, dest_tc_rx) = mpsc::channel();
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT);
let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), PY_PORT);
let mut udp_server = UdpServer::new(
local_addr,
remote_addr,
2048,
source_tc_tx,
dest_tc_tx,
source_tm_rx,
dest_tm_rx,
)
.expect("creating UDP server failed");
let jh_source = thread::Builder::new()
.name("cfdp src entity".to_string())
.spawn(move || {
info!("Starting RUST SRC");
source_handler
.put_request(&put_request)
.expect("put request failed");
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match source_tc_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
warn!("cfdp src entity error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Source handler state machine possible in permanent loop");
}
}
})
.unwrap();
let jh_dest = thread::Builder::new()
.name("cfdp dest entity".to_string())
.spawn(move || {
info!("Starting RUST DEST. Local ID {}", RUST_ID.value());
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match dest_tc_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Destination handler state machine possible in permanent loop");
}
}
})
.unwrap();
let jh_udp_server = thread::Builder::new()
.name("cfdp udp server".to_string())
.spawn(move || {
info!("Starting UDP server on {}", remote_addr);
loop {
loop {
match udp_server.try_recv_tc() {
Ok(result) => match result {
Some((pdu, _addr)) => {
debug!("Received PDU on UDP server: {:?}", pdu);
pdu_printout(&pdu);
}
None => break,
},
Err(e) => {
warn!("UDP server error: {}", e);
break;
}
}
}
udp_server.recv_and_send_telemetry();
thread::sleep(Duration::from_millis(50));
}
})
.unwrap();
jh_source.join().unwrap();
jh_dest.join().unwrap();
jh_udp_server.join().unwrap();
}

View File

@ -0,0 +1,2 @@
cfdp-py @ git+https://github.com/us-irs/cfdp-py.git@main
tmtccmd == 8.0.2

25
release-checklist.md Normal file
View File

@ -0,0 +1,25 @@
Checklist for new releases
=======
# Pre-Release
1. Make sure any new modules are documented sufficiently enough and check docs with
`RUSTDOCFLAGS="--cfg docsrs --generate-link-to-definition -Z unstable-options" cargo +nightly doc --all-features --open`
or `cargo +nightly doc --all-features --config 'build.rustdocflags=["--cfg", "docsrs" --generate-link-to-definition"]' --open`
(was problematic on more recent nightly versions).
2. Bump version specifier in `Cargo.toml`.
3. Update `CHANGELOG.md`: Convert `unreleased` section into version section with date and add new
`unreleased` section.
4. Run `cargo test --all-features` or `cargo nextest r --all-features` together with
`cargo test --doc`.
5. Run `cargo fmt` and `cargo clippy`. Check `cargo msrv` against MSRV in `Cargo.toml`.
6. Wait for CI/CD results for EGit and Github. These also check cross-compilation for bare-metal
targets.
# Release
1. `cargo publish`
# Post-Release
1. Create a new release on `EGit` based on the release branch.

1591
src/dest.rs Normal file

File diff suppressed because it is too large Load Diff

805
src/filestore.rs Normal file
View File

@ -0,0 +1,805 @@
use alloc::string::{String, ToString};
use core::fmt::Display;
use spacepackets::cfdp::ChecksumType;
use spacepackets::ByteConversionError;
#[cfg(feature = "std")]
use std::error::Error;
use std::path::Path;
#[cfg(feature = "std")]
pub use std_mod::*;
#[derive(Debug, Clone)]
pub enum FilestoreError {
FileDoesNotExist,
FileAlreadyExists,
DirDoesNotExist,
Permission,
IsNotFile,
IsNotDirectory,
ByteConversion(ByteConversionError),
Io {
raw_errno: Option<i32>,
string: String,
},
ChecksumTypeNotImplemented(ChecksumType),
}
impl From<ByteConversionError> for FilestoreError {
fn from(value: ByteConversionError) -> Self {
Self::ByteConversion(value)
}
}
impl Display for FilestoreError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
FilestoreError::FileDoesNotExist => {
write!(f, "file does not exist")
}
FilestoreError::FileAlreadyExists => {
write!(f, "file already exists")
}
FilestoreError::DirDoesNotExist => {
write!(f, "directory does not exist")
}
FilestoreError::Permission => {
write!(f, "permission error")
}
FilestoreError::IsNotFile => {
write!(f, "is not a file")
}
FilestoreError::IsNotDirectory => {
write!(f, "is not a directory")
}
FilestoreError::ByteConversion(e) => {
write!(f, "filestore error: {e}")
}
FilestoreError::Io { raw_errno, string } => {
write!(
f,
"filestore generic IO error with raw errno {:?}: {}",
raw_errno, string
)
}
FilestoreError::ChecksumTypeNotImplemented(checksum_type) => {
write!(f, "checksum {:?} not implemented", checksum_type)
}
}
}
}
impl Error for FilestoreError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
FilestoreError::ByteConversion(e) => Some(e),
_ => None,
}
}
}
#[cfg(feature = "std")]
impl From<std::io::Error> for FilestoreError {
fn from(value: std::io::Error) -> Self {
Self::Io {
raw_errno: value.raw_os_error(),
string: value.to_string(),
}
}
}
pub trait VirtualFilestore {
fn create_file(&self, file_path: &str) -> Result<(), FilestoreError>;
fn remove_file(&self, file_path: &str) -> Result<(), FilestoreError>;
/// Truncating a file means deleting all its data so the resulting file is empty.
/// This can be more efficient than removing and re-creating a file.
fn truncate_file(&self, file_path: &str) -> Result<(), FilestoreError>;
fn remove_dir(&self, dir_path: &str, all: bool) -> Result<(), FilestoreError>;
fn create_dir(&self, dir_path: &str) -> Result<(), FilestoreError>;
fn read_data(
&self,
file_path: &str,
offset: u64,
read_len: u64,
buf: &mut [u8],
) -> Result<(), FilestoreError>;
fn write_data(&self, file: &str, offset: u64, buf: &[u8]) -> Result<(), FilestoreError>;
fn filename_from_full_path(path: &str) -> Option<&str>
where
Self: Sized,
{
// Convert the path string to a Path
let path = Path::new(path);
// Extract the file name using the file_name() method
path.file_name().and_then(|name| name.to_str())
}
fn is_file(&self, path: &str) -> Result<bool, FilestoreError>;
fn is_dir(&self, path: &str) -> Result<bool, FilestoreError> {
Ok(!self.is_file(path)?)
}
fn exists(&self, path: &str) -> Result<bool, FilestoreError>;
fn file_size(&self, path: &str) -> Result<u64, FilestoreError>;
/// This special function is the CFDP specific abstraction to calculate the checksum of a file.
/// This allows to keep OS specific details like reading the whole file in the most efficient
/// manner inside the file system abstraction.
///
/// The passed verification buffer argument will be used by the specific implementation as
/// a buffer to read the file into. It is recommended to use common buffer sizes like
/// 4096 or 8192 bytes.
fn calculate_checksum(
&self,
file_path: &str,
checksum_type: ChecksumType,
verification_buf: &mut [u8],
) -> Result<u32, FilestoreError>;
/// This special function is the CFDP specific abstraction to verify the checksum of a file.
/// This allows to keep OS specific details like reading the whole file in the most efficient
/// manner inside the file system abstraction.
///
/// The passed verification buffer argument will be used by the specific implementation as
/// a buffer to read the file into. It is recommended to use common buffer sizes like
/// 4096 or 8192 bytes.
fn checksum_verify(
&self,
file_path: &str,
checksum_type: ChecksumType,
expected_checksum: u32,
verification_buf: &mut [u8],
) -> Result<bool, FilestoreError> {
Ok(
self.calculate_checksum(file_path, checksum_type, verification_buf)?
== expected_checksum,
)
}
}
#[cfg(feature = "std")]
pub mod std_mod {
use crc::Crc;
use crate::{CRC_32, CRC_32C};
use super::*;
use std::{
fs::{self, File, OpenOptions},
io::{BufReader, Read, Seek, SeekFrom, Write},
};
#[derive(Default)]
pub struct NativeFilestore {}
impl VirtualFilestore for NativeFilestore {
fn create_file(&self, file_path: &str) -> Result<(), FilestoreError> {
if self.exists(file_path)? {
return Err(FilestoreError::FileAlreadyExists);
}
File::create(file_path)?;
Ok(())
}
fn remove_file(&self, file_path: &str) -> Result<(), FilestoreError> {
if !self.exists(file_path)? {
return Err(FilestoreError::FileDoesNotExist);
}
if !self.is_file(file_path)? {
return Err(FilestoreError::IsNotFile);
}
fs::remove_file(file_path)?;
Ok(())
}
fn truncate_file(&self, file_path: &str) -> Result<(), FilestoreError> {
if !self.exists(file_path)? {
return Err(FilestoreError::FileDoesNotExist);
}
if !self.is_file(file_path)? {
return Err(FilestoreError::IsNotFile);
}
OpenOptions::new()
.write(true)
.truncate(true)
.open(file_path)?;
Ok(())
}
fn create_dir(&self, dir_path: &str) -> Result<(), FilestoreError> {
fs::create_dir(dir_path).map_err(|e| FilestoreError::Io {
raw_errno: e.raw_os_error(),
string: e.to_string(),
})?;
Ok(())
}
fn remove_dir(&self, dir_path: &str, all: bool) -> Result<(), FilestoreError> {
if !self.exists(dir_path)? {
return Err(FilestoreError::DirDoesNotExist);
}
if !self.is_dir(dir_path)? {
return Err(FilestoreError::IsNotDirectory);
}
if !all {
fs::remove_dir(dir_path)?;
return Ok(());
}
fs::remove_dir_all(dir_path)?;
Ok(())
}
fn read_data(
&self,
file_name: &str,
offset: u64,
read_len: u64,
buf: &mut [u8],
) -> Result<(), FilestoreError> {
if buf.len() < read_len as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: buf.len(),
expected: read_len as usize,
}
.into());
}
if !self.exists(file_name)? {
return Err(FilestoreError::FileDoesNotExist);
}
if !self.is_file(file_name)? {
return Err(FilestoreError::IsNotFile);
}
let mut file = File::open(file_name)?;
file.seek(SeekFrom::Start(offset))?;
file.read_exact(&mut buf[0..read_len as usize])?;
Ok(())
}
fn write_data(&self, file: &str, offset: u64, buf: &[u8]) -> Result<(), FilestoreError> {
if !self.exists(file)? {
return Err(FilestoreError::FileDoesNotExist);
}
if !self.is_file(file)? {
return Err(FilestoreError::IsNotFile);
}
let mut file = OpenOptions::new().write(true).open(file)?;
file.seek(SeekFrom::Start(offset))?;
file.write_all(buf)?;
Ok(())
}
fn is_file(&self, str_path: &str) -> Result<bool, FilestoreError> {
let path = Path::new(str_path);
if !self.exists(str_path)? {
return Err(FilestoreError::FileDoesNotExist);
}
Ok(path.is_file())
}
fn exists(&self, path: &str) -> Result<bool, FilestoreError> {
let path = Path::new(path);
Ok(self.exists_internal(path))
}
fn file_size(&self, str_path: &str) -> Result<u64, FilestoreError> {
let path = Path::new(str_path);
if !self.exists_internal(path) {
return Err(FilestoreError::FileDoesNotExist);
}
if !path.is_file() {
return Err(FilestoreError::IsNotFile);
}
Ok(path.metadata()?.len())
}
fn calculate_checksum(
&self,
file_path: &str,
checksum_type: ChecksumType,
verification_buf: &mut [u8],
) -> Result<u32, FilestoreError> {
let mut calc_with_crc_lib = |crc: Crc<u32>| -> Result<u32, FilestoreError> {
let mut digest = crc.digest();
let file_to_check = File::open(file_path)?;
let mut buf_reader = BufReader::new(file_to_check);
loop {
let bytes_read = buf_reader.read(verification_buf)?;
if bytes_read == 0 {
break;
}
digest.update(&verification_buf[0..bytes_read]);
}
Ok(digest.finalize())
};
match checksum_type {
ChecksumType::Modular => self.calc_modular_checksum(file_path),
ChecksumType::Crc32 => calc_with_crc_lib(CRC_32),
ChecksumType::Crc32C => calc_with_crc_lib(CRC_32C),
ChecksumType::NullChecksum => Ok(0),
_ => Err(FilestoreError::ChecksumTypeNotImplemented(checksum_type)),
}
}
}
impl NativeFilestore {
pub fn calc_modular_checksum(&self, file_path: &str) -> Result<u32, FilestoreError> {
let mut checksum: u32 = 0;
let file = File::open(file_path)?;
let mut buf_reader = BufReader::new(file);
let mut buffer = [0; 4];
loop {
let bytes_read = buf_reader.read(&mut buffer)?;
if bytes_read == 0 {
break;
}
// Perform padding directly in the buffer
(bytes_read..4).for_each(|i| {
buffer[i] = 0;
});
checksum = checksum.wrapping_add(u32::from_be_bytes(buffer));
}
Ok(checksum)
}
fn exists_internal(&self, path: &Path) -> bool {
if !path.exists() {
return false;
}
true
}
}
}
#[cfg(test)]
mod tests {
use std::{fs, path::Path, println};
use super::*;
use alloc::format;
use tempfile::tempdir;
const EXAMPLE_DATA_CFDP: [u8; 15] = [
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
];
const NATIVE_FS: NativeFilestore = NativeFilestore {};
#[test]
fn test_basic_native_filestore_create() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
let result =
NATIVE_FS.create_file(file_path.to_str().expect("getting str for file failed"));
assert!(result.is_ok());
let path = Path::new(&file_path);
assert!(path.exists());
assert!(NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS.is_file(file_path.to_str().unwrap()).unwrap());
}
#[test]
fn test_basic_native_fs_file_exists() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
assert!(!NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
assert!(NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS.is_file(file_path.to_str().unwrap()).unwrap());
}
#[test]
fn test_basic_native_fs_dir_exists() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let dir_path = tmpdir.path().join("testdir");
assert!(!NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
NATIVE_FS
.create_dir(dir_path.to_str().expect("getting str for file failed"))
.unwrap();
assert!(NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS
.is_dir(dir_path.as_path().to_str().unwrap())
.unwrap());
}
#[test]
fn test_basic_native_fs_remove_file() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.expect("creating file failed");
assert!(NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
NATIVE_FS
.remove_file(file_path.to_str().unwrap())
.expect("removing file failed");
assert!(!NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
}
#[test]
fn test_basic_native_fs_write() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
assert!(!NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
assert!(NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS.is_file(file_path.to_str().unwrap()).unwrap());
println!("{}", file_path.to_str().unwrap());
let write_data = "hello world\n";
NATIVE_FS
.write_data(file_path.to_str().unwrap(), 0, write_data.as_bytes())
.expect("writing to file failed");
let read_back = fs::read_to_string(file_path).expect("reading back data failed");
assert_eq!(read_back, write_data);
}
#[test]
fn test_basic_native_fs_read() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
assert!(!NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
assert!(NATIVE_FS.exists(file_path.to_str().unwrap()).unwrap());
assert!(NATIVE_FS.is_file(file_path.to_str().unwrap()).unwrap());
println!("{}", file_path.to_str().unwrap());
let write_data = "hello world\n";
NATIVE_FS
.write_data(file_path.to_str().unwrap(), 0, write_data.as_bytes())
.expect("writing to file failed");
let read_back = fs::read_to_string(file_path).expect("reading back data failed");
assert_eq!(read_back, write_data);
}
#[test]
fn test_truncate_file() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.expect("creating file failed");
fs::write(file_path.clone(), [1, 2, 3, 4]).unwrap();
assert_eq!(fs::read(file_path.clone()).unwrap(), [1, 2, 3, 4]);
NATIVE_FS
.truncate_file(file_path.to_str().unwrap())
.unwrap();
assert_eq!(fs::read(file_path.clone()).unwrap(), []);
}
#[test]
fn test_remove_dir() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let dir_path = tmpdir.path().join("testdir");
assert!(!NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
NATIVE_FS
.create_dir(dir_path.to_str().expect("getting str for file failed"))
.unwrap();
assert!(NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
NATIVE_FS
.remove_dir(dir_path.to_str().unwrap(), false)
.unwrap();
assert!(!NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
}
#[test]
fn test_read_file() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.expect("creating file failed");
fs::write(file_path.clone(), [1, 2, 3, 4]).unwrap();
let read_buf: &mut [u8] = &mut [0; 4];
NATIVE_FS
.read_data(file_path.to_str().unwrap(), 0, 4, read_buf)
.unwrap();
assert_eq!([1, 2, 3, 4], read_buf);
NATIVE_FS
.write_data(file_path.to_str().unwrap(), 4, &[5, 6, 7, 8])
.expect("writing to file failed");
NATIVE_FS
.read_data(file_path.to_str().unwrap(), 2, 4, read_buf)
.unwrap();
assert_eq!([3, 4, 5, 6], read_buf);
}
#[test]
fn test_remove_which_does_not_exist() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
let result = NATIVE_FS.read_data(file_path.to_str().unwrap(), 0, 4, &mut [0; 4]);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::FileDoesNotExist = error {
assert_eq!(error.to_string(), "file does not exist");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_file_already_exists() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
let result =
NATIVE_FS.create_file(file_path.to_str().expect("getting str for file failed"));
assert!(result.is_ok());
let result =
NATIVE_FS.create_file(file_path.to_str().expect("getting str for file failed"));
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::FileAlreadyExists = error {
assert_eq!(error.to_string(), "file already exists");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_remove_file_with_dir_api() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
let result = NATIVE_FS.remove_dir(file_path.to_str().unwrap(), true);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::IsNotDirectory = error {
assert_eq!(error.to_string(), "is not a directory");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_remove_dir_remove_all() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let dir_path = tmpdir.path().join("test");
NATIVE_FS
.create_dir(dir_path.to_str().expect("getting str for file failed"))
.unwrap();
let file_path = dir_path.as_path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
let result = NATIVE_FS.remove_dir(dir_path.to_str().unwrap(), true);
assert!(result.is_ok());
assert!(!NATIVE_FS.exists(dir_path.to_str().unwrap()).unwrap());
}
#[test]
fn test_remove_dir_with_file_api() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test");
NATIVE_FS
.create_dir(file_path.to_str().expect("getting str for file failed"))
.unwrap();
let result = NATIVE_FS.remove_file(file_path.to_str().unwrap());
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::IsNotFile = error {
assert_eq!(error.to_string(), "is not a file");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_remove_dir_which_does_not_exist() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test");
let result = NATIVE_FS.remove_dir(file_path.to_str().unwrap(), true);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::DirDoesNotExist = error {
assert_eq!(error.to_string(), "directory does not exist");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_remove_file_which_does_not_exist() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
let result = NATIVE_FS.remove_file(file_path.to_str().unwrap());
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::FileDoesNotExist = error {
assert_eq!(error.to_string(), "file does not exist");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_truncate_file_which_does_not_exist() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
let result = NATIVE_FS.truncate_file(file_path.to_str().unwrap());
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::FileDoesNotExist = error {
assert_eq!(error.to_string(), "file does not exist");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_truncate_file_on_directory() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test");
NATIVE_FS.create_dir(file_path.to_str().unwrap()).unwrap();
let result = NATIVE_FS.truncate_file(file_path.to_str().unwrap());
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::IsNotFile = error {
assert_eq!(error.to_string(), "is not a file");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_byte_conversion_error_when_reading() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
let result = NATIVE_FS.read_data(file_path.to_str().unwrap(), 0, 2, &mut []);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::ByteConversion(byte_conv_error) = error {
if let ByteConversionError::ToSliceTooSmall { found, expected } = byte_conv_error {
assert_eq!(found, 0);
assert_eq!(expected, 2);
} else {
panic!("unexpected error");
}
assert_eq!(
error.to_string(),
format!("filestore error: {}", byte_conv_error)
);
} else {
panic!("unexpected error");
}
}
#[test]
fn test_read_file_on_dir() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let dir_path = tmpdir.path().join("test");
NATIVE_FS
.create_dir(dir_path.to_str().expect("getting str for file failed"))
.unwrap();
let result = NATIVE_FS.read_data(dir_path.to_str().unwrap(), 0, 4, &mut [0; 4]);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::IsNotFile = error {
assert_eq!(error.to_string(), "is not a file");
} else {
panic!("unexpected error");
}
}
#[test]
fn test_write_file_non_existing() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
let result = NATIVE_FS.write_data(file_path.to_str().unwrap(), 0, &[]);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::FileDoesNotExist = error {
} else {
panic!("unexpected error");
}
}
#[test]
fn test_write_file_on_dir() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test");
NATIVE_FS.create_dir(file_path.to_str().unwrap()).unwrap();
let result = NATIVE_FS.write_data(file_path.to_str().unwrap(), 0, &[]);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::IsNotFile = error {
} else {
panic!("unexpected error");
}
}
#[test]
fn test_filename_extraction() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("test.txt");
NATIVE_FS
.create_file(file_path.to_str().expect("getting str for file failed"))
.unwrap();
NativeFilestore::filename_from_full_path(file_path.to_str().unwrap());
}
#[test]
fn test_modular_checksum() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("mod-crc.bin");
fs::write(file_path.as_path(), EXAMPLE_DATA_CFDP).expect("writing test file failed");
// Kind of re-writing the modular checksum impl here which we are trying to test, but the
// numbers/correctness were verified manually using calculators, so this is okay.
let mut checksum: u32 = 0;
let mut buffer: [u8; 4] = [0; 4];
for i in 0..3 {
buffer = EXAMPLE_DATA_CFDP[i * 4..(i + 1) * 4].try_into().unwrap();
checksum = checksum.wrapping_add(u32::from_be_bytes(buffer));
}
buffer[0..3].copy_from_slice(&EXAMPLE_DATA_CFDP[12..15]);
buffer[3] = 0;
checksum = checksum.wrapping_add(u32::from_be_bytes(buffer));
let mut verif_buf: [u8; 32] = [0; 32];
let result = NATIVE_FS.checksum_verify(
file_path.to_str().unwrap(),
ChecksumType::Modular,
checksum,
&mut verif_buf,
);
assert!(result.is_ok());
}
#[test]
fn test_null_checksum_impl() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("mod-crc.bin");
// The file to check does not even need to exist, and the verification buffer can be
// empty: the null checksum is always yields the same result.
let result = NATIVE_FS.checksum_verify(
file_path.to_str().unwrap(),
ChecksumType::NullChecksum,
0,
&mut [],
);
assert!(result.is_ok());
assert!(result.unwrap());
}
#[test]
fn test_checksum_not_implemented() {
let tmpdir = tempdir().expect("creating tmpdir failed");
let file_path = tmpdir.path().join("mod-crc.bin");
// The file to check does not even need to exist, and the verification buffer can be
// empty: the null checksum is always yields the same result.
let result = NATIVE_FS.checksum_verify(
file_path.to_str().unwrap(),
ChecksumType::Crc32Proximity1,
0,
&mut [],
);
assert!(result.is_err());
let error = result.unwrap_err();
if let FilestoreError::ChecksumTypeNotImplemented(cksum_type) = error {
assert_eq!(
error.to_string(),
format!("checksum {:?} not implemented", cksum_type)
);
} else {
panic!("unexpected error");
}
}
}

1473
src/lib.rs Normal file

File diff suppressed because it is too large Load Diff

777
src/request.rs Normal file
View File

@ -0,0 +1,777 @@
use spacepackets::{
cfdp::{
tlv::{GenericTlv, Tlv, TlvType},
SegmentationControl, TransmissionMode,
},
util::UnsignedByteField,
};
#[cfg(feature = "alloc")]
pub use alloc_mod::*;
#[derive(Debug, PartialEq, Eq)]
pub struct FilePathTooLarge(pub usize);
/// This trait is an abstraction for different Put Request structures which can be used
/// by Put Request consumers.
pub trait ReadablePutRequest {
fn destination_id(&self) -> UnsignedByteField;
fn source_file(&self) -> Option<&str>;
fn dest_file(&self) -> Option<&str>;
fn trans_mode(&self) -> Option<TransmissionMode>;
fn closure_requested(&self) -> Option<bool>;
fn seg_ctrl(&self) -> Option<SegmentationControl>;
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv>>;
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv>>;
fn flow_label(&self) -> Option<Tlv>;
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv>>;
}
#[derive(Debug, PartialEq, Eq)]
pub struct PutRequest<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_label, 'fs_requests> {
pub destination_id: UnsignedByteField,
source_file: Option<&'src_file str>,
dest_file: Option<&'dest_file str>,
pub trans_mode: Option<TransmissionMode>,
pub closure_requested: Option<bool>,
pub seg_ctrl: Option<SegmentationControl>,
pub msgs_to_user: Option<&'msgs_to_user [Tlv<'msgs_to_user>]>,
pub fault_handler_overrides: Option<&'fh_ovrds [Tlv<'fh_ovrds>]>,
pub flow_label: Option<Tlv<'flow_label>>,
pub fs_requests: Option<&'fs_requests [Tlv<'fs_requests>]>,
}
impl<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_label, 'fs_requests>
PutRequest<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_label, 'fs_requests>
{
#[allow(clippy::too_many_arguments)]
pub fn new(
destination_id: UnsignedByteField,
source_file: Option<&'src_file str>,
dest_file: Option<&'dest_file str>,
trans_mode: Option<TransmissionMode>,
closure_requested: Option<bool>,
seg_ctrl: Option<SegmentationControl>,
msgs_to_user: Option<&'msgs_to_user [Tlv<'msgs_to_user>]>,
fault_handler_overrides: Option<&'fh_ovrds [Tlv<'fh_ovrds>]>,
flow_label: Option<Tlv<'flow_label>>,
fs_requests: Option<&'fs_requests [Tlv<'fs_requests>]>,
) -> Result<Self, FilePathTooLarge> {
generic_path_checks(source_file, dest_file)?;
Ok(Self {
destination_id,
source_file,
dest_file,
trans_mode,
closure_requested,
seg_ctrl,
msgs_to_user,
fault_handler_overrides,
flow_label,
fs_requests,
})
}
}
impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> {
fn destination_id(&self) -> UnsignedByteField {
self.destination_id
}
fn source_file(&self) -> Option<&str> {
self.source_file
}
fn dest_file(&self) -> Option<&str> {
self.dest_file
}
fn trans_mode(&self) -> Option<TransmissionMode> {
self.trans_mode
}
fn closure_requested(&self) -> Option<bool> {
self.closure_requested
}
fn seg_ctrl(&self) -> Option<SegmentationControl> {
self.seg_ctrl
}
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv>> {
if let Some(msgs_to_user) = self.msgs_to_user {
return Some(msgs_to_user.iter().copied());
}
None
}
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv>> {
if let Some(fh_overrides) = self.fault_handler_overrides {
return Some(fh_overrides.iter().copied());
}
None
}
fn flow_label(&self) -> Option<Tlv> {
self.flow_label
}
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv>> {
if let Some(fs_requests) = self.msgs_to_user {
return Some(fs_requests.iter().copied());
}
None
}
}
pub fn generic_path_checks(
source_file: Option<&str>,
dest_file: Option<&str>,
) -> Result<(), FilePathTooLarge> {
if let Some(src_file) = source_file {
if src_file.len() > u8::MAX as usize {
return Err(FilePathTooLarge(src_file.len()));
}
}
if let Some(dest_file) = dest_file {
if dest_file.len() > u8::MAX as usize {
return Err(FilePathTooLarge(dest_file.len()));
}
}
Ok(())
}
impl<'src_file, 'dest_file> PutRequest<'src_file, 'dest_file, 'static, 'static, 'static, 'static> {
pub fn new_regular_request(
dest_id: UnsignedByteField,
source_file: &'src_file str,
dest_file: &'dest_file str,
trans_mode: Option<TransmissionMode>,
closure_requested: Option<bool>,
) -> Result<Self, FilePathTooLarge> {
generic_path_checks(Some(source_file), Some(dest_file))?;
Ok(Self {
destination_id: dest_id,
source_file: Some(source_file),
dest_file: Some(dest_file),
trans_mode,
closure_requested,
seg_ctrl: None,
msgs_to_user: None,
fault_handler_overrides: None,
flow_label: None,
fs_requests: None,
})
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct TlvWithInvalidType(pub(crate) ());
impl<'msgs_to_user> PutRequest<'static, 'static, 'msgs_to_user, 'static, 'static, 'static> {
pub fn new_msgs_to_user_only(
dest_id: UnsignedByteField,
msgs_to_user: &'msgs_to_user [Tlv<'msgs_to_user>],
) -> Result<Self, TlvWithInvalidType> {
Ok(Self {
destination_id: dest_id,
source_file: None,
dest_file: None,
trans_mode: None,
closure_requested: None,
seg_ctrl: None,
msgs_to_user: Some(msgs_to_user),
fault_handler_overrides: None,
flow_label: None,
fs_requests: None,
})
}
/// Uses [generic_tlv_list_type_check] to check the TLV type validity of all TLV fields.
pub fn check_tlv_type_validities(&self) -> bool {
generic_tlv_list_type_check(self.msgs_to_user, TlvType::MsgToUser);
if let Some(flow_label) = &self.flow_label {
if flow_label.tlv_type().is_none() {
return false;
}
if flow_label.tlv_type().unwrap() != TlvType::FlowLabel {
return false;
}
}
generic_tlv_list_type_check(self.fault_handler_overrides, TlvType::FaultHandler);
generic_tlv_list_type_check(self.fs_requests, TlvType::FilestoreRequest);
true
}
}
pub fn generic_tlv_list_type_check<TlvProvider: GenericTlv>(
opt_tlvs: Option<&[TlvProvider]>,
tlv_type: TlvType,
) -> bool {
if let Some(tlvs) = opt_tlvs {
for tlv in tlvs {
if tlv.tlv_type().is_none() {
return false;
}
if tlv.tlv_type().unwrap() != tlv_type {
return false;
}
}
}
true
}
#[cfg(feature = "alloc")]
pub mod alloc_mod {
use core::str::Utf8Error;
use super::*;
use alloc::string::ToString;
use spacepackets::{
cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv, TlvOwned, WritableTlv},
ByteConversionError,
};
/// Owned variant of [PutRequest] with no lifetimes which is also [Clone]able.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PutRequestOwned {
pub destination_id: UnsignedByteField,
source_file: Option<alloc::string::String>,
dest_file: Option<alloc::string::String>,
pub trans_mode: Option<TransmissionMode>,
pub closure_requested: Option<bool>,
pub seg_ctrl: Option<SegmentationControl>,
pub msgs_to_user: Option<alloc::vec::Vec<TlvOwned>>,
pub fault_handler_overrides: Option<alloc::vec::Vec<TlvOwned>>,
pub flow_label: Option<TlvOwned>,
pub fs_requests: Option<alloc::vec::Vec<TlvOwned>>,
}
impl PutRequestOwned {
pub fn new_regular_request(
dest_id: UnsignedByteField,
source_file: &str,
dest_file: &str,
trans_mode: Option<TransmissionMode>,
closure_requested: Option<bool>,
) -> Result<Self, FilePathTooLarge> {
if source_file.len() > u8::MAX as usize {
return Err(FilePathTooLarge(source_file.len()));
}
if dest_file.len() > u8::MAX as usize {
return Err(FilePathTooLarge(dest_file.len()));
}
Ok(Self {
destination_id: dest_id,
source_file: Some(source_file.to_string()),
dest_file: Some(dest_file.to_string()),
trans_mode,
closure_requested,
seg_ctrl: None,
msgs_to_user: None,
fault_handler_overrides: None,
flow_label: None,
fs_requests: None,
})
}
pub fn new_msgs_to_user_only(
dest_id: UnsignedByteField,
msgs_to_user: &[MsgToUserTlv<'_>],
) -> Result<Self, TlvWithInvalidType> {
Ok(Self {
destination_id: dest_id,
source_file: None,
dest_file: None,
trans_mode: None,
closure_requested: None,
seg_ctrl: None,
msgs_to_user: Some(msgs_to_user.iter().map(|msg| msg.tlv.to_owned()).collect()),
fault_handler_overrides: None,
flow_label: None,
fs_requests: None,
})
}
/// Uses [generic_tlv_list_type_check] to check the TLV type validity of all TLV fields.
pub fn check_tlv_type_validities(&self) -> bool {
generic_tlv_list_type_check(self.msgs_to_user.as_deref(), TlvType::MsgToUser);
if let Some(flow_label) = &self.flow_label {
if flow_label.tlv_type().is_none() {
return false;
}
if flow_label.tlv_type().unwrap() != TlvType::FlowLabel {
return false;
}
}
generic_tlv_list_type_check(
self.fault_handler_overrides.as_deref(),
TlvType::FaultHandler,
);
generic_tlv_list_type_check(self.fs_requests.as_deref(), TlvType::FilestoreRequest);
true
}
}
impl From<PutRequest<'_, '_, '_, '_, '_, '_>> for PutRequestOwned {
fn from(req: PutRequest<'_, '_, '_, '_, '_, '_>) -> Self {
Self {
destination_id: req.destination_id,
source_file: req.source_file.map(|s| s.into()),
dest_file: req.dest_file.map(|s| s.into()),
trans_mode: req.trans_mode,
closure_requested: req.closure_requested,
seg_ctrl: req.seg_ctrl,
msgs_to_user: req
.msgs_to_user
.map(|msgs_to_user| msgs_to_user.iter().map(|msg| msg.to_owned()).collect()),
fault_handler_overrides: req
.msgs_to_user
.map(|fh_overides| fh_overides.iter().map(|msg| msg.to_owned()).collect()),
flow_label: req
.flow_label
.map(|flow_label_tlv| flow_label_tlv.to_owned()),
fs_requests: req
.fs_requests
.map(|fs_requests| fs_requests.iter().map(|msg| msg.to_owned()).collect()),
}
}
}
impl ReadablePutRequest for PutRequestOwned {
fn destination_id(&self) -> UnsignedByteField {
self.destination_id
}
fn source_file(&self) -> Option<&str> {
self.source_file.as_deref()
}
fn dest_file(&self) -> Option<&str> {
self.dest_file.as_deref()
}
fn trans_mode(&self) -> Option<TransmissionMode> {
self.trans_mode
}
fn closure_requested(&self) -> Option<bool> {
self.closure_requested
}
fn seg_ctrl(&self) -> Option<SegmentationControl> {
self.seg_ctrl
}
fn msgs_to_user(&self) -> Option<impl Iterator<Item = Tlv>> {
if let Some(msgs_to_user) = &self.msgs_to_user {
return Some(msgs_to_user.iter().map(|tlv_owned| tlv_owned.as_tlv()));
}
None
}
fn fault_handler_overrides(&self) -> Option<impl Iterator<Item = Tlv>> {
if let Some(fh_overrides) = &self.fault_handler_overrides {
return Some(fh_overrides.iter().map(|tlv_owned| tlv_owned.as_tlv()));
}
None
}
fn flow_label(&self) -> Option<Tlv> {
self.flow_label.as_ref().map(|tlv| tlv.as_tlv())
}
fn fs_requests(&self) -> Option<impl Iterator<Item = Tlv>> {
if let Some(requests) = &self.fs_requests {
return Some(requests.iter().map(|tlv_owned| tlv_owned.as_tlv()));
}
None
}
}
pub struct StaticPutRequestFields {
pub destination_id: UnsignedByteField,
/// Static buffer to store source file path.
pub source_file_buf: [u8; u8::MAX as usize],
/// Current source path length.
pub source_file_len: usize,
/// Static buffer to store dest file path.
pub dest_file_buf: [u8; u8::MAX as usize],
/// Current destination path length.
pub dest_file_len: usize,
pub trans_mode: Option<TransmissionMode>,
pub closure_requested: Option<bool>,
pub seg_ctrl: Option<SegmentationControl>,
}
impl Default for StaticPutRequestFields {
fn default() -> Self {
Self {
destination_id: UnsignedByteField::new(0, 0),
source_file_buf: [0; u8::MAX as usize],
source_file_len: Default::default(),
dest_file_buf: [0; u8::MAX as usize],
dest_file_len: Default::default(),
trans_mode: Default::default(),
closure_requested: Default::default(),
seg_ctrl: Default::default(),
}
}
}
impl StaticPutRequestFields {
pub fn clear(&mut self) {
self.destination_id = UnsignedByteField::new(0, 0);
self.source_file_len = 0;
self.dest_file_len = 0;
self.trans_mode = None;
self.closure_requested = None;
self.seg_ctrl = None;
}
}
/// This is a put request cache structure which can be used to cache [ReadablePutRequest]s
/// without requiring run-time allocation. The user must specify the static buffer sizes used
/// to store TLVs or list of TLVs.
pub struct StaticPutRequestCacher {
pub static_fields: StaticPutRequestFields,
opts_buf: alloc::vec::Vec<u8>,
opts_len: usize, // fs_request_start_end_pos: Option<(usize, usize)>
}
impl StaticPutRequestCacher {
pub fn new(max_len_opts_buf: usize) -> Self {
Self {
static_fields: StaticPutRequestFields::default(),
opts_buf: alloc::vec![0; max_len_opts_buf],
opts_len: 0,
}
}
pub fn set(
&mut self,
put_request: &impl ReadablePutRequest,
) -> Result<(), ByteConversionError> {
self.static_fields.destination_id = put_request.destination_id();
if let Some(source_file) = put_request.source_file() {
if source_file.len() > u8::MAX as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.static_fields.source_file_buf.len(),
expected: source_file.len(),
});
}
self.static_fields.source_file_buf[..source_file.len()]
.copy_from_slice(source_file.as_bytes());
self.static_fields.source_file_len = source_file.len();
}
if let Some(dest_file) = put_request.dest_file() {
if dest_file.len() > u8::MAX as usize {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.static_fields.source_file_buf.len(),
expected: dest_file.len(),
});
}
self.static_fields.dest_file_buf[..dest_file.len()]
.copy_from_slice(dest_file.as_bytes());
self.static_fields.dest_file_len = dest_file.len();
}
self.static_fields.trans_mode = put_request.trans_mode();
self.static_fields.closure_requested = put_request.closure_requested();
self.static_fields.seg_ctrl = put_request.seg_ctrl();
let mut current_idx = 0;
let mut store_tlv = |tlv: &Tlv| {
if current_idx + tlv.len_full() > self.opts_buf.len() {
return Err(ByteConversionError::ToSliceTooSmall {
found: self.opts_buf.len(),
expected: current_idx + tlv.len_full(),
});
}
// We checked the buffer lengths, so this should never fail.
tlv.write_to_bytes(&mut self.opts_buf[current_idx..current_idx + tlv.len_full()])
.unwrap();
current_idx += tlv.len_full();
Ok(())
};
if let Some(fs_req) = put_request.fs_requests() {
for fs_req in fs_req {
store_tlv(&fs_req)?;
}
}
if let Some(msgs_to_user) = put_request.msgs_to_user() {
for msg_to_user in msgs_to_user {
store_tlv(&msg_to_user)?;
}
}
self.opts_len = current_idx;
Ok(())
}
pub fn has_source_file(&self) -> bool {
self.static_fields.source_file_len > 0
}
pub fn has_dest_file(&self) -> bool {
self.static_fields.dest_file_len > 0
}
pub fn source_file(&self) -> Result<&str, Utf8Error> {
core::str::from_utf8(
&self.static_fields.source_file_buf[0..self.static_fields.source_file_len],
)
}
pub fn dest_file(&self) -> Result<&str, Utf8Error> {
core::str::from_utf8(
&self.static_fields.dest_file_buf[0..self.static_fields.dest_file_len],
)
}
pub fn opts_len(&self) -> usize {
self.opts_len
}
pub fn opts_slice(&self) -> &[u8] {
&self.opts_buf[0..self.opts_len]
}
/// This clears the cacher structure. This is a cheap operation because it only
/// sets [Option]al values to [None] and the length of stores TLVs to 0.
///
/// Please note that this method will not set the values in the buffer to 0.
pub fn clear(&mut self) {
self.static_fields.clear();
self.opts_len = 0;
}
}
}
#[cfg(test)]
mod tests {
use std::string::String;
use spacepackets::{
cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv},
util::UbfU16,
};
use super::*;
pub const DEST_ID: UbfU16 = UbfU16::new(5);
#[test]
fn test_put_request_basic() {
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request = PutRequest::new(
DEST_ID.into(),
Some(src_file),
Some(dest_file),
None,
None,
None,
None,
None,
None,
None,
)
.unwrap();
let identical_request =
PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None)
.unwrap();
assert_eq!(put_request, identical_request);
}
#[test]
fn test_put_request_path_checks_source_too_long() {
let mut invalid_path = String::from("/tmp/");
invalid_path += "a".repeat(u8::MAX as usize).as_str();
let dest_file = "/tmp/hello2.txt";
let error =
PutRequest::new_regular_request(DEST_ID.into(), &invalid_path, dest_file, None, None);
assert!(error.is_err());
let error = error.unwrap_err();
assert_eq!(u8::MAX as usize + 5, error.0);
}
#[test]
fn test_put_request_path_checks_dest_file_too_long() {
let mut invalid_path = String::from("/tmp/");
invalid_path += "a".repeat(u8::MAX as usize).as_str();
let source_file = "/tmp/hello2.txt";
let error =
PutRequest::new_regular_request(DEST_ID.into(), source_file, &invalid_path, None, None);
assert!(error.is_err());
let error = error.unwrap_err();
assert_eq!(u8::MAX as usize + 5, error.0);
}
#[test]
fn test_owned_put_request_path_checks_source_too_long() {
let mut invalid_path = String::from("/tmp/");
invalid_path += "a".repeat(u8::MAX as usize).as_str();
let dest_file = "/tmp/hello2.txt";
let error = PutRequestOwned::new_regular_request(
DEST_ID.into(),
&invalid_path,
dest_file,
None,
None,
);
assert!(error.is_err());
let error = error.unwrap_err();
assert_eq!(u8::MAX as usize + 5, error.0);
}
#[test]
fn test_owned_put_request_path_checks_dest_file_too_long() {
let mut invalid_path = String::from("/tmp/");
invalid_path += "a".repeat(u8::MAX as usize).as_str();
let source_file = "/tmp/hello2.txt";
let error = PutRequestOwned::new_regular_request(
DEST_ID.into(),
source_file,
&invalid_path,
None,
None,
);
assert!(error.is_err());
let error = error.unwrap_err();
assert_eq!(u8::MAX as usize + 5, error.0);
}
#[test]
fn test_put_request_basic_small_ctor() {
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =
PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None)
.unwrap();
assert_eq!(put_request.source_file(), Some(src_file));
assert_eq!(put_request.dest_file(), Some(dest_file));
assert_eq!(put_request.destination_id(), DEST_ID.into());
assert_eq!(put_request.seg_ctrl(), None);
assert_eq!(put_request.closure_requested(), None);
assert_eq!(put_request.trans_mode(), None);
assert!(put_request.fs_requests().is_none());
assert!(put_request.msgs_to_user().is_none());
assert!(put_request.fault_handler_overrides().is_none());
assert!(put_request.flow_label().is_none());
}
#[test]
fn test_put_request_owned_basic() {
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =
PutRequestOwned::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None)
.unwrap();
assert_eq!(put_request.source_file(), Some(src_file));
assert_eq!(put_request.dest_file(), Some(dest_file));
assert_eq!(put_request.destination_id(), DEST_ID.into());
assert_eq!(put_request.seg_ctrl(), None);
assert_eq!(put_request.closure_requested(), None);
assert_eq!(put_request.trans_mode(), None);
assert!(put_request.flow_label().is_none());
assert!(put_request.fs_requests().is_none());
assert!(put_request.msgs_to_user().is_none());
assert!(put_request.fault_handler_overrides().is_none());
assert!(put_request.flow_label().is_none());
let put_request_cloned = put_request.clone();
assert_eq!(put_request, put_request_cloned);
}
#[test]
fn test_put_request_cacher_basic() {
let put_request_cached = StaticPutRequestCacher::new(128);
assert_eq!(put_request_cached.static_fields.source_file_len, 0);
assert_eq!(put_request_cached.static_fields.dest_file_len, 0);
assert_eq!(put_request_cached.opts_len(), 0);
assert_eq!(put_request_cached.opts_slice(), &[]);
}
#[test]
fn test_put_request_cacher_set() {
let mut put_request_cached = StaticPutRequestCacher::new(128);
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =
PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None)
.unwrap();
put_request_cached.set(&put_request).unwrap();
assert_eq!(
put_request_cached.static_fields.source_file_len,
src_file.len()
);
assert_eq!(
put_request_cached.static_fields.dest_file_len,
dest_file.len()
);
assert_eq!(put_request_cached.source_file().unwrap(), src_file);
assert_eq!(put_request_cached.dest_file().unwrap(), dest_file);
assert_eq!(put_request_cached.opts_len(), 0);
}
#[test]
fn test_put_request_cacher_set_and_clear() {
let mut put_request_cached = StaticPutRequestCacher::new(128);
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =
PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None)
.unwrap();
put_request_cached.set(&put_request).unwrap();
put_request_cached.clear();
assert_eq!(put_request_cached.static_fields.source_file_len, 0);
assert_eq!(put_request_cached.static_fields.dest_file_len, 0);
assert_eq!(put_request_cached.opts_len(), 0);
}
#[test]
fn test_messages_to_user_ctor_owned() {
let msg_to_user = MsgToUserTlv::new(&[1, 2, 3]).expect("creating message to user failed");
let put_request = PutRequestOwned::new_msgs_to_user_only(DEST_ID.into(), &[msg_to_user])
.expect("creating msgs to user only put request failed");
let msg_to_user_iter = put_request.msgs_to_user();
assert!(msg_to_user_iter.is_some());
assert!(put_request.check_tlv_type_validities());
let msg_to_user_iter = msg_to_user_iter.unwrap();
for msg_to_user_tlv in msg_to_user_iter {
assert_eq!(msg_to_user_tlv.value(), msg_to_user.value());
assert_eq!(msg_to_user_tlv.tlv_type().unwrap(), TlvType::MsgToUser);
}
}
#[test]
fn test_messages_to_user_ctor() {
let msg_to_user = MsgToUserTlv::new(&[1, 2, 3]).expect("creating message to user failed");
let binding = &[msg_to_user.to_tlv()];
let put_request = PutRequest::new_msgs_to_user_only(DEST_ID.into(), binding)
.expect("creating msgs to user only put request failed");
let msg_to_user_iter = put_request.msgs_to_user();
assert!(put_request.check_tlv_type_validities());
assert!(msg_to_user_iter.is_some());
let msg_to_user_iter = msg_to_user_iter.unwrap();
for msg_to_user_tlv in msg_to_user_iter {
assert_eq!(msg_to_user_tlv.value(), msg_to_user.value());
assert_eq!(msg_to_user_tlv.tlv_type().unwrap(), TlvType::MsgToUser);
}
}
#[test]
fn test_put_request_to_owned() {
let src_file = "/tmp/hello.txt";
let dest_file = "/tmp/hello2.txt";
let put_request =
PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None)
.unwrap();
let put_request_owned: PutRequestOwned = put_request.into();
assert_eq!(put_request_owned.destination_id(), DEST_ID.into());
assert_eq!(put_request_owned.source_file().unwrap(), src_file);
assert_eq!(put_request_owned.dest_file().unwrap(), dest_file);
assert!(put_request_owned.msgs_to_user().is_none());
assert!(put_request_owned.trans_mode().is_none());
assert!(put_request_owned.closure_requested().is_none());
}
}

1329
src/source.rs Normal file

File diff suppressed because it is too large Load Diff

7
src/time.rs Normal file
View File

@ -0,0 +1,7 @@
use core::fmt::Debug;
/// Generic abstraction for a check/countdown timer.
pub trait CountdownProvider: Debug {
fn has_expired(&self) -> bool;
fn reset(&mut self);
}

98
src/user.rs Normal file
View File

@ -0,0 +1,98 @@
#[cfg(feature = "alloc")]
use spacepackets::cfdp::tlv::WritableTlv;
use spacepackets::{
cfdp::{
pdu::{
file_data::SegmentMetadata,
finished::{DeliveryCode, FileStatus},
},
tlv::msg_to_user::MsgToUserTlv,
ConditionCode,
},
util::UnsignedByteField,
};
use super::TransactionId;
#[derive(Debug, Copy, Clone)]
pub struct TransactionFinishedParams {
pub id: TransactionId,
pub condition_code: ConditionCode,
pub delivery_code: DeliveryCode,
pub file_status: FileStatus,
}
#[derive(Debug)]
pub struct MetadataReceivedParams<'src_file, 'dest_file, 'msgs_to_user> {
pub id: TransactionId,
pub source_id: UnsignedByteField,
pub file_size: u64,
pub src_file_name: &'src_file str,
pub dest_file_name: &'dest_file str,
pub msgs_to_user: &'msgs_to_user [MsgToUserTlv<'msgs_to_user>],
}
#[cfg(feature = "alloc")]
#[derive(Debug)]
pub struct OwnedMetadataRecvdParams {
pub id: TransactionId,
pub source_id: UnsignedByteField,
pub file_size: u64,
pub src_file_name: alloc::string::String,
pub dest_file_name: alloc::string::String,
pub msgs_to_user: alloc::vec::Vec<alloc::vec::Vec<u8>>,
}
#[cfg(feature = "alloc")]
impl From<MetadataReceivedParams<'_, '_, '_>> for OwnedMetadataRecvdParams {
fn from(value: MetadataReceivedParams) -> Self {
Self::from(&value)
}
}
#[cfg(feature = "alloc")]
impl From<&MetadataReceivedParams<'_, '_, '_>> for OwnedMetadataRecvdParams {
fn from(value: &MetadataReceivedParams) -> Self {
Self {
id: value.id,
source_id: value.source_id,
file_size: value.file_size,
src_file_name: value.src_file_name.into(),
dest_file_name: value.dest_file_name.into(),
msgs_to_user: value.msgs_to_user.iter().map(|tlv| tlv.to_vec()).collect(),
}
}
}
#[derive(Debug)]
pub struct FileSegmentRecvdParams<'seg_meta> {
pub id: TransactionId,
pub offset: u64,
pub length: usize,
pub segment_metadata: Option<&'seg_meta SegmentMetadata<'seg_meta>>,
}
pub trait CfdpUser {
fn transaction_indication(&mut self, id: &TransactionId);
fn eof_sent_indication(&mut self, id: &TransactionId);
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams);
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams);
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams);
// TODO: The standard does not strictly specify how the report information looks..
fn report_indication(&mut self, id: &TransactionId);
fn suspended_indication(&mut self, id: &TransactionId, condition_code: ConditionCode);
fn resumed_indication(&mut self, id: &TransactionId, progress: u64);
fn fault_indication(
&mut self,
id: &TransactionId,
condition_code: ConditionCode,
progress: u64,
);
fn abandoned_indication(
&mut self,
id: &TransactionId,
condition_code: ConditionCode,
progress: u64,
);
fn eof_recvd_indication(&mut self, id: &TransactionId);
}

352
tests/end-to-end.rs Normal file
View File

@ -0,0 +1,352 @@
//! This is an end-to-end integration tests using the CFDP abstractions provided by the library.
use std::{
fs::OpenOptions,
io::Write,
sync::{atomic::AtomicBool, mpsc, Arc},
thread,
time::Duration,
};
use cfdp::{
dest::DestinationHandler,
filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, RemoteEntityConfig,
StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
};
use spacepackets::{
cfdp::{ChecksumType, ConditionCode, TransmissionMode},
seq_count::SeqCountProviderSyncU16,
util::UnsignedByteFieldU16,
};
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
const FILE_DATA: &str = "Hello World!";
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
}
pub struct ExampleCfdpUser {
entity_type: EntityType,
completion_signal: Arc<AtomicBool>,
}
impl ExampleCfdpUser {
pub fn new(entity_type: EntityType, completion_signal: Arc<AtomicBool>) -> Self {
Self {
entity_type,
completion_signal,
}
}
}
impl CfdpUser for ExampleCfdpUser {
fn transaction_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: Transaction indication for {:?}",
self.entity_type, id
);
}
fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF sent for transaction {:?}",
self.entity_type, id
);
}
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
println!(
"{:?} entity: Transaction finished: {:?}",
self.entity_type, finished_params
);
self.completion_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
}
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
println!(
"{:?} entity: Metadata received: {:?}",
self.entity_type, md_recvd_params
);
}
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
println!(
"{:?} entity: File segment {:?} received",
self.entity_type, segment_recvd_params
);
}
fn report_indication(&mut self, _id: &crate::TransactionId) {}
fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF received for transaction {:?}",
self.entity_type, id
);
}
}
fn end_to_end_test(with_closure: bool) {
// Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone();
let stop_signal_ctrl = stop_signal_source.clone();
let completion_signal_source = Arc::new(AtomicBool::new(false));
let completion_signal_source_main = completion_signal_source.clone();
let completion_signal_dest = Arc::new(AtomicBool::new(false));
let completion_signal_dest_main = completion_signal_dest.clone();
let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let mut file = OpenOptions::new()
.write(true)
.open(&srcfile)
.expect("opening file failed");
file.write_all(FILE_DATA.as_bytes())
.expect("writing file content failed");
let destdir = tempfile::tempdir().expect("creating temp directory failed");
let destfile = destdir.path().join("test.txt");
let local_cfg_source = LocalEntityConfig::new(
LOCAL_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let (source_tx, source_rx) = mpsc::channel::<PduOwnedWithInfo>();
let (dest_tx, dest_rx) = mpsc::channel::<PduOwnedWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(),
1024,
with_closure,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let mut source_handler = SourceHandler::new(
local_cfg_source,
source_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg_of_dest,
seq_count_provider,
);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source);
let local_cfg_dest = LocalEntityConfig::new(
REMOTE_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let remote_cfg_of_source = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(),
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tx,
NativeFilestore::default(),
remote_cfg_of_source,
StdCheckTimerCreator::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged),
Some(with_closure),
)
.expect("put request creation failed");
let start = std::time::Instant::now();
let jh_source = thread::spawn(move || {
source_handler
.put_request(&put_request)
.expect("put request failed");
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match dest_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Source handler state machine possible in permanent loop");
}
}
});
let jh_dest = thread::spawn(move || {
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match source_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Destination handler state machine possible in permanent loop");
}
}
});
loop {
if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed)
&& completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed)
{
let file = std::fs::read_to_string(destfile).expect("reading file failed");
assert_eq!(file, FILE_DATA);
// Stop the threads gracefully.
stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
if std::time::Instant::now() - start > Duration::from_secs(2) {
panic!("file transfer not finished in 2 seconds");
}
std::thread::sleep(Duration::from_millis(50));
}
jh_source.join().unwrap();
jh_dest.join().unwrap();
}
#[test]
fn end_to_end_test_no_closure() {
end_to_end_test(false);
}
#[test]
fn end_to_end_test_with_closure() {
end_to_end_test(true);
}