527 lines
18 KiB
Rust
527 lines
18 KiB
Rust
use std::{
|
|
fmt::Debug,
|
|
fs::OpenOptions,
|
|
io::{self, ErrorKind, Write},
|
|
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
|
|
sync::{atomic::AtomicBool, mpsc, Arc},
|
|
thread,
|
|
time::Duration,
|
|
};
|
|
|
|
use cfdp::{
|
|
dest::DestinationHandler,
|
|
filestore::NativeFilestore,
|
|
request::{PutRequestOwned, StaticPutRequestCacher},
|
|
source::SourceHandler,
|
|
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
|
|
EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider,
|
|
RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
|
|
};
|
|
use log::{debug, info, warn};
|
|
use spacepackets::{
|
|
cfdp::{
|
|
pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError},
|
|
ChecksumType, ConditionCode, TransmissionMode,
|
|
},
|
|
seq_count::SeqCountProviderSyncU16,
|
|
util::{UnsignedByteFieldU16, UnsignedEnum},
|
|
};
|
|
|
|
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
|
|
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
|
|
|
|
const RUST_PORT: u16 = 5111;
|
|
const PY_PORT: u16 = 5222;
|
|
|
|
const LOG_LEVEL: log::LevelFilter = log::LevelFilter::Info;
|
|
|
|
const FILE_DATA: &str = "Hello World!";
|
|
|
|
#[derive(Default)]
|
|
pub struct ExampleFaultHandler {}
|
|
|
|
impl UserFaultHookProvider for ExampleFaultHandler {
|
|
fn notice_of_suspension_cb(
|
|
&mut self,
|
|
transaction_id: TransactionId,
|
|
cond: ConditionCode,
|
|
progress: u64,
|
|
) {
|
|
panic!(
|
|
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
|
|
transaction_id, cond, progress
|
|
);
|
|
}
|
|
|
|
fn notice_of_cancellation_cb(
|
|
&mut self,
|
|
transaction_id: TransactionId,
|
|
cond: ConditionCode,
|
|
progress: u64,
|
|
) {
|
|
panic!(
|
|
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
|
|
transaction_id, cond, progress
|
|
);
|
|
}
|
|
|
|
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
|
|
panic!(
|
|
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
|
|
transaction_id, cond, progress
|
|
);
|
|
}
|
|
|
|
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
|
|
panic!(
|
|
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
|
|
transaction_id, cond, progress
|
|
);
|
|
}
|
|
}
|
|
|
|
pub struct ExampleCfdpUser {
|
|
entity_type: EntityType,
|
|
completion_signal: Arc<AtomicBool>,
|
|
}
|
|
|
|
impl ExampleCfdpUser {
|
|
pub fn new(entity_type: EntityType, completion_signal: Arc<AtomicBool>) -> Self {
|
|
Self {
|
|
entity_type,
|
|
completion_signal,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl CfdpUser for ExampleCfdpUser {
|
|
fn transaction_indication(&mut self, id: &crate::TransactionId) {
|
|
println!(
|
|
"{:?} entity: Transaction indication for {:?}",
|
|
self.entity_type, id
|
|
);
|
|
}
|
|
|
|
fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
|
|
println!(
|
|
"{:?} entity: EOF sent for transaction {:?}",
|
|
self.entity_type, id
|
|
);
|
|
}
|
|
|
|
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
|
|
println!(
|
|
"{:?} entity: Transaction finished: {:?}",
|
|
self.entity_type, finished_params
|
|
);
|
|
self.completion_signal
|
|
.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
}
|
|
|
|
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
|
|
println!(
|
|
"{:?} entity: Metadata received: {:?}",
|
|
self.entity_type, md_recvd_params
|
|
);
|
|
}
|
|
|
|
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
|
|
println!(
|
|
"{:?} entity: File segment {:?} received",
|
|
self.entity_type, segment_recvd_params
|
|
);
|
|
}
|
|
|
|
fn report_indication(&mut self, _id: &crate::TransactionId) {}
|
|
|
|
fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
|
|
panic!("unexpected suspended indication");
|
|
}
|
|
|
|
fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
|
|
|
|
fn fault_indication(
|
|
&mut self,
|
|
_id: &crate::TransactionId,
|
|
_condition_code: ConditionCode,
|
|
_progress: u64,
|
|
) {
|
|
panic!("unexpected fault indication");
|
|
}
|
|
|
|
fn abandoned_indication(
|
|
&mut self,
|
|
_id: &crate::TransactionId,
|
|
_condition_code: ConditionCode,
|
|
_progress: u64,
|
|
) {
|
|
panic!("unexpected abandoned indication");
|
|
}
|
|
|
|
fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
|
|
println!(
|
|
"{:?} entity: EOF received for transaction {:?}",
|
|
self.entity_type, id
|
|
);
|
|
}
|
|
}
|
|
|
|
pub struct UdpServer {
|
|
pub socket: UdpSocket,
|
|
recv_buf: Vec<u8>,
|
|
remote_addr: SocketAddr,
|
|
source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
|
|
dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
|
|
source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
|
|
dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum UdpServerError {
|
|
#[error(transparent)]
|
|
Io(#[from] io::Error),
|
|
#[error("pdu error: {0}")]
|
|
Pdu(#[from] PduError),
|
|
#[error("send error")]
|
|
Send,
|
|
}
|
|
|
|
impl UdpServer {
|
|
pub fn new<A: ToSocketAddrs>(
|
|
addr: A,
|
|
remote_addr: SocketAddr,
|
|
max_recv_size: usize,
|
|
source_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
|
|
dest_tc_tx: mpsc::Sender<PduOwnedWithInfo>,
|
|
source_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
|
|
dest_tm_rx: mpsc::Receiver<PduOwnedWithInfo>,
|
|
) -> Result<Self, io::Error> {
|
|
let server = Self {
|
|
socket: UdpSocket::bind(addr)?,
|
|
recv_buf: vec![0; max_recv_size],
|
|
source_tc_tx,
|
|
dest_tc_tx,
|
|
remote_addr,
|
|
source_tm_rx,
|
|
dest_tm_rx,
|
|
};
|
|
server.socket.set_nonblocking(true)?;
|
|
Ok(server)
|
|
}
|
|
|
|
pub fn try_recv_tc(
|
|
&mut self,
|
|
) -> Result<Option<(PduOwnedWithInfo, SocketAddr)>, UdpServerError> {
|
|
let res = match self.socket.recv_from(&mut self.recv_buf) {
|
|
Ok(res) => res,
|
|
Err(e) => {
|
|
return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut {
|
|
Ok(None)
|
|
} else {
|
|
Err(e.into())
|
|
}
|
|
}
|
|
};
|
|
let (_, from) = res;
|
|
self.remote_addr = from;
|
|
let pdu_owned = PduOwnedWithInfo::new_from_raw_packet(&self.recv_buf)?;
|
|
match pdu_owned.packet_target()? {
|
|
cfdp::PacketTarget::SourceEntity => {
|
|
self.source_tc_tx
|
|
.send(pdu_owned.clone())
|
|
.map_err(|_| UdpServerError::Send)?;
|
|
}
|
|
cfdp::PacketTarget::DestEntity => {
|
|
self.dest_tc_tx
|
|
.send(pdu_owned.clone())
|
|
.map_err(|_| UdpServerError::Send)?;
|
|
}
|
|
}
|
|
Ok(Some((pdu_owned, from)))
|
|
}
|
|
|
|
pub fn recv_and_send_telemetry(&mut self) {
|
|
let tm_handler = |receiver: &mpsc::Receiver<PduOwnedWithInfo>| {
|
|
while let Ok(tm) = receiver.try_recv() {
|
|
debug!("Sending PDU: {:?}", tm);
|
|
pdu_printout(&tm);
|
|
let result = self.socket.send_to(tm.pdu(), self.remote_addr());
|
|
if let Err(e) = result {
|
|
warn!("Sending TM with UDP socket failed: {e}")
|
|
}
|
|
}
|
|
};
|
|
tm_handler(&self.source_tm_rx);
|
|
tm_handler(&self.dest_tm_rx);
|
|
}
|
|
|
|
pub fn remote_addr(&self) -> SocketAddr {
|
|
self.remote_addr
|
|
}
|
|
}
|
|
|
|
fn pdu_printout(pdu: &PduOwnedWithInfo) {
|
|
match pdu.pdu_type() {
|
|
spacepackets::cfdp::PduType::FileDirective => match pdu.file_directive_type().unwrap() {
|
|
spacepackets::cfdp::pdu::FileDirectiveType::EofPdu => (),
|
|
spacepackets::cfdp::pdu::FileDirectiveType::FinishedPdu => (),
|
|
spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (),
|
|
spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => {
|
|
let meta_pdu =
|
|
MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed");
|
|
debug!("Metadata PDU: {:?}", meta_pdu)
|
|
}
|
|
spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (),
|
|
spacepackets::cfdp::pdu::FileDirectiveType::PromptPdu => (),
|
|
spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (),
|
|
},
|
|
spacepackets::cfdp::PduType::FileData => {
|
|
let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed");
|
|
debug!("File data PDU: {:?}", fd_pdu);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn main() {
|
|
fern::Dispatch::new()
|
|
.format(|out, message, record| {
|
|
out.finish(format_args!(
|
|
"{}[{}][{}] {}",
|
|
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
|
|
std::thread::current().name().expect("thread is not named"),
|
|
record.level(),
|
|
message
|
|
))
|
|
})
|
|
.level(LOG_LEVEL)
|
|
.chain(std::io::stdout())
|
|
.apply()
|
|
.unwrap();
|
|
// Simplified event handling using atomic signals.
|
|
let stop_signal_source = Arc::new(AtomicBool::new(false));
|
|
let stop_signal_dest = stop_signal_source.clone();
|
|
// let stop_signal_ctrl = stop_signal_source.clone();
|
|
|
|
let completion_signal_source = Arc::new(AtomicBool::new(false));
|
|
// let completion_signal_source_main = completion_signal_source.clone();
|
|
|
|
let completion_signal_dest = Arc::new(AtomicBool::new(false));
|
|
// let completion_signal_dest_main = completion_signal_dest.clone();
|
|
|
|
let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
|
|
let mut file = OpenOptions::new()
|
|
.write(true)
|
|
.open(&srcfile)
|
|
.expect("opening file failed");
|
|
info!("created test source file {:?}", srcfile);
|
|
file.write_all(FILE_DATA.as_bytes())
|
|
.expect("writing file content failed");
|
|
let destdir = tempfile::tempdir().expect("creating temp directory failed");
|
|
let destfile = destdir.path().join("test.txt");
|
|
|
|
let local_cfg_source = LocalEntityConfig::new(
|
|
RUST_ID.into(),
|
|
IndicationConfig::default(),
|
|
ExampleFaultHandler::default(),
|
|
);
|
|
let (source_tm_tx, source_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
|
|
let (dest_tm_tx, dest_tm_rx) = mpsc::channel::<PduOwnedWithInfo>();
|
|
let put_request_cacher = StaticPutRequestCacher::new(2048);
|
|
let remote_cfg_python = RemoteEntityConfig::new_with_default_values(
|
|
PYTHON_ID.into(),
|
|
1024,
|
|
true,
|
|
false,
|
|
spacepackets::cfdp::TransmissionMode::Unacknowledged,
|
|
ChecksumType::Crc32C,
|
|
);
|
|
let seq_count_provider = SeqCountProviderSyncU16::default();
|
|
let mut source_handler = SourceHandler::new(
|
|
local_cfg_source,
|
|
source_tm_tx,
|
|
NativeFilestore::default(),
|
|
put_request_cacher,
|
|
2048,
|
|
remote_cfg_python,
|
|
seq_count_provider,
|
|
);
|
|
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source);
|
|
|
|
let local_cfg_dest = LocalEntityConfig::new(
|
|
RUST_ID.into(),
|
|
IndicationConfig::default(),
|
|
ExampleFaultHandler::default(),
|
|
);
|
|
let mut dest_handler = DestinationHandler::new(
|
|
local_cfg_dest,
|
|
1024,
|
|
dest_tm_tx,
|
|
NativeFilestore::default(),
|
|
remote_cfg_python,
|
|
StdCheckTimerCreator::default(),
|
|
);
|
|
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);
|
|
|
|
let put_request = PutRequestOwned::new_regular_request(
|
|
PYTHON_ID.into(),
|
|
srcfile.to_str().expect("invaid path string"),
|
|
destfile.to_str().expect("invaid path string"),
|
|
Some(TransmissionMode::Unacknowledged),
|
|
Some(true),
|
|
)
|
|
.expect("put request creation failed");
|
|
|
|
let (source_tc_tx, source_tc_rx) = mpsc::channel();
|
|
let (dest_tc_tx, dest_tc_rx) = mpsc::channel();
|
|
|
|
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT);
|
|
let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), PY_PORT);
|
|
let mut udp_server = UdpServer::new(
|
|
local_addr,
|
|
remote_addr,
|
|
2048,
|
|
source_tc_tx,
|
|
dest_tc_tx,
|
|
source_tm_rx,
|
|
dest_tm_rx,
|
|
)
|
|
.expect("creating UDP server failed");
|
|
|
|
let jh_source = thread::Builder::new()
|
|
.name("cfdp src entity".to_string())
|
|
.spawn(move || {
|
|
info!("Starting RUST SRC");
|
|
source_handler
|
|
.put_request(&put_request)
|
|
.expect("put request failed");
|
|
loop {
|
|
let mut next_delay = None;
|
|
let mut undelayed_call_count = 0;
|
|
let packet_info = match source_tc_rx.try_recv() {
|
|
Ok(pdu_with_info) => Some(pdu_with_info),
|
|
Err(e) => match e {
|
|
mpsc::TryRecvError::Empty => None,
|
|
mpsc::TryRecvError::Disconnected => {
|
|
panic!("unexpected disconnect from destination channel sender");
|
|
}
|
|
},
|
|
};
|
|
match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
|
|
Ok(sent_packets) => {
|
|
if sent_packets == 0 {
|
|
next_delay = Some(Duration::from_millis(50));
|
|
}
|
|
}
|
|
Err(e) => {
|
|
warn!("cfdp src entity error: {}", e);
|
|
next_delay = Some(Duration::from_millis(50));
|
|
}
|
|
}
|
|
if let Some(delay) = next_delay {
|
|
thread::sleep(delay);
|
|
} else {
|
|
undelayed_call_count += 1;
|
|
}
|
|
if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) {
|
|
break;
|
|
}
|
|
// Safety feature against configuration errors.
|
|
if undelayed_call_count >= 200 {
|
|
panic!("Source handler state machine possible in permanent loop");
|
|
}
|
|
}
|
|
})
|
|
.unwrap();
|
|
|
|
let jh_dest = thread::Builder::new()
|
|
.name("cfdp dest entity".to_string())
|
|
.spawn(move || {
|
|
info!("Starting RUST DEST. Local ID {}", RUST_ID.value());
|
|
loop {
|
|
let mut next_delay = None;
|
|
let mut undelayed_call_count = 0;
|
|
let packet_info = match dest_tc_rx.try_recv() {
|
|
Ok(pdu_with_info) => Some(pdu_with_info),
|
|
Err(e) => match e {
|
|
mpsc::TryRecvError::Empty => None,
|
|
mpsc::TryRecvError::Disconnected => {
|
|
panic!("unexpected disconnect from destination channel sender");
|
|
}
|
|
},
|
|
};
|
|
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
|
|
Ok(sent_packets) => {
|
|
if sent_packets == 0 {
|
|
next_delay = Some(Duration::from_millis(50));
|
|
}
|
|
}
|
|
Err(e) => {
|
|
println!("Source handler error: {}", e);
|
|
next_delay = Some(Duration::from_millis(50));
|
|
}
|
|
}
|
|
if let Some(delay) = next_delay {
|
|
thread::sleep(delay);
|
|
} else {
|
|
undelayed_call_count += 1;
|
|
}
|
|
if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) {
|
|
break;
|
|
}
|
|
// Safety feature against configuration errors.
|
|
if undelayed_call_count >= 200 {
|
|
panic!("Destination handler state machine possible in permanent loop");
|
|
}
|
|
}
|
|
})
|
|
.unwrap();
|
|
|
|
let jh_udp_server = thread::Builder::new()
|
|
.name("cfdp udp server".to_string())
|
|
.spawn(move || {
|
|
info!("Starting UDP server on {}", remote_addr);
|
|
loop {
|
|
loop {
|
|
match udp_server.try_recv_tc() {
|
|
Ok(result) => match result {
|
|
Some((pdu, _addr)) => {
|
|
debug!("Received PDU on UDP server: {:?}", pdu);
|
|
pdu_printout(&pdu);
|
|
}
|
|
None => break,
|
|
},
|
|
Err(e) => {
|
|
warn!("UDP server error: {}", e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
udp_server.recv_and_send_telemetry();
|
|
thread::sleep(Duration::from_millis(50));
|
|
}
|
|
})
|
|
.unwrap();
|
|
|
|
//loop {
|
|
/*
|
|
if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed)
|
|
&& completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed)
|
|
{
|
|
let file = std::fs::read_to_string(destfile).expect("reading file failed");
|
|
assert_eq!(file, FILE_DATA);
|
|
// Stop the threads gracefully.
|
|
stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
break;
|
|
}
|
|
if std::time::Instant::now() - start > Duration::from_secs(20) {
|
|
panic!("file transfer not finished in 20 seconds");
|
|
}
|
|
*/
|
|
//std::thread::sleep(Duration::from_millis(50));
|
|
//}
|
|
|
|
jh_source.join().unwrap();
|
|
jh_dest.join().unwrap();
|
|
jh_udp_server.join().unwrap();
|
|
}
|