Continue CFDP handlers #90

Merged
muellerr merged 34 commits from continue_cfdp_handlers into main 2024-01-29 23:42:04 +01:00
Showing only changes of commit 42cb3f7e6b - Show all commits

View File

@ -216,13 +216,17 @@ impl DestinationHandler {
/// Returns [None] if the state machine is IDLE, and the transmission mode of the current /// Returns [None] if the state machine is IDLE, and the transmission mode of the current
/// request otherwise. /// request otherwise.
pub fn current_transmission_mode(&self) -> Option<TransmissionMode> { pub fn transmission_mode(&self) -> Option<TransmissionMode> {
if self.state == State::Idle { if self.state == State::Idle {
return None; return None;
} }
Some(self.tparams.transmission_mode()) Some(self.tparams.transmission_mode())
} }
pub fn transaction_id(&self) -> Option<TransactionId> {
self.tstate().transaction_id
}
pub fn packet_to_send_ready(&self) -> bool { pub fn packet_to_send_ready(&self) -> bool {
self.packets_to_send_ctx.packet_available self.packets_to_send_ctx.packet_available
} }
@ -399,6 +403,7 @@ impl DestinationHandler {
self.declare_fault(ConditionCode::FilestoreRejection); self.declare_fault(ConditionCode::FilestoreRejection);
return Err(e.into()); return Err(e.into());
} }
self.tstate_mut().progress += fd_pdu.file_data().len() as u64;
Ok(()) Ok(())
} }
@ -698,7 +703,9 @@ impl DestinationHandler {
self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled; self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled;
} }
fn notice_of_suspension(&mut self) {} fn notice_of_suspension(&mut self) {
// TODO: Implement suspension handling.
}
fn abandon_transaction(&mut self) { fn abandon_transaction(&mut self) {
self.reset(); self.reset();
} }
@ -729,9 +736,9 @@ impl DestinationHandler {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use core::{cell::Cell, sync::atomic::AtomicBool}; use core::{cell::Cell, sync::atomic::AtomicBool};
use std::fs;
#[allow(unused_imports)] #[allow(unused_imports)]
use std::println; use std::println;
use std::{fs, sync::Mutex};
use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec}; use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec};
use rand::Rng; use rand::Rng;
@ -886,12 +893,12 @@ mod tests {
} }
} }
#[derive(Default)] #[derive(Default, Clone)]
struct TestFaultHandler { struct TestFaultHandler {
notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>, notice_of_suspension_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>, notice_of_cancellation_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
abandoned_queue: VecDeque<(TransactionId, ConditionCode, u64)>, abandoned_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>, ignored_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
} }
impl UserFaultHandler for TestFaultHandler { impl UserFaultHandler for TestFaultHandler {
@ -901,8 +908,11 @@ mod tests {
cond: ConditionCode, cond: ConditionCode,
progress: u64, progress: u64,
) { ) {
self.notice_of_suspension_queue self.notice_of_suspension_queue.lock().unwrap().push_back((
.push_back((transaction_id, cond, progress)) transaction_id,
cond,
progress,
))
} }
fn notice_of_cancellation_cb( fn notice_of_cancellation_cb(
@ -912,6 +922,8 @@ mod tests {
progress: u64, progress: u64,
) { ) {
self.notice_of_cancellation_queue self.notice_of_cancellation_queue
.lock()
.unwrap()
.push_back((transaction_id, cond, progress)) .push_back((transaction_id, cond, progress))
} }
@ -922,15 +934,40 @@ mod tests {
progress: u64, progress: u64,
) { ) {
self.abandoned_queue self.abandoned_queue
.lock()
.unwrap()
.push_back((transaction_id, cond, progress)) .push_back((transaction_id, cond, progress))
} }
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
self.ignored_queue self.ignored_queue
.lock()
.unwrap()
.push_back((transaction_id, cond, progress)) .push_back((transaction_id, cond, progress))
} }
} }
impl TestFaultHandler {
fn suspension_queue_empty(&self) -> bool {
self.notice_of_suspension_queue.lock().unwrap().is_empty()
}
fn cancellation_queue_empty(&self) -> bool {
self.notice_of_cancellation_queue.lock().unwrap().is_empty()
}
fn ignored_queue_empty(&self) -> bool {
self.ignored_queue.lock().unwrap().is_empty()
}
fn abandoned_queue_empty(&self) -> bool {
self.abandoned_queue.lock().unwrap().is_empty()
}
fn all_queues_empty(&self) -> bool {
self.suspension_queue_empty()
&& self.cancellation_queue_empty()
&& self.ignored_queue_empty()
&& self.abandoned_queue_empty()
}
}
#[derive(Debug)] #[derive(Debug)]
struct TestCheckTimer { struct TestCheckTimer {
counter: Cell<u32>, counter: Cell<u32>,
@ -976,7 +1013,7 @@ mod tests {
} }
} }
struct TestClass { struct DestHandlerTester {
check_timer_expired: Arc<AtomicBool>, check_timer_expired: Arc<AtomicBool>,
handler: DestinationHandler, handler: DestinationHandler,
src_path: PathBuf, src_path: PathBuf,
@ -989,10 +1026,10 @@ mod tests {
buf: [u8; 512], buf: [u8; 512],
} }
impl TestClass { impl DestHandlerTester {
fn new() -> Self { fn new(fault_handler: TestFaultHandler) -> Self {
let check_timer_expired = Arc::new(AtomicBool::new(false)); let check_timer_expired = Arc::new(AtomicBool::new(false));
let dest_handler = default_dest_handler(check_timer_expired.clone()); let dest_handler = default_dest_handler(fault_handler, check_timer_expired.clone());
let (src_path, dest_path) = init_full_filenames(); let (src_path, dest_path) = init_full_filenames();
assert!(!Path::exists(&dest_path)); assert!(!Path::exists(&dest_path));
let handler = Self { let handler = Self {
@ -1011,6 +1048,10 @@ mod tests {
handler handler
} }
fn dest_path(&self) -> &PathBuf {
&self.dest_path
}
#[allow(dead_code)] #[allow(dead_code)]
fn indication_cfg_mut(&mut self) -> &mut IndicationConfig { fn indication_cfg_mut(&mut self) -> &mut IndicationConfig {
&mut self.handler.local_cfg.indication_cfg &mut self.handler.local_cfg.indication_cfg
@ -1038,7 +1079,7 @@ mod tests {
&mut self, &mut self,
user: &mut TestCfdpUser, user: &mut TestCfdpUser,
file_size: u64, file_size: u64,
) -> Result<(), DestError> { ) -> Result<TransactionId, DestError> {
self.expected_file_size = file_size; self.expected_file_size = file_size;
let metadata_pdu = create_metadata_pdu( let metadata_pdu = create_metadata_pdu(
&self.pdu_header, &self.pdu_header,
@ -1047,13 +1088,13 @@ mod tests {
file_size, file_size,
); );
let packet_info = create_packet_info(&metadata_pdu, &mut self.buf); let packet_info = create_packet_info(&metadata_pdu, &mut self.buf);
let result = self.handler.state_machine(user, Some(&packet_info)); self.handler.state_machine(user, Some(&packet_info))?;
assert_eq!(user.metadata_recv_queue.len(), 1); assert_eq!(user.metadata_recv_queue.len(), 1);
assert_eq!( assert_eq!(
self.handler.current_transmission_mode().unwrap(), self.handler.transmission_mode().unwrap(),
TransmissionMode::Unacknowledged TransmissionMode::Unacknowledged
); );
result Ok(self.handler.transaction_id().unwrap())
} }
fn generic_file_data_insert( fn generic_file_data_insert(
@ -1103,7 +1144,7 @@ mod tests {
} }
} }
impl Drop for TestClass { impl Drop for DestHandlerTester {
fn drop(&mut self) { fn drop(&mut self) {
if self.check_handler_idle_at_drop { if self.check_handler_idle_at_drop {
self.state_check(State::Idle, TransactionStep::Idle); self.state_check(State::Idle, TransactionStep::Idle);
@ -1143,8 +1184,10 @@ mod tests {
table table
} }
fn default_dest_handler(check_timer_expired: Arc<AtomicBool>) -> DestinationHandler { fn default_dest_handler(
let test_fault_handler = TestFaultHandler::default(); test_fault_handler: TestFaultHandler,
check_timer_expired: Arc<AtomicBool>,
) -> DestinationHandler {
let local_entity_cfg = LocalEntityConfig { let local_entity_cfg = LocalEntityConfig {
id: REMOTE_ID.into(), id: REMOTE_ID.into(),
indication_cfg: IndicationConfig::default(), indication_cfg: IndicationConfig::default(),
@ -1208,13 +1251,16 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
let dest_handler = default_dest_handler(Arc::default()); let fault_handler = TestFaultHandler::default();
assert!(dest_handler.current_transmission_mode().is_none()); let dest_handler = default_dest_handler(fault_handler.clone(), Arc::default());
assert!(dest_handler.transmission_mode().is_none());
assert!(fault_handler.all_queues_empty());
} }
#[test] #[test]
fn test_empty_file_transfer_not_acked() { fn test_empty_file_transfer_not_acked() {
let mut test_obj = TestClass::new(); let fault_handler = TestFaultHandler::default();
let mut test_obj = DestHandlerTester::new(fault_handler.clone());
let mut test_user = test_obj.test_user_from_cached_paths(0); let mut test_user = test_obj.test_user_from_cached_paths(0);
test_obj test_obj
.generic_transfer_init(&mut test_user, 0) .generic_transfer_init(&mut test_user, 0)
@ -1223,6 +1269,7 @@ mod tests {
test_obj test_obj
.generic_eof_no_error(&mut test_user, Vec::new()) .generic_eof_no_error(&mut test_user, Vec::new())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert!(fault_handler.all_queues_empty());
} }
#[test] #[test]
@ -1230,8 +1277,9 @@ mod tests {
let file_data_str = "Hello World!"; let file_data_str = "Hello World!";
let file_data = file_data_str.as_bytes(); let file_data = file_data_str.as_bytes();
let file_size = file_data.len() as u64; let file_size = file_data.len() as u64;
let fault_handler = TestFaultHandler::default();
let mut test_obj = TestClass::new(); let mut test_obj = DestHandlerTester::new(fault_handler.clone());
let mut test_user = test_obj.test_user_from_cached_paths(file_size); let mut test_user = test_obj.test_user_from_cached_paths(file_size);
test_obj test_obj
.generic_transfer_init(&mut test_user, file_size) .generic_transfer_init(&mut test_user, file_size)
@ -1243,6 +1291,7 @@ mod tests {
test_obj test_obj
.generic_eof_no_error(&mut test_user, file_data.to_vec()) .generic_eof_no_error(&mut test_user, file_data.to_vec())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert!(fault_handler.all_queues_empty());
} }
#[test] #[test]
@ -1252,8 +1301,9 @@ mod tests {
rng.fill(&mut random_data); rng.fill(&mut random_data);
let file_size = random_data.len() as u64; let file_size = random_data.len() as u64;
let segment_len = 256; let segment_len = 256;
let fault_handler = TestFaultHandler::default();
let mut test_obj = TestClass::new(); let mut test_obj = DestHandlerTester::new(fault_handler.clone());
let mut test_user = test_obj.test_user_from_cached_paths(file_size); let mut test_user = test_obj.test_user_from_cached_paths(file_size);
test_obj test_obj
.generic_transfer_init(&mut test_user, file_size) .generic_transfer_init(&mut test_user, file_size)
@ -1272,6 +1322,7 @@ mod tests {
test_obj test_obj
.generic_eof_no_error(&mut test_user, random_data.to_vec()) .generic_eof_no_error(&mut test_user, random_data.to_vec())
.expect("EOF no error insertion failed"); .expect("EOF no error insertion failed");
assert!(fault_handler.all_queues_empty());
} }
#[test] #[test]
@ -1281,10 +1332,11 @@ mod tests {
rng.fill(&mut random_data); rng.fill(&mut random_data);
let file_size = random_data.len() as u64; let file_size = random_data.len() as u64;
let segment_len = 256; let segment_len = 256;
let fault_handler = TestFaultHandler::default();
let mut test_obj = TestClass::new(); let mut test_obj = DestHandlerTester::new(fault_handler.clone());
let mut test_user = test_obj.test_user_from_cached_paths(file_size); let mut test_user = test_obj.test_user_from_cached_paths(file_size);
test_obj let transaction_id = test_obj
.generic_transfer_init(&mut test_user, file_size) .generic_transfer_init(&mut test_user, file_size)
.expect("transfer init failed"); .expect("transfer init failed");
@ -1311,6 +1363,13 @@ mod tests {
.handler .handler
.state_machine(&mut test_user, None) .state_machine(&mut test_user, None)
.expect("fsm failure"); .expect("fsm failure");
let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
assert_eq!(ignored_queue.len(), 1);
let cancelled = *ignored_queue.front().unwrap();
assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
assert_eq!(cancelled.2, segment_len as u64);
} }
#[test] #[test]
@ -1321,9 +1380,10 @@ mod tests {
let file_size = random_data.len() as u64; let file_size = random_data.len() as u64;
let segment_len = 256; let segment_len = 256;
let mut test_obj = TestClass::new(); let fault_handler = TestFaultHandler::default();
let mut test_obj = DestHandlerTester::new(fault_handler.clone());
let mut test_user = test_obj.test_user_from_cached_paths(file_size); let mut test_user = test_obj.test_user_from_cached_paths(file_size);
test_obj let transaction_id = test_obj
.generic_transfer_init(&mut test_user, file_size) .generic_transfer_init(&mut test_user, file_size)
.expect("transfer init failed"); .expect("transfer init failed");
@ -1353,6 +1413,35 @@ mod tests {
.state_machine(&mut test_user, None) .state_machine(&mut test_user, None)
.expect("fsm error"); .expect("fsm error");
test_obj.state_check(State::Idle, TransactionStep::Idle); test_obj.state_check(State::Idle, TransactionStep::Idle);
assert!(fault_handler
.notice_of_suspension_queue
.lock()
.unwrap()
.is_empty());
let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
assert_eq!(ignored_queue.len(), 1);
let cancelled = *ignored_queue.front().unwrap();
assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
assert_eq!(cancelled.2, segment_len as u64);
let cancelled_queue = fault_handler.notice_of_cancellation_queue.lock().unwrap();
assert_eq!(cancelled_queue.len(), 1);
let cancelled = *cancelled_queue.front().unwrap();
assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::CheckLimitReached);
assert_eq!(cancelled.2, segment_len as u64);
drop(cancelled_queue);
// Check that the broken file exists.
test_obj.check_dest_file = false; test_obj.check_dest_file = false;
assert!(Path::exists(test_obj.dest_path()));
let read_content = fs::read(test_obj.dest_path()).expect("reading back string failed");
assert_eq!(read_content.len(), segment_len);
assert_eq!(read_content, &random_data[0..segment_len]);
assert!(fs::remove_file(test_obj.dest_path().as_path()).is_ok());
} }
} }