init commit
All checks were successful
Rust/cfdp/pipeline/head This commit looks good

This commit is contained in:
2024-08-20 11:50:13 +02:00
commit b87ccde07d
24 changed files with 9299 additions and 0 deletions

1
examples/python-interop/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/venv

View File

@ -0,0 +1,37 @@
Python Interoperability Example for cfdp-rs
=======
This example application showcases the interoperability of the CFDP handlers written in Rust
with a Python implementation which uses [cfdp-py](https://github.com/us-irs/cfdp-py) library.
Both the Rust and the Python app exchange packet data units via a UDP interface and launch
both a destination and source handler. As such, they are both able to send and receive files.
Both applications can be started with the command line argument `-f` to initiate a file transfer.
You can run both applications with `-h` to get more information about the available options.
## Running the Python App
It is recommended to run the Python App in a dedicated virtual environment. For example, on a
Unix system you can use `python3 -m venv venv` and then `source venv/bin/activate` to create
and activate a virtual environment.
After that, you can install the required dependencies using
```sh
pip install -r requirements.txt
```
and then run the application using `./main.py` or `python3 main.py`.
It is recommended to run `./main.py -h` first to get an overview of some possible options.
Running the Python App with `./main.py -f` will cause the Python App to start a file copy operation
with fixed temporary paths.
## Running the Rust App
You can run the Rust application using `cargo`, for example `cargo run --example python-interop`.
It is recommended to run `cargo run --example python-interop -- -h` to get an overview of some
possible launch options.
Running the Rust App with `cargo run --example python-interop -- -f` will cause the Rust app to
start a file copy operation with fixed temporary paths.

682
examples/python-interop/main.py Executable file
View File

@ -0,0 +1,682 @@
#!/usr/bin/env python3
from datetime import timedelta
from pathlib import Path
import os
import ipaddress
import tempfile
import socket
import select
import threading
import argparse
import logging
import time
import copy
from threading import Thread, Event
from typing import Any, Dict, List, Tuple, Optional
from multiprocessing import Queue
from queue import Empty
from cfdppy.handler import DestHandler, RemoteEntityCfgTable, SourceHandler
from cfdppy.exceptions import InvalidDestinationId, SourceFileDoesNotExist
from cfdppy import (
CfdpUserBase,
LocalEntityCfg,
PacketDestination,
PutRequest,
TransactionId,
get_packet_destination,
CfdpState,
)
from cfdppy.mib import (
CheckTimerProvider,
DefaultFaultHandlerBase,
EntityType,
IndicationCfg,
RemoteEntityCfg,
)
from cfdppy.user import (
FileSegmentRecvdParams,
MetadataRecvParams,
TransactionFinishedParams,
TransactionParams,
)
from spacepackets.cfdp import ChecksumType, ConditionCode, TransmissionMode
from spacepackets.cfdp.pdu import AbstractFileDirectiveBase, PduFactory, PduHolder
from spacepackets.cfdp.tlv import (
MessageToUserTlv,
OriginatingTransactionId,
ProxyMessageType,
ProxyPutResponse,
ReservedCfdpMessage,
)
from spacepackets.cfdp.tlv.msg_to_user import ProxyPutResponseParams
from spacepackets.countdown import Countdown
from spacepackets.seqcount import SeqCountProvider
from spacepackets.util import ByteFieldU16, UnsignedByteField
PYTHON_ENTITY_ID = ByteFieldU16(1)
RUST_ENTITY_ID = ByteFieldU16(2)
# Enable all indications for both local and remote entity.
INDICATION_CFG = IndicationCfg()
BASE_STR_SRC = "PY SRC"
BASE_STR_DEST = "PY DEST"
FILE_CONTENT = "Hello World!\n"
FILE_SEGMENT_SIZE = 256
MAX_PACKET_LEN = 512
# This queue is used to send put requests.
PUT_REQ_QUEUE = Queue()
# All telecommands which should go to the source handler should be put into this queue by
# the UDP server.
SOURCE_ENTITY_QUEUE = Queue()
# All telecommands which should go to the destination handler should be put into this queue by
# the UDP server.
DEST_ENTITY_QUEUE = Queue()
# All telemetry which should be sent to the remote entity is put into this queue and will then
# be sent by the UDP server.
TM_QUEUE = Queue()
REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg(
entity_id=PYTHON_ENTITY_ID,
max_packet_len=MAX_PACKET_LEN,
max_file_segment_len=FILE_SEGMENT_SIZE,
closure_requested=True,
crc_on_transmission=False,
default_transmission_mode=TransmissionMode.ACKNOWLEDGED,
crc_type=ChecksumType.CRC_32,
)
REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_PY_ENTITY)
REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = RUST_ENTITY_ID
RUST_PORT = 5111
PY_PORT = 5222
_LOGGER = logging.getLogger(__name__)
class UdpServer(Thread):
def __init__(
self,
sleep_time: float,
addr: Tuple[str, int],
explicit_remote_addr: Optional[Tuple[str, int]],
tx_queue: Queue,
source_entity_rx_queue: Queue,
dest_entity_rx_queue: Queue,
stop_signal: Event,
):
super().__init__()
self.sleep_time = sleep_time
self.udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
self.addr = addr
self.explicit_remote_addr = explicit_remote_addr
self.udp_socket.bind(addr)
self.tm_queue = tx_queue
self.last_sender = None
self.stop_signal = stop_signal
self.source_entity_queue = source_entity_rx_queue
self.dest_entity_queue = dest_entity_rx_queue
def run(self):
_LOGGER.info(f"Starting UDP server on {self.addr}")
while True:
if self.stop_signal.is_set():
break
self.periodic_operation()
time.sleep(self.sleep_time)
def periodic_operation(self):
while True:
next_packet = self.poll_next_udp_packet()
if next_packet is None or next_packet.pdu is None:
break
# Perform PDU routing.
packet_dest = get_packet_destination(next_packet.pdu)
_LOGGER.debug(f"UDP server: Routing {next_packet} to {packet_dest}")
if packet_dest == PacketDestination.DEST_HANDLER:
self.dest_entity_queue.put(next_packet.pdu)
elif packet_dest == PacketDestination.SOURCE_HANDLER:
self.source_entity_queue.put(next_packet.pdu)
self.send_packets()
def poll_next_udp_packet(self) -> Optional[PduHolder]:
ready = select.select([self.udp_socket], [], [], 0)
if ready[0]:
data, self.last_sender = self.udp_socket.recvfrom(4096)
return PduFactory.from_raw_to_holder(data)
return None
def send_packets(self):
while True:
try:
next_tm = self.tm_queue.get(False)
if not isinstance(next_tm, bytes) and not isinstance(
next_tm, bytearray
):
_LOGGER.error(
f"UDP server can only sent bytearray, received {next_tm}"
)
continue
if self.explicit_remote_addr is not None:
self.udp_socket.sendto(next_tm, self.explicit_remote_addr)
elif self.last_sender is not None:
self.udp_socket.sendto(next_tm, self.last_sender)
else:
_LOGGER.warning(
"UDP Server: No packet destination found, dropping TM"
)
except Empty:
break
class SourceEntityHandler(Thread):
def __init__(
self,
base_str: str,
verbose_level: int,
source_handler: SourceHandler,
put_req_queue: Queue,
source_entity_queue: Queue,
tm_queue: Queue,
stop_signal: Event,
):
super().__init__()
self.base_str = base_str
self.verbose_level = verbose_level
self.source_handler = source_handler
self.put_req_queue = put_req_queue
self.source_entity_queue = source_entity_queue
self.tm_queue = tm_queue
self.stop_signal = stop_signal
def _idle_handling(self) -> bool:
try:
put_req: PutRequest = self.put_req_queue.get(False)
_LOGGER.info(f"{self.base_str}: Handling Put Request: {put_req}")
if put_req.destination_id not in [PYTHON_ENTITY_ID, RUST_ENTITY_ID]:
_LOGGER.warning(
f"can only handle put requests target towards {RUST_ENTITY_ID} or "
f"{PYTHON_ENTITY_ID}"
)
else:
try:
self.source_handler.put_request(put_req)
except SourceFileDoesNotExist as e:
_LOGGER.warning(
f"can not handle put request, source file {e.file} does not exist"
)
return True
except Empty:
return False
def _busy_handling(self):
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
packet_received = False
packet = None
try:
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
packet = self.source_entity_queue.get(False)
packet_received = True
except Empty:
pass
try:
packet_sent = self._call_source_state_machine(packet)
# If there is no work to do, put the thread to sleep.
if not packet_received and not packet_sent:
return False
except SourceFileDoesNotExist:
_LOGGER.warning("Source file does not exist")
self.source_handler.reset()
def _call_source_state_machine(
self, packet: Optional[AbstractFileDirectiveBase]
) -> bool:
"""Returns whether a packet was sent."""
if packet is not None:
_LOGGER.debug(f"{self.base_str}: Inserting {packet}")
try:
fsm_result = self.source_handler.state_machine(packet)
except InvalidDestinationId as e:
_LOGGER.warning(
f"invalid destination ID {e.found_dest_id} on packet {packet}, expected "
f"{e.expected_dest_id}"
)
fsm_result = self.source_handler.state_machine(None)
packet_sent = False
if fsm_result.states.num_packets_ready > 0:
while fsm_result.states.num_packets_ready > 0:
next_pdu_wrapper = self.source_handler.get_next_packet()
assert next_pdu_wrapper is not None
if self.verbose_level >= 1:
_LOGGER.debug(
f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}"
)
# Send all packets which need to be sent.
self.tm_queue.put(next_pdu_wrapper.pack())
packet_sent = True
return packet_sent
def run(self):
_LOGGER.info(f"Starting {self.base_str}")
while True:
if self.stop_signal.is_set():
break
if self.source_handler.state == CfdpState.IDLE:
if not self._idle_handling():
time.sleep(0.2)
continue
if self.source_handler.state == CfdpState.BUSY:
if not self._busy_handling():
time.sleep(0.2)
class DestEntityHandler(Thread):
def __init__(
self,
base_str: str,
verbose_level: int,
dest_handler: DestHandler,
dest_entity_queue: Queue,
tm_queue: Queue,
stop_signal: Event,
):
super().__init__()
self.base_str = base_str
self.verbose_level = verbose_level
self.dest_handler = dest_handler
self.dest_entity_queue = dest_entity_queue
self.tm_queue = tm_queue
self.stop_signal = stop_signal
def run(self):
_LOGGER.info(
f"Starting {self.base_str}. Local ID {self.dest_handler.cfg.local_entity_id}"
)
while True:
packet_received = False
packet = None
if self.stop_signal.is_set():
break
try:
packet = self.dest_entity_queue.get(False)
packet_received = True
except Empty:
pass
if packet is not None:
_LOGGER.debug(f"{self.base_str}: Inserting {packet}")
fsm_result = self.dest_handler.state_machine(packet)
packet_sent = False
if fsm_result.states.num_packets_ready > 0:
while fsm_result.states.num_packets_ready > 0:
next_pdu_wrapper = self.dest_handler.get_next_packet()
assert next_pdu_wrapper is not None
if self.verbose_level >= 1:
_LOGGER.debug(
f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}"
)
self.tm_queue.put(next_pdu_wrapper.pack())
packet_sent = True
# If there is no work to do, put the thread to sleep.
if not packet_received and not packet_sent:
time.sleep(0.5)
class CfdpFaultHandler(DefaultFaultHandlerBase):
def __init__(self, base_str: str):
self.base_str = base_str
super().__init__()
def notice_of_suspension_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Received Notice of Suspension for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
def notice_of_cancellation_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Received Notice of Cancellation for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
def abandoned_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Abandoned fault for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
def ignore_cb(
self, transaction_id: TransactionId, cond: ConditionCode, progress: int
):
_LOGGER.warning(
f"{self.base_str}: Ignored fault for transaction {transaction_id!r} "
f"with condition code {cond!r}. Progress: {progress}"
)
class CfdpUser(CfdpUserBase):
def __init__(self, base_str: str, put_req_queue: Queue):
self.base_str = base_str
self.put_req_queue = put_req_queue
# This is a dictionary where the key is the current transaction ID for a transaction which
# was triggered by a proxy request with a originating ID.
self.active_proxy_put_reqs: Dict[TransactionId, TransactionId] = {}
super().__init__()
def transaction_indication(
self,
transaction_indication_params: TransactionParams,
):
"""This indication is used to report the transaction ID to the CFDP user"""
_LOGGER.info(
f"{self.base_str}: Transaction.indication for {transaction_indication_params.transaction_id}"
)
if transaction_indication_params.originating_transaction_id is not None:
_LOGGER.info(
f"Originating Transaction ID: {transaction_indication_params.originating_transaction_id}"
)
self.active_proxy_put_reqs.update(
{
transaction_indication_params.transaction_id: transaction_indication_params.originating_transaction_id
}
)
def eof_sent_indication(self, transaction_id: TransactionId):
_LOGGER.info(f"{self.base_str}: EOF-Sent.indication for {transaction_id}")
def transaction_finished_indication(self, params: TransactionFinishedParams):
_LOGGER.info(
f"{self.base_str}: Transaction-Finished.indication for {params.transaction_id}."
)
_LOGGER.info(f"Condition Code: {params.finished_params.condition_code!r}")
_LOGGER.info(f"Delivery Code: {params.finished_params.delivery_code!r}")
_LOGGER.info(f"File Status: {params.finished_params.file_status!r}")
if params.transaction_id in self.active_proxy_put_reqs:
proxy_put_response = ProxyPutResponse(
ProxyPutResponseParams.from_finished_params(params.finished_params)
).to_generic_msg_to_user_tlv()
originating_id = self.active_proxy_put_reqs.get(params.transaction_id)
assert originating_id is not None
put_req = PutRequest(
destination_id=originating_id.source_id,
source_file=None,
dest_file=None,
trans_mode=None,
closure_requested=None,
msgs_to_user=[
proxy_put_response,
OriginatingTransactionId(
originating_id
).to_generic_msg_to_user_tlv(),
],
)
_LOGGER.info(
f"Requesting Proxy Put Response concluding Proxy Put originating from "
f"{originating_id}"
)
self.put_req_queue.put(put_req)
self.active_proxy_put_reqs.pop(params.transaction_id)
def metadata_recv_indication(self, params: MetadataRecvParams):
_LOGGER.info(
f"{self.base_str}: Metadata-Recv.indication for {params.transaction_id}."
)
if params.msgs_to_user is not None:
self._handle_msgs_to_user(params.transaction_id, params.msgs_to_user)
def _handle_msgs_to_user(
self, transaction_id: TransactionId, msgs_to_user: List[MessageToUserTlv]
):
for msg_to_user in msgs_to_user:
if msg_to_user.is_reserved_cfdp_message():
reserved_msg_tlv = msg_to_user.to_reserved_msg_tlv()
assert reserved_msg_tlv is not None
self._handle_reserved_cfdp_message(transaction_id, reserved_msg_tlv)
else:
_LOGGER.info(f"Received custom message to user: {msg_to_user}")
def _handle_reserved_cfdp_message(
self, transaction_id: TransactionId, reserved_cfdp_msg: ReservedCfdpMessage
):
if reserved_cfdp_msg.is_cfdp_proxy_operation():
self._handle_cfdp_proxy_operation(transaction_id, reserved_cfdp_msg)
elif reserved_cfdp_msg.is_originating_transaction_id():
_LOGGER.info(
f"Received originating transaction ID: "
f"{reserved_cfdp_msg.get_originating_transaction_id()}"
)
def _handle_cfdp_proxy_operation(
self, transaction_id: TransactionId, reserved_cfdp_msg: ReservedCfdpMessage
):
if (
reserved_cfdp_msg.get_cfdp_proxy_message_type()
== ProxyMessageType.PUT_REQUEST
):
put_req_params = reserved_cfdp_msg.get_proxy_put_request_params()
_LOGGER.info(f"Received Proxy Put Request: {put_req_params}")
assert put_req_params is not None
put_req = PutRequest(
destination_id=put_req_params.dest_entity_id,
source_file=Path(put_req_params.source_file_as_path),
dest_file=Path(put_req_params.dest_file_as_path),
trans_mode=None,
closure_requested=None,
msgs_to_user=[
OriginatingTransactionId(
transaction_id
).to_generic_msg_to_user_tlv()
],
)
self.put_req_queue.put(put_req)
elif (
reserved_cfdp_msg.get_cfdp_proxy_message_type()
== ProxyMessageType.PUT_RESPONSE
):
put_response_params = reserved_cfdp_msg.get_proxy_put_response_params()
_LOGGER.info(f"Received Proxy Put Response: {put_response_params}")
def file_segment_recv_indication(self, params: FileSegmentRecvdParams):
_LOGGER.info(
f"{self.base_str}: File-Segment-Recv.indication for {params.transaction_id}."
)
def report_indication(self, transaction_id: TransactionId, status_report: Any):
# TODO: p.28 of the CFDP standard specifies what information the status report parameter
# could contain. I think it would be better to not hardcode the type of the status
# report here, but something like Union[any, CfdpStatusReport] with CfdpStatusReport
# being an implementation which supports all three information suggestions would be
# nice
pass
def suspended_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode
):
_LOGGER.info(
f"{self.base_str}: Suspended.indication for {transaction_id} | Condition Code: {cond_code}"
)
def resumed_indication(self, transaction_id: TransactionId, progress: int):
_LOGGER.info(
f"{self.base_str}: Resumed.indication for {transaction_id} | Progress: {progress} bytes"
)
def fault_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
_LOGGER.info(
f"{self.base_str}: Fault.indication for {transaction_id} | Condition Code: {cond_code} | "
f"Progress: {progress} bytes"
)
def abandoned_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
_LOGGER.info(
f"{self.base_str}: Abandoned.indication for {transaction_id} | Condition Code: {cond_code} |"
f" Progress: {progress} bytes"
)
def eof_recv_indication(self, transaction_id: TransactionId):
_LOGGER.info(f"{self.base_str}: EOF-Recv.indication for {transaction_id}")
class CustomCheckTimerProvider(CheckTimerProvider):
def provide_check_timer(
self,
local_entity_id: UnsignedByteField,
remote_entity_id: UnsignedByteField,
entity_type: EntityType,
) -> Countdown:
return Countdown(timedelta(seconds=5.0))
def main():
parser = argparse.ArgumentParser(
prog="CFDP Local Entity Application",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument("-v", "--verbose", action="count", default=0)
parser.add_argument(
"-f",
help="Perform a file-copy operation",
action="store_true",
dest="file_copy",
)
parser.add_argument(
"-m",
"--mode",
dest="transmission_mode",
help=(
f"Specify the transfer type{os.linesep}"
f' - "0" or "ack" for unacknowledged (Class 0) transfers{os.linesep}'
f' - "1" or "nak" for acknowledged (Class 1) transfers. Default value'
),
default="nak",
)
# Optional Boolean argument where you can specify True/False
parser.add_argument(
"-c",
type=bool,
nargs="?",
const=True,
default=None,
dest="closure_requested",
help="Request transaction closure for the unacknowledged mode",
)
args = parser.parse_args()
stop_signal = threading.Event()
logging_level = logging.INFO
if args.verbose >= 1:
logging_level = logging.DEBUG
logging.basicConfig(level=logging_level)
remote_cfg_table = RemoteEntityCfgTable()
remote_cfg_table.add_config(REMOTE_CFG_OF_REMOTE_ENTITY)
src_fault_handler = CfdpFaultHandler(BASE_STR_SRC)
# 16 bit sequence count for transactions.
src_seq_count_provider = SeqCountProvider(16)
src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE)
check_timer_provider = CustomCheckTimerProvider()
source_handler = SourceHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
seq_num_provider=src_seq_count_provider,
remote_cfg_table=remote_cfg_table,
user=src_user,
check_timer_provider=check_timer_provider,
)
source_entity_task = SourceEntityHandler(
BASE_STR_SRC,
logging_level,
source_handler,
PUT_REQ_QUEUE,
SOURCE_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)
# Enable all indications.
dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST)
dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE)
dest_handler = DestHandler(
cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
user=dest_user,
remote_cfg_table=remote_cfg_table,
check_timer_provider=check_timer_provider,
)
dest_entity_task = DestEntityHandler(
BASE_STR_DEST,
logging_level,
dest_handler,
DEST_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)
# Address Any to accept CFDP packets from other address than localhost.
local_addr = ipaddress.ip_address("0.0.0.0")
# Localhost as default.
remote_addr = ipaddress.ip_address("127.0.0.1")
udp_server = UdpServer(
sleep_time=0.1,
addr=(str(local_addr), PY_PORT),
explicit_remote_addr=(str(remote_addr), RUST_PORT),
tx_queue=TM_QUEUE,
source_entity_rx_queue=SOURCE_ENTITY_QUEUE,
dest_entity_rx_queue=DEST_ENTITY_QUEUE,
stop_signal=stop_signal,
)
# Prepare a put request / file copy operation if the user specifies it.
if args.file_copy:
_LOGGER.info("Performing file copy operation")
transmission_mode = None
if args.transmission_mode == "ack":
transmission_mode = TransmissionMode.ACKNOWLEDGED
elif args.transmission_mode == "nak":
transmission_mode = TransmissionMode.UNACKNOWLEDGED
with tempfile.NamedTemporaryFile(delete=False) as srcfile:
srcfile.write(FILE_CONTENT.encode())
srcfile_path = srcfile.name
tempdir = tempfile.TemporaryDirectory()
put_req = PutRequest(
destination_id=RUST_ENTITY_ID,
source_file=Path(srcfile_path),
dest_file=Path(tempdir.name).joinpath("test.txt"),
closure_requested=args.closure_requested,
trans_mode=transmission_mode,
)
PUT_REQ_QUEUE.put(put_req)
source_entity_task.start()
dest_entity_task.start()
udp_server.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_signal.set()
source_entity_task.join()
dest_entity_task.join()
udp_server.join()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,520 @@
use std::{
fmt::Debug,
fs::OpenOptions,
io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::mpsc,
thread,
time::Duration,
};
use cfdp::{
dest::DestinationHandler,
filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
RemoteEntityConfig, StdTimerCreator, TransactionId, UserFaultHookProvider,
};
use clap::Parser;
use log::{debug, info, warn};
use spacepackets::{
cfdp::{
pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError},
ChecksumType, ConditionCode, TransmissionMode,
},
seq_count::SeqCountProviderSyncU16,
util::{UnsignedByteFieldU16, UnsignedEnum},
};
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
const RUST_PORT: u16 = 5111;
const PY_PORT: u16 = 5222;
const LOG_LEVEL: log::LevelFilter = log::LevelFilter::Info;
const FILE_DATA: &str = "Hello World!";
#[derive(Debug, Copy, Clone, clap::ValueEnum)]
pub enum TransmissionModeCli {
Nak,
Ack,
}
#[derive(clap::Parser)]
#[command(about = "Arguments for executing a file copy operation")]
pub struct Cli {
#[arg(short, help = "Perform a file copy operation")]
file_copy: bool,
#[arg(short, default_value = "nak")]
mode: Option<TransmissionModeCli>,
#[arg(short)]
closure_requested: Option<bool>,
}
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
}
pub struct ExampleCfdpUser {
entity_type: EntityType,
}
impl ExampleCfdpUser {
pub fn new(entity_type: EntityType) -> Self {
Self { entity_type }
}
}
impl CfdpUser for ExampleCfdpUser {
fn transaction_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: Transaction indication for {:?}",
self.entity_type, id
);
}
fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF sent for transaction {:?}",
self.entity_type, id
);
}
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
println!(
"{:?} entity: Transaction finished: {:?}",
self.entity_type, finished_params
);
}
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
println!(
"{:?} entity: Metadata received: {:?}",
self.entity_type, md_recvd_params
);
}
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
println!(
"{:?} entity: File segment {:?} received",
self.entity_type, segment_recvd_params
);
}
fn report_indication(&mut self, _id: &crate::TransactionId) {}
fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF received for transaction {:?}",
self.entity_type, id
);
}
}
pub struct UdpServer {
pub socket: UdpSocket,
recv_buf: Vec<u8>,
remote_addr: SocketAddr,
source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
}
#[derive(Debug, thiserror::Error)]
pub enum UdpServerError {
#[error(transparent)]
Io(#[from] io::Error),
#[error("pdu error: {0}")]
Pdu(#[from] PduError),
#[error("send error")]
Send,
}
impl UdpServer {
pub fn new<A: ToSocketAddrs>(
addr: A,
remote_addr: SocketAddr,
max_recv_size: usize,
source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
) -> Result<Self, io::Error> {
let server = Self {
socket: UdpSocket::bind(addr)?,
recv_buf: vec![0; max_recv_size],
source_tc_tx,
dest_tc_tx,
remote_addr,
source_tm_rx,
dest_tm_rx,
};
server.socket.set_nonblocking(true)?;
Ok(server)
}
pub fn try_recv_tc(
&mut self,
) -> Result<Option<(PduOwnedWithInfo, SocketAddr)>, UdpServerError> {
let res = match self.socket.recv_from(&mut self.recv_buf) {
Ok(res) => res,
Err(e) => {
return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut {
Ok(None)
} else {
Err(e.into())
}
}
};
let (_, from) = res;
self.remote_addr = from;
let pdu_owned = PduOwnedWithInfo::new_from_raw_packet(&self.recv_buf)?;
match pdu_owned.packet_target()? {
cfdp::PacketTarget::SourceEntity => {
self.source_tc_tx
.send(pdu_owned.clone())
.map_err(|_| UdpServerError::Send)?;
}
cfdp::PacketTarget::DestEntity => {
self.dest_tc_tx
.send(pdu_owned.clone())
.map_err(|_| UdpServerError::Send)?;
}
}
Ok(Some((pdu_owned, from)))
}
pub fn recv_and_send_telemetry(&mut self) {
let tm_handler = |receiver: &mpsc::Receiver<PduOwnedWithInfo>| {
while let Ok(tm) = receiver.try_recv() {
debug!("Sending PDU: {:?}", tm);
pdu_printout(&tm);
let result = self.socket.send_to(tm.pdu(), self.remote_addr());
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
};
tm_handler(&self.source_tm_rx);
tm_handler(&self.dest_tm_rx);
}
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
}
fn pdu_printout(pdu: &PduOwnedWithInfo) {
match pdu.pdu_type() {
spacepackets::cfdp::PduType::FileDirective => match pdu.file_directive_type().unwrap() {
spacepackets::cfdp::pdu::FileDirectiveType::EofPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::FinishedPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
let meta_pdu =
MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed");
debug!("Metadata PDU: {:?}", meta_pdu)
}
spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::PromptPdu => (),
spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
},
spacepackets::cfdp::PduType::FileData => {
let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed");
debug!("File data PDU: {:?}", fd_pdu);
}
}
}
fn main() {
let cli_args = Cli::parse();
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"{}[{}][{}] {}",
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
std::thread::current().name().expect("thread is not named"),
record.level(),
message
))
})
.level(LOG_LEVEL)
.chain(std::io::stdout())
.apply()
.unwrap();
let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let mut file = OpenOptions::new()
.write(true)
.open(&srcfile)
.expect("opening file failed");
info!("created test source file {:?}", srcfile);
file.write_all(FILE_DATA.as_bytes())
.expect("writing file content failed");
let destdir = tempfile::tempdir().expect("creating temp directory failed");
let destfile = destdir.path().join("test.txt");
let local_cfg_source = LocalEntityConfig::new(
RUST_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let (source_tm_tx, source_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
let (dest_tm_tx, dest_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg_python = RemoteEntityConfig::new_with_default_values(
PYTHON_ID.into(),
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32C,
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let mut source_handler = SourceHandler::new(
local_cfg_source,
source_tm_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg_python,
StdTimerCreator::default(),
seq_count_provider,
);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending);
let local_cfg_dest = LocalEntityConfig::new(
RUST_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tm_tx,
NativeFilestore::default(),
remote_cfg_python,
StdTimerCreator::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);
let put_request = if cli_args.file_copy {
Some(
PutRequestOwned::new_regular_request(
PYTHON_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
cli_args.mode.map(|m| match m {
TransmissionModeCli::Ack => TransmissionMode::Acknowledged,
TransmissionModeCli::Nak => TransmissionMode::Unacknowledged,
}),
cli_args.closure_requested,
)
.expect("put request creation failed"),
)
} else {
None
};
let (source_tc_tx, source_tc_rx) = mpsc::channel();
let (dest_tc_tx, dest_tc_rx) = mpsc::channel();
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT);
let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), PY_PORT);
let mut udp_server = UdpServer::new(
local_addr,
remote_addr,
2048,
source_tc_tx,
dest_tc_tx,
source_tm_rx,
dest_tm_rx,
)
.expect("creating UDP server failed");
let jh_source = thread::Builder::new()
.name("cfdp src entity".to_string())
.spawn(move || {
info!("Starting RUST SRC");
if let Some(put_request) = put_request {
info!("RUST SRC: Performing put request: {:?}", put_request);
source_handler
.put_request(&put_request)
.expect("put request failed");
}
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match source_tc_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
warn!("cfdp src entity error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Source handler state machine possible in permanent loop");
}
}
})
.unwrap();
let jh_dest = thread::Builder::new()
.name("cfdp dest entity".to_string())
.spawn(move || {
info!("Starting RUST DEST. Local ID {}", RUST_ID.value());
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match dest_tc_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Dest handler error: {}", e);
// TODO: I'd prefer a proper cancel request if a transfer is active..
dest_handler.reset();
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Destination handler state machine possible in permanent loop");
}
}
})
.unwrap();
let jh_udp_server = thread::Builder::new()
.name("cfdp udp server".to_string())
.spawn(move || {
info!("Starting UDP server on {}", remote_addr);
loop {
loop {
match udp_server.try_recv_tc() {
Ok(result) => match result {
Some((pdu, _addr)) => {
debug!("Received PDU on UDP server: {:?}", pdu);
pdu_printout(&pdu);
}
None => break,
},
Err(e) => {
warn!("UDP server error: {}", e);
break;
}
}
}
udp_server.recv_and_send_telemetry();
thread::sleep(Duration::from_millis(50));
}
})
.unwrap();
jh_source.join().unwrap();
jh_dest.join().unwrap();
jh_udp_server.join().unwrap();
}

View File

@ -0,0 +1 @@
cfdp-py @ git+https://github.com/us-irs/cfdp-py.git@main