diff --git a/satrs-core/src/cfdp/dest.rs b/satrs-core/src/cfdp/dest.rs index a6fa9e5..28ba401 100644 --- a/satrs-core/src/cfdp/dest.rs +++ b/satrs-core/src/cfdp/dest.rs @@ -216,13 +216,17 @@ impl DestinationHandler { /// Returns [None] if the state machine is IDLE, and the transmission mode of the current /// request otherwise. - pub fn current_transmission_mode(&self) -> Option { + pub fn transmission_mode(&self) -> Option { if self.state == State::Idle { return None; } Some(self.tparams.transmission_mode()) } + pub fn transaction_id(&self) -> Option { + self.tstate().transaction_id + } + pub fn packet_to_send_ready(&self) -> bool { self.packets_to_send_ctx.packet_available } @@ -399,6 +403,7 @@ impl DestinationHandler { self.declare_fault(ConditionCode::FilestoreRejection); return Err(e.into()); } + self.tstate_mut().progress += fd_pdu.file_data().len() as u64; Ok(()) } @@ -698,7 +703,9 @@ impl DestinationHandler { self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled; } - fn notice_of_suspension(&mut self) {} + fn notice_of_suspension(&mut self) { + // TODO: Implement suspension handling. + } fn abandon_transaction(&mut self) { self.reset(); } @@ -729,9 +736,9 @@ impl DestinationHandler { #[cfg(test)] mod tests { use core::{cell::Cell, sync::atomic::AtomicBool}; - use std::fs; #[allow(unused_imports)] use std::println; + use std::{fs, sync::Mutex}; use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec}; use rand::Rng; @@ -886,12 +893,12 @@ mod tests { } } - #[derive(Default)] + #[derive(Default, Clone)] struct TestFaultHandler { - notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, - notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>, - abandoned_queue: VecDeque<(TransactionId, ConditionCode, u64)>, - ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>, + notice_of_suspension_queue: Arc>>, + notice_of_cancellation_queue: Arc>>, + abandoned_queue: Arc>>, + ignored_queue: Arc>>, } impl UserFaultHandler for TestFaultHandler { @@ -901,8 +908,11 @@ mod tests { cond: ConditionCode, progress: u64, ) { - self.notice_of_suspension_queue - .push_back((transaction_id, cond, progress)) + self.notice_of_suspension_queue.lock().unwrap().push_back(( + transaction_id, + cond, + progress, + )) } fn notice_of_cancellation_cb( @@ -912,6 +922,8 @@ mod tests { progress: u64, ) { self.notice_of_cancellation_queue + .lock() + .unwrap() .push_back((transaction_id, cond, progress)) } @@ -922,15 +934,40 @@ mod tests { progress: u64, ) { self.abandoned_queue + .lock() + .unwrap() .push_back((transaction_id, cond, progress)) } fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { self.ignored_queue + .lock() + .unwrap() .push_back((transaction_id, cond, progress)) } } + impl TestFaultHandler { + fn suspension_queue_empty(&self) -> bool { + self.notice_of_suspension_queue.lock().unwrap().is_empty() + } + fn cancellation_queue_empty(&self) -> bool { + self.notice_of_cancellation_queue.lock().unwrap().is_empty() + } + fn ignored_queue_empty(&self) -> bool { + self.ignored_queue.lock().unwrap().is_empty() + } + fn abandoned_queue_empty(&self) -> bool { + self.abandoned_queue.lock().unwrap().is_empty() + } + fn all_queues_empty(&self) -> bool { + self.suspension_queue_empty() + && self.cancellation_queue_empty() + && self.ignored_queue_empty() + && self.abandoned_queue_empty() + } + } + #[derive(Debug)] struct TestCheckTimer { counter: Cell, @@ -976,7 +1013,7 @@ mod tests { } } - struct TestClass { + struct DestHandlerTester { check_timer_expired: Arc, handler: DestinationHandler, src_path: PathBuf, @@ -989,10 +1026,10 @@ mod tests { buf: [u8; 512], } - impl TestClass { - fn new() -> Self { + impl DestHandlerTester { + fn new(fault_handler: TestFaultHandler) -> Self { let check_timer_expired = Arc::new(AtomicBool::new(false)); - let dest_handler = default_dest_handler(check_timer_expired.clone()); + let dest_handler = default_dest_handler(fault_handler, check_timer_expired.clone()); let (src_path, dest_path) = init_full_filenames(); assert!(!Path::exists(&dest_path)); let handler = Self { @@ -1011,6 +1048,10 @@ mod tests { handler } + fn dest_path(&self) -> &PathBuf { + &self.dest_path + } + #[allow(dead_code)] fn indication_cfg_mut(&mut self) -> &mut IndicationConfig { &mut self.handler.local_cfg.indication_cfg @@ -1038,7 +1079,7 @@ mod tests { &mut self, user: &mut TestCfdpUser, file_size: u64, - ) -> Result<(), DestError> { + ) -> Result { self.expected_file_size = file_size; let metadata_pdu = create_metadata_pdu( &self.pdu_header, @@ -1047,13 +1088,13 @@ mod tests { file_size, ); let packet_info = create_packet_info(&metadata_pdu, &mut self.buf); - let result = self.handler.state_machine(user, Some(&packet_info)); + self.handler.state_machine(user, Some(&packet_info))?; assert_eq!(user.metadata_recv_queue.len(), 1); assert_eq!( - self.handler.current_transmission_mode().unwrap(), + self.handler.transmission_mode().unwrap(), TransmissionMode::Unacknowledged ); - result + Ok(self.handler.transaction_id().unwrap()) } fn generic_file_data_insert( @@ -1103,7 +1144,7 @@ mod tests { } } - impl Drop for TestClass { + impl Drop for DestHandlerTester { fn drop(&mut self) { if self.check_handler_idle_at_drop { self.state_check(State::Idle, TransactionStep::Idle); @@ -1143,8 +1184,10 @@ mod tests { table } - fn default_dest_handler(check_timer_expired: Arc) -> DestinationHandler { - let test_fault_handler = TestFaultHandler::default(); + fn default_dest_handler( + test_fault_handler: TestFaultHandler, + check_timer_expired: Arc, + ) -> DestinationHandler { let local_entity_cfg = LocalEntityConfig { id: REMOTE_ID.into(), indication_cfg: IndicationConfig::default(), @@ -1208,13 +1251,16 @@ mod tests { #[test] fn test_basic() { - let dest_handler = default_dest_handler(Arc::default()); - assert!(dest_handler.current_transmission_mode().is_none()); + let fault_handler = TestFaultHandler::default(); + let dest_handler = default_dest_handler(fault_handler.clone(), Arc::default()); + assert!(dest_handler.transmission_mode().is_none()); + assert!(fault_handler.all_queues_empty()); } #[test] fn test_empty_file_transfer_not_acked() { - let mut test_obj = TestClass::new(); + let fault_handler = TestFaultHandler::default(); + let mut test_obj = DestHandlerTester::new(fault_handler.clone()); let mut test_user = test_obj.test_user_from_cached_paths(0); test_obj .generic_transfer_init(&mut test_user, 0) @@ -1223,6 +1269,7 @@ mod tests { test_obj .generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); + assert!(fault_handler.all_queues_empty()); } #[test] @@ -1230,8 +1277,9 @@ mod tests { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; + let fault_handler = TestFaultHandler::default(); - let mut test_obj = TestClass::new(); + let mut test_obj = DestHandlerTester::new(fault_handler.clone()); let mut test_user = test_obj.test_user_from_cached_paths(file_size); test_obj .generic_transfer_init(&mut test_user, file_size) @@ -1243,6 +1291,7 @@ mod tests { test_obj .generic_eof_no_error(&mut test_user, file_data.to_vec()) .expect("EOF no error insertion failed"); + assert!(fault_handler.all_queues_empty()); } #[test] @@ -1252,8 +1301,9 @@ mod tests { rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len = 256; + let fault_handler = TestFaultHandler::default(); - let mut test_obj = TestClass::new(); + let mut test_obj = DestHandlerTester::new(fault_handler.clone()); let mut test_user = test_obj.test_user_from_cached_paths(file_size); test_obj .generic_transfer_init(&mut test_user, file_size) @@ -1272,6 +1322,7 @@ mod tests { test_obj .generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); + assert!(fault_handler.all_queues_empty()); } #[test] @@ -1281,10 +1332,11 @@ mod tests { rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len = 256; + let fault_handler = TestFaultHandler::default(); - let mut test_obj = TestClass::new(); + let mut test_obj = DestHandlerTester::new(fault_handler.clone()); let mut test_user = test_obj.test_user_from_cached_paths(file_size); - test_obj + let transaction_id = test_obj .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); @@ -1311,6 +1363,13 @@ mod tests { .handler .state_machine(&mut test_user, None) .expect("fsm failure"); + + let ignored_queue = fault_handler.ignored_queue.lock().unwrap(); + assert_eq!(ignored_queue.len(), 1); + let cancelled = *ignored_queue.front().unwrap(); + assert_eq!(cancelled.0, transaction_id); + assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); + assert_eq!(cancelled.2, segment_len as u64); } #[test] @@ -1321,9 +1380,10 @@ mod tests { let file_size = random_data.len() as u64; let segment_len = 256; - let mut test_obj = TestClass::new(); + let fault_handler = TestFaultHandler::default(); + let mut test_obj = DestHandlerTester::new(fault_handler.clone()); let mut test_user = test_obj.test_user_from_cached_paths(file_size); - test_obj + let transaction_id = test_obj .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); @@ -1353,6 +1413,35 @@ mod tests { .state_machine(&mut test_user, None) .expect("fsm error"); test_obj.state_check(State::Idle, TransactionStep::Idle); + + assert!(fault_handler + .notice_of_suspension_queue + .lock() + .unwrap() + .is_empty()); + + let ignored_queue = fault_handler.ignored_queue.lock().unwrap(); + assert_eq!(ignored_queue.len(), 1); + let cancelled = *ignored_queue.front().unwrap(); + assert_eq!(cancelled.0, transaction_id); + assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure); + assert_eq!(cancelled.2, segment_len as u64); + + let cancelled_queue = fault_handler.notice_of_cancellation_queue.lock().unwrap(); + assert_eq!(cancelled_queue.len(), 1); + let cancelled = *cancelled_queue.front().unwrap(); + assert_eq!(cancelled.0, transaction_id); + assert_eq!(cancelled.1, ConditionCode::CheckLimitReached); + assert_eq!(cancelled.2, segment_len as u64); + + drop(cancelled_queue); + + // Check that the broken file exists. test_obj.check_dest_file = false; + assert!(Path::exists(test_obj.dest_path())); + let read_content = fs::read(test_obj.dest_path()).expect("reading back string failed"); + assert_eq!(read_content.len(), segment_len); + assert_eq!(read_content, &random_data[0..segment_len]); + assert!(fs::remove_file(test_obj.dest_path().as_path()).is_ok()); } }