simplify interop code
All checks were successful
Rust/cfdp/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2024-08-29 11:56:16 +02:00
parent 5d2cd6d383
commit 2c73d0afca
Signed by: muellerr
GPG Key ID: A649FB78196E3849

View File

@ -3,7 +3,7 @@ use std::{
fs::OpenOptions, fs::OpenOptions,
io::{self, ErrorKind, Write}, io::{self, ErrorKind, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::{atomic::AtomicBool, mpsc, Arc}, sync::mpsc,
thread, thread,
time::Duration, time::Duration,
}; };
@ -82,15 +82,11 @@ impl UserFaultHookProvider for ExampleFaultHandler {
pub struct ExampleCfdpUser { pub struct ExampleCfdpUser {
entity_type: EntityType, entity_type: EntityType,
completion_signal: Arc<AtomicBool>,
} }
impl ExampleCfdpUser { impl ExampleCfdpUser {
pub fn new(entity_type: EntityType, completion_signal: Arc<AtomicBool>) -> Self { pub fn new(entity_type: EntityType) -> Self {
Self { Self { entity_type }
entity_type,
completion_signal,
}
} }
} }
@ -114,8 +110,6 @@ impl CfdpUser for ExampleCfdpUser {
"{:?} entity: Transaction finished: {:?}", "{:?} entity: Transaction finished: {:?}",
self.entity_type, finished_params self.entity_type, finished_params
); );
self.completion_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
} }
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) { fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
@ -297,16 +291,6 @@ fn main() {
.chain(std::io::stdout()) .chain(std::io::stdout())
.apply() .apply()
.unwrap(); .unwrap();
// Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone();
// let stop_signal_ctrl = stop_signal_source.clone();
let completion_signal_source = Arc::new(AtomicBool::new(false));
// let completion_signal_source_main = completion_signal_source.clone();
let completion_signal_dest = Arc::new(AtomicBool::new(false));
// let completion_signal_dest_main = completion_signal_dest.clone();
let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path(); let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let mut file = OpenOptions::new() let mut file = OpenOptions::new()
@ -345,7 +329,7 @@ fn main() {
remote_cfg_python, remote_cfg_python,
seq_count_provider, seq_count_provider,
); );
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source); let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending);
let local_cfg_dest = LocalEntityConfig::new( let local_cfg_dest = LocalEntityConfig::new(
RUST_ID.into(), RUST_ID.into(),
@ -360,7 +344,7 @@ fn main() {
remote_cfg_python, remote_cfg_python,
StdCheckTimerCreator::default(), StdCheckTimerCreator::default(),
); );
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);
let put_request = PutRequestOwned::new_regular_request( let put_request = PutRequestOwned::new_regular_request(
PYTHON_ID.into(), PYTHON_ID.into(),
@ -422,9 +406,6 @@ fn main() {
} else { } else {
undelayed_call_count += 1; undelayed_call_count += 1;
} }
if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors. // Safety feature against configuration errors.
if undelayed_call_count >= 200 { if undelayed_call_count >= 200 {
panic!("Source handler state machine possible in permanent loop"); panic!("Source handler state machine possible in permanent loop");
@ -465,9 +446,6 @@ fn main() {
} else { } else {
undelayed_call_count += 1; undelayed_call_count += 1;
} }
if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors. // Safety feature against configuration errors.
if undelayed_call_count >= 200 { if undelayed_call_count >= 200 {
panic!("Destination handler state machine possible in permanent loop"); panic!("Destination handler state machine possible in permanent loop");
@ -502,24 +480,6 @@ fn main() {
}) })
.unwrap(); .unwrap();
//loop {
/*
if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed)
&& completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed)
{
let file = std::fs::read_to_string(destfile).expect("reading file failed");
assert_eq!(file, FILE_DATA);
// Stop the threads gracefully.
stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
if std::time::Instant::now() - start > Duration::from_secs(20) {
panic!("file transfer not finished in 20 seconds");
}
*/
//std::thread::sleep(Duration::from_millis(50));
//}
jh_source.join().unwrap(); jh_source.join().unwrap();
jh_dest.join().unwrap(); jh_dest.join().unwrap();
jh_udp_server.join().unwrap(); jh_udp_server.join().unwrap();