From 259bf3294cf622ab2815e03dd6a0574a305cb4a7 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 27 Aug 2024 16:01:17 +0200 Subject: [PATCH] continue with rust and python --- examples/python-interop/main.py | 24 +-- examples/python-interop/main.rs | 334 +++++++++++++++++++++++++++++++- src/dest.rs | 6 +- tests/end-to-end.rs | 4 +- 4 files changed, 349 insertions(+), 19 deletions(-) diff --git a/examples/python-interop/main.py b/examples/python-interop/main.py index 4071fd3..9168f4d 100755 --- a/examples/python-interop/main.py +++ b/examples/python-interop/main.py @@ -55,8 +55,8 @@ from tmtccmd.config.cfdp import CfdpParams, generic_cfdp_params_to_put_request from tmtccmd.config.args import add_cfdp_procedure_arguments, cfdp_args_to_cfdp_params -LOCAL_ENTITY_ID = ByteFieldU16(1) -REMOTE_ENTITY_ID = ByteFieldU16(2) +PYTHON_ENTITY_ID = ByteFieldU16(1) +RUST_ENTITY_ID = ByteFieldU16(2) # Enable all indications for both local and remote entity. INDICATION_CFG = IndicationCfg() @@ -79,8 +79,8 @@ DEST_ENTITY_QUEUE = Queue() # be sent by the UDP server. TM_QUEUE = Queue() -REMOTE_CFG_OF_LOCAL_ENTITY = RemoteEntityCfg( - entity_id=LOCAL_ENTITY_ID, +REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg( + entity_id=PYTHON_ENTITY_ID, max_packet_len=MAX_PACKET_LEN, max_file_segment_len=FILE_SEGMENT_SIZE, closure_requested=True, @@ -89,8 +89,8 @@ REMOTE_CFG_OF_LOCAL_ENTITY = RemoteEntityCfg( crc_type=ChecksumType.CRC_32, ) -REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_LOCAL_ENTITY) -REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = REMOTE_ENTITY_ID +REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_PY_ENTITY) +REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = RUST_ENTITY_ID LOCAL_PORT = 5111 REMOTE_PORT = 5222 @@ -196,10 +196,10 @@ class SourceEntityHandler(Thread): try: put_req: PutRequest = self.put_req_queue.get(False) _LOGGER.info(f"{self.base_str}: Handling Put Request: {put_req}") - if put_req.destination_id not in [LOCAL_ENTITY_ID, REMOTE_ENTITY_ID]: + if put_req.destination_id not in [PYTHON_ENTITY_ID, RUST_ENTITY_ID]: _LOGGER.warning( - f"can only handle put requests target towards {REMOTE_ENTITY_ID} or " - f"{LOCAL_ENTITY_ID}" + f"can only handle put requests target towards {RUST_ENTITY_ID} or " + f"{PYTHON_ENTITY_ID}" ) else: try: @@ -557,7 +557,7 @@ def main(): cfdp_params = CfdpParams() cfdp_args_to_cfdp_params(args, cfdp_params) put_req = generic_cfdp_params_to_put_request( - cfdp_params, LOCAL_ENTITY_ID, REMOTE_ENTITY_ID, LOCAL_ENTITY_ID + cfdp_params, PYTHON_ENTITY_ID, RUST_ENTITY_ID, PYTHON_ENTITY_ID ) PUT_REQ_QUEUE.put(put_req) @@ -572,7 +572,7 @@ def main(): src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE) check_timer_provider = CustomCheckTimerProvider() source_handler = SourceHandler( - cfg=LocalEntityCfg(LOCAL_ENTITY_ID, INDICATION_CFG, src_fault_handler), + cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler), seq_num_provider=src_seq_count_provider, remote_cfg_table=remote_cfg_table, user=src_user, @@ -592,7 +592,7 @@ def main(): dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST) dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE) dest_handler = DestHandler( - cfg=LocalEntityCfg(LOCAL_ENTITY_ID, INDICATION_CFG, dest_fault_handler), + cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler), user=dest_user, remote_cfg_table=remote_cfg_table, check_timer_provider=check_timer_provider, diff --git a/examples/python-interop/main.rs b/examples/python-interop/main.rs index f328e4d..10c88a6 100644 --- a/examples/python-interop/main.rs +++ b/examples/python-interop/main.rs @@ -1 +1,333 @@ -fn main() {} +use std::{ + fs::OpenOptions, + io::Write, + sync::{atomic::AtomicBool, mpsc, Arc}, + thread, + time::Duration, +}; + +use cfdp::{ + dest::DestinationHandler, + filestore::NativeFilestore, + request::{PutRequestOwned, StaticPutRequestCacher}, + source::SourceHandler, + user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, + EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig, + StdCheckTimerCreator, TransactionId, UserFaultHookProvider, +}; +use spacepackets::{ + cfdp::{ChecksumType, ConditionCode, TransmissionMode}, + seq_count::SeqCountProviderSyncU16, + util::UnsignedByteFieldU16, +}; + +const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); +const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); + +const FILE_DATA: &str = "Hello World!"; + +#[derive(Default)] +pub struct ExampleFaultHandler {} + +impl UserFaultHookProvider for ExampleFaultHandler { + fn notice_of_suspension_cb( + &mut self, + transaction_id: TransactionId, + cond: ConditionCode, + progress: u64, + ) { + panic!( + "unexpected suspension of transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); + } + + fn notice_of_cancellation_cb( + &mut self, + transaction_id: TransactionId, + cond: ConditionCode, + progress: u64, + ) { + panic!( + "unexpected cancellation of transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); + } + + fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { + panic!( + "unexpected abandonment of transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); + } + + fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { + panic!( + "ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); + } +} + +pub struct ExampleCfdpUser { + entity_type: EntityType, + completion_signal: Arc, +} + +impl ExampleCfdpUser { + pub fn new(entity_type: EntityType, completion_signal: Arc) -> Self { + Self { + entity_type, + completion_signal, + } + } +} + +impl CfdpUser for ExampleCfdpUser { + fn transaction_indication(&mut self, id: &crate::TransactionId) { + println!( + "{:?} entity: Transaction indication for {:?}", + self.entity_type, id + ); + } + + fn eof_sent_indication(&mut self, id: &crate::TransactionId) { + println!( + "{:?} entity: EOF sent for transaction {:?}", + self.entity_type, id + ); + } + + fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) { + println!( + "{:?} entity: Transaction finished: {:?}", + self.entity_type, finished_params + ); + self.completion_signal + .store(true, std::sync::atomic::Ordering::Relaxed); + } + + fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) { + println!( + "{:?} entity: Metadata received: {:?}", + self.entity_type, md_recvd_params + ); + } + + fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) { + println!( + "{:?} entity: File segment {:?} received", + self.entity_type, segment_recvd_params + ); + } + + fn report_indication(&mut self, _id: &crate::TransactionId) {} + + fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) { + panic!("unexpected suspended indication"); + } + + fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {} + + fn fault_indication( + &mut self, + _id: &crate::TransactionId, + _condition_code: ConditionCode, + _progress: u64, + ) { + panic!("unexpected fault indication"); + } + + fn abandoned_indication( + &mut self, + _id: &crate::TransactionId, + _condition_code: ConditionCode, + _progress: u64, + ) { + panic!("unexpected abandoned indication"); + } + + fn eof_recvd_indication(&mut self, id: &crate::TransactionId) { + println!( + "{:?} entity: EOF received for transaction {:?}", + self.entity_type, id + ); + } +} + +fn main() { + // Simplified event handling using atomic signals. + let stop_signal_source = Arc::new(AtomicBool::new(false)); + let stop_signal_dest = stop_signal_source.clone(); + let stop_signal_ctrl = stop_signal_source.clone(); + + let completion_signal_source = Arc::new(AtomicBool::new(false)); + let completion_signal_source_main = completion_signal_source.clone(); + + let completion_signal_dest = Arc::new(AtomicBool::new(false)); + let completion_signal_dest_main = completion_signal_dest.clone(); + + let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + let mut file = OpenOptions::new() + .write(true) + .open(&srcfile) + .expect("opening file failed"); + file.write_all(FILE_DATA.as_bytes()) + .expect("writing file content failed"); + let destdir = tempfile::tempdir().expect("creating temp directory failed"); + let destfile = destdir.path().join("test.txt"); + + let local_cfg_source = LocalEntityConfig::new( + RUST_ID.into(), + IndicationConfig::default(), + ExampleFaultHandler::default(), + ); + let (source_tx, source_rx) = mpsc::channel::(); + let (dest_tx, dest_rx) = mpsc::channel::(); + let put_request_cacher = StaticPutRequestCacher::new(2048); + let remote_cfg_python = RemoteEntityConfig::new_with_default_values( + PYTHON_ID.into(), + 1024, + true, + false, + spacepackets::cfdp::TransmissionMode::Unacknowledged, + ChecksumType::Crc32, + ); + let seq_count_provider = SeqCountProviderSyncU16::default(); + let mut source_handler = SourceHandler::new( + local_cfg_source, + source_tx, + NativeFilestore::default(), + put_request_cacher, + 2048, + remote_cfg_python, + seq_count_provider, + ); + let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source); + + let local_cfg_dest = LocalEntityConfig::new( + RUST_ID.into(), + IndicationConfig::default(), + ExampleFaultHandler::default(), + ); + let mut dest_handler = DestinationHandler::new( + local_cfg_dest, + 1024, + dest_tx, + NativeFilestore::default(), + remote_cfg_python, + StdCheckTimerCreator::default(), + ); + let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest); + + let put_request = PutRequestOwned::new_regular_request( + PYTHON_ID.into(), + srcfile.to_str().expect("invaid path string"), + destfile.to_str().expect("invaid path string"), + Some(TransmissionMode::Unacknowledged), + Some(true), + ) + .expect("put request creation failed"); + + let start = std::time::Instant::now(); + + let jh_source = thread::spawn(move || { + source_handler + .put_request(&put_request) + .expect("put request failed"); + loop { + let mut next_delay = None; + let mut undelayed_call_count = 0; + let packet_info = match dest_rx.try_recv() { + Ok(pdu_with_info) => Some(pdu_with_info), + Err(e) => match e { + mpsc::TryRecvError::Empty => None, + mpsc::TryRecvError::Disconnected => { + panic!("unexpected disconnect from destination channel sender"); + } + }, + }; + match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) { + Ok(sent_packets) => { + if sent_packets == 0 { + next_delay = Some(Duration::from_millis(50)); + } + } + Err(e) => { + println!("Source handler error: {}", e); + next_delay = Some(Duration::from_millis(50)); + } + } + if let Some(delay) = next_delay { + thread::sleep(delay); + } else { + undelayed_call_count += 1; + } + if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + // Safety feature against configuration errors. + if undelayed_call_count >= 200 { + panic!("Source handler state machine possible in permanent loop"); + } + } + }); + + let jh_dest = thread::spawn(move || { + loop { + let mut next_delay = None; + let mut undelayed_call_count = 0; + let packet_info = match source_rx.try_recv() { + Ok(pdu_with_info) => Some(pdu_with_info), + Err(e) => match e { + mpsc::TryRecvError::Empty => None, + mpsc::TryRecvError::Disconnected => { + panic!("unexpected disconnect from destination channel sender"); + } + }, + }; + match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) { + Ok(sent_packets) => { + if sent_packets == 0 { + next_delay = Some(Duration::from_millis(50)); + } + } + Err(e) => { + println!("Source handler error: {}", e); + next_delay = Some(Duration::from_millis(50)); + } + } + if let Some(delay) = next_delay { + thread::sleep(delay); + } else { + undelayed_call_count += 1; + } + if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + // Safety feature against configuration errors. + if undelayed_call_count >= 200 { + panic!("Destination handler state machine possible in permanent loop"); + } + } + }); + + loop { + if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed) + && completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed) + { + let file = std::fs::read_to_string(destfile).expect("reading file failed"); + assert_eq!(file, FILE_DATA); + // Stop the threads gracefully. + stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed); + break; + } + if std::time::Instant::now() - start > Duration::from_secs(2) { + panic!("file transfer not finished in 2 seconds"); + } + std::thread::sleep(Duration::from_millis(50)); + } + + jh_source.join().unwrap(); + jh_dest.join().unwrap(); +} diff --git a/src/dest.rs b/src/dest.rs index e168dd4..7657442 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -1535,8 +1535,7 @@ mod tests { TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); - tb - .handler + tb.handler .state_machine_no_packet(&mut user) .expect("fsm error"); tb.state_check( @@ -1544,8 +1543,7 @@ mod tests { TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); - tb - .handler + tb.handler .state_machine_no_packet(&mut user) .expect("fsm error"); tb.state_check(State::Idle, TransactionStep::Idle); diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index bd44b0b..bb4a285 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -13,8 +13,8 @@ use cfdp::{ request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, - DummyPduProvider, EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, - RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider, + EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig, + StdCheckTimerCreator, TransactionId, UserFaultHookProvider, }; use spacepackets::{ cfdp::{ChecksumType, ConditionCode, TransmissionMode},