From 7776847364bea2a745069752cf2e00e8d01c05da Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 22 Dec 2023 19:24:48 +0100 Subject: [PATCH] rework timer and packet send handling --- satrs-core/src/cfdp/dest.rs | 202 ++++++++++++++++++++---------------- satrs-core/src/cfdp/mod.rs | 45 ++++++-- 2 files changed, 144 insertions(+), 103 deletions(-) diff --git a/satrs-core/src/cfdp/dest.rs b/satrs-core/src/cfdp/dest.rs index bb13ec5..627a3cd 100644 --- a/satrs-core/src/cfdp/dest.rs +++ b/satrs-core/src/cfdp/dest.rs @@ -6,7 +6,8 @@ use super::{ filestore::{FilestoreError, VirtualFilestore}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, CheckTimer, CheckTimerCreator, EntityType, LocalEntityConfig, PacketInfo, PacketTarget, - RemoteEntityConfig, RemoteEntityConfigProvider, State, TransactionId, TransactionStep, + RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TransactionId, + TransactionStep, }; use alloc::boxed::Box; use smallvec::SmallVec; @@ -26,12 +27,6 @@ use spacepackets::{ }; use thiserror::Error; -#[derive(Debug, Default)] -struct PacketsToSendContext { - packet_available: bool, - directive: Option, -} - #[derive(Debug)] struct FileProperties { src_file_name: [u8; u8::MAX as usize], @@ -171,6 +166,15 @@ pub enum DestError { NoRemoteCfgFound(UnsignedByteField), } +pub trait CfdpPacketSender: Send { + fn send_pdu( + &mut self, + pdu_type: PduType, + file_directive_type: Option, + raw_pdu: &[u8], + ) -> Result<(), PduError>; +} + /// This is the primary CFDP destination handler. It models the CFDP destination entity, which is /// primarily responsible for receiving files sent from another CFDP entity. It performs the /// reception side of File Copy Operations. @@ -194,7 +198,8 @@ pub struct DestinationHandler { step: TransactionStep, state: State, tparams: TransactionParams, - packets_to_send_ctx: PacketsToSendContext, + packet_buf: alloc::vec::Vec, + packet_sender: Box, vfs: Box, remote_cfg_table: Box, check_timer_creator: Box, @@ -203,6 +208,8 @@ pub struct DestinationHandler { impl DestinationHandler { pub fn new( local_cfg: LocalEntityConfig, + max_packet_len: usize, + packet_sender: Box, vfs: Box, remote_cfg_table: Box, check_timer_creator: Box, @@ -212,7 +219,8 @@ impl DestinationHandler { step: TransactionStep::Idle, state: State::Idle, tparams: Default::default(), - packets_to_send_ctx: Default::default(), + packet_buf: alloc::vec![0; max_packet_len], + packet_sender, vfs, remote_cfg_table, check_timer_creator, @@ -232,10 +240,7 @@ impl DestinationHandler { &mut self, cfdp_user: &mut impl CfdpUser, packet_to_insert: Option<&PacketInfo>, - ) -> Result<(), DestError> { - if self.packet_to_send_ready() { - return Err(DestError::PacketToSendLeft); - } + ) -> Result { if let Some(packet) = packet_to_insert { self.insert_packet(cfdp_user, packet)?; } @@ -259,54 +264,6 @@ impl DestinationHandler { self.tstate().transaction_id } - pub fn packet_to_send_ready(&self) -> bool { - self.packets_to_send_ctx.packet_available - } - - pub fn get_next_packet( - &self, - buf: &mut [u8], - ) -> Result, DestError> { - if !self.packet_to_send_ready() { - return Ok(None); - } - let directive = self.packets_to_send_ctx.directive.unwrap(); - let tstate = self.tstate(); - let written_size = match directive { - FileDirectiveType::FinishedPdu => { - let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0); - let finished_pdu = if tstate.condition_code == ConditionCode::NoError - || tstate.condition_code == ConditionCode::UnsupportedChecksumType - { - FinishedPduCreator::new_default( - pdu_header, - tstate.delivery_code, - tstate.file_status, - ) - } else { - // TODO: Are there cases where this ID is actually the source entity ID? - let entity_id = EntityIdTlv::new(self.local_cfg.id); - FinishedPduCreator::new_with_error( - pdu_header, - tstate.condition_code, - tstate.delivery_code, - tstate.file_status, - entity_id, - ) - }; - finished_pdu.write_to_bytes(buf)? - } - FileDirectiveType::AckPdu => todo!(), - FileDirectiveType::NakPdu => todo!(), - FileDirectiveType::KeepAlivePdu => todo!(), - _ => { - // This should never happen and is considered an internal impl error - panic!("invalid file directive {directive:?} for dest handler send packet"); - } - }; - Ok(Some((directive, written_size))) - } - fn insert_packet( &mut self, cfdp_user: &mut impl CfdpUser, @@ -532,12 +489,14 @@ impl DestinationHandler { fn start_check_limit_handling(&mut self) { self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling; - self.tparams.tstate.current_check_timer = - Some(self.check_timer_creator.get_check_timer_provider( - &self.local_cfg.id, - &self.tparams.remote_cfg.unwrap().entity_id, - EntityType::Receiving, - )); + self.tparams.tstate.current_check_timer = Some( + self.check_timer_creator + .get_check_timer_provider(TimerContext::CheckLimit { + local_id: self.local_cfg.id, + remote_id: self.tparams.remote_cfg.unwrap().entity_id, + entity_type: EntityType::Receiving, + }), + ); self.tparams.tstate.current_check_count = 0; } @@ -571,7 +530,8 @@ impl DestinationHandler { todo!(); } - fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { + let mut sent_packets = 0; if self.step == TransactionStep::TransactionStart { self.transaction_start(cfdp_user)?; } @@ -579,7 +539,7 @@ impl DestinationHandler { self.check_limit_handling(); } if self.step == TransactionStep::TransferCompletion { - self.transfer_completion(cfdp_user)?; + sent_packets += self.transfer_completion(cfdp_user)?; } if self.step == TransactionStep::SendingAckPdu { todo!("no support for acknowledged mode yet"); @@ -587,7 +547,7 @@ impl DestinationHandler { if self.step == TransactionStep::SendingFinishedPdu { self.reset(); } - Ok(()) + Ok(sent_packets) } /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable. @@ -669,17 +629,18 @@ impl DestinationHandler { Ok(()) } - fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { + let mut sent_packets = 0; self.notice_of_completion(cfdp_user)?; if self.tparams.transmission_mode() == TransmissionMode::Acknowledged || self.tparams.metadata_params().closure_requested { - self.prepare_finished_pdu()?; + sent_packets += self.send_finished_pdu()?; self.step = TransactionStep::SendingFinishedPdu; } else { self.reset(); } - Ok(()) + Ok(sent_packets) } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { @@ -745,15 +706,36 @@ impl DestinationHandler { fn reset(&mut self) { self.step = TransactionStep::Idle; self.state = State::Idle; - self.packets_to_send_ctx.packet_available = false; + // self.packets_to_send_ctx.packet_available = false; self.tparams.reset(); } - fn prepare_finished_pdu(&mut self) -> Result<(), DestError> { - self.packets_to_send_ctx.packet_available = true; - self.packets_to_send_ctx.directive = Some(FileDirectiveType::FinishedPdu); - self.step = TransactionStep::SendingFinishedPdu; - Ok(()) + fn send_finished_pdu(&mut self) -> Result { + let tstate = self.tstate(); + + let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0); + let finished_pdu = if tstate.condition_code == ConditionCode::NoError + || tstate.condition_code == ConditionCode::UnsupportedChecksumType + { + FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status) + } else { + // TODO: Are there cases where this ID is actually the source entity ID? + let entity_id = EntityIdTlv::new(self.local_cfg.id); + FinishedPduCreator::new_with_error( + pdu_header, + tstate.condition_code, + tstate.delivery_code, + tstate.file_status, + entity_id, + ) + }; + finished_pdu.write_to_bytes(&mut self.packet_buf)?; + self.packet_sender.send_pdu( + finished_pdu.pdu_type(), + finished_pdu.file_directive_type(), + &self.packet_buf[0..finished_pdu.len_written()], + )?; + Ok(1) } fn tstate(&self) -> &TransferState { @@ -800,6 +782,27 @@ mod tests { pub length: usize, } + type SharedPduPacketQueue = Arc, Vec)>>>; + #[derive(Default, Clone)] + struct TestCfdpSender { + packet_queue: SharedPduPacketQueue, + } + + impl CfdpPacketSender for TestCfdpSender { + fn send_pdu( + &mut self, + pdu_type: PduType, + file_directive_type: Option, + raw_pdu: &[u8], + ) -> Result<(), PduError> { + self.packet_queue.lock().unwrap().push_back(( + pdu_type, + file_directive_type, + raw_pdu.to_vec(), + )); + Ok(()) + } + } #[derive(Default)] struct TestCfdpUser { next_expected_seq_num: u64, @@ -1025,28 +1028,33 @@ mod tests { } struct TestCheckTimerCreator { - expired_flag: Arc, + check_limit_expired_flag: Arc, } impl TestCheckTimerCreator { pub fn new(expired_flag: Arc) -> Self { - Self { expired_flag } + Self { + check_limit_expired_flag: expired_flag, + } } } impl CheckTimerCreator for TestCheckTimerCreator { - fn get_check_timer_provider( - &self, - _local_id: &UnsignedByteField, - _remote_id: &UnsignedByteField, - _entity_type: crate::cfdp::EntityType, - ) -> Box { - Box::new(TestCheckTimer::new(self.expired_flag.clone())) + fn get_check_timer_provider(&self, timer_context: TimerContext) -> Box { + match timer_context { + TimerContext::CheckLimit { .. } => { + Box::new(TestCheckTimer::new(self.check_limit_expired_flag.clone())) + } + _ => { + panic!("invalid check timer creator, can only be used for check limit handling") + } + } } } struct DestHandlerTester { check_timer_expired: Arc, + test_sender: TestCfdpSender, handler: DestinationHandler, src_path: PathBuf, dest_path: PathBuf, @@ -1061,11 +1069,17 @@ mod tests { impl DestHandlerTester { fn new(fault_handler: TestFaultHandler) -> Self { let check_timer_expired = Arc::new(AtomicBool::new(false)); - let dest_handler = default_dest_handler(fault_handler, check_timer_expired.clone()); + let test_sender = TestCfdpSender::default(); + let dest_handler = default_dest_handler( + fault_handler, + test_sender.clone(), + check_timer_expired.clone(), + ); let (src_path, dest_path) = init_full_filenames(); assert!(!Path::exists(&dest_path)); let handler = Self { check_timer_expired, + test_sender, handler: dest_handler, src_path, dest_path, @@ -1134,7 +1148,7 @@ mod tests { user: &mut TestCfdpUser, offset: u64, file_data_chunk: &[u8], - ) -> Result<(), DestError> { + ) -> Result { let filedata_pdu = FileDataPdu::new_no_seg_metadata(self.pdu_header, offset, file_data_chunk); filedata_pdu @@ -1157,7 +1171,7 @@ mod tests { &mut self, user: &mut TestCfdpUser, expected_full_data: Vec, - ) -> Result<(), DestError> { + ) -> Result { self.expected_full_data = expected_full_data; let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header); let packet_info = create_packet_info(&eof_pdu, &mut self.buf); @@ -1218,6 +1232,7 @@ mod tests { fn default_dest_handler( test_fault_handler: TestFaultHandler, + test_packet_sender: TestCfdpSender, check_timer_expired: Arc, ) -> DestinationHandler { let local_entity_cfg = LocalEntityConfig { @@ -1227,6 +1242,8 @@ mod tests { }; DestinationHandler::new( local_entity_cfg, + 2048, + Box::new(test_packet_sender), Box::::default(), Box::new(basic_remote_cfg_table()), Box::new(TestCheckTimerCreator::new(check_timer_expired)), @@ -1284,7 +1301,8 @@ mod tests { #[test] fn test_basic() { let fault_handler = TestFaultHandler::default(); - let dest_handler = default_dest_handler(fault_handler.clone(), Arc::default()); + let test_sender = TestCfdpSender::default(); + let dest_handler = default_dest_handler(fault_handler.clone(), test_sender, Arc::default()); assert!(dest_handler.transmission_mode().is_none()); assert!(fault_handler.all_queues_empty()); } diff --git a/satrs-core/src/cfdp/mod.rs b/satrs-core/src/cfdp/mod.rs index cf18fe7..7bf6776 100644 --- a/satrs-core/src/cfdp/mod.rs +++ b/satrs-core/src/cfdp/mod.rs @@ -31,7 +31,23 @@ pub enum EntityType { Receiving, } -/// Generic abstraction for a check timer which has different functionality depending on whether +pub enum TimerContext { + CheckLimit { + local_id: UnsignedByteField, + remote_id: UnsignedByteField, + entity_type: EntityType, + }, + NakActivity(f32), + PositiveAck(f32), +} + +/// Generic abstraction for a check timer which is used by 3 mechanisms of the CFDP protocol. +/// +/// ## 1. Check limit handling +/// +/// The first mechanism is the check limit handling for unacknowledged transfers as specified +/// in 4.6.3.2 and 4.6.3.3 of the CFDP standard. +/// For this mechanism, the timer has different functionality depending on whether /// the using entity is the sending entity or the receiving entity for the unacknowledged /// transmission mode. /// @@ -42,6 +58,18 @@ pub enum EntityType { /// For the receiving entity, this timer determines the expiry period for incrementing a check /// counter after an EOF PDU is received for an incomplete file transfer. This allows out-of-order /// reception of file data PDUs and EOF PDUs. Also see 4.6.3.3 of the CFDP standard. +/// +/// ## 2. NAK activity limit +/// +/// The timer will be used to perform the NAK activity check as specified in 4.6.4.7 of the CFDP +/// standard. The expiration period will be provided by the NAK timer expiration limit of the +/// remote entity configuration. +/// +/// ## 3. Positive ACK procedures +/// +/// The timer will be used to perform the Positive Acknowledgement Procedures as specified in +/// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer +/// interval of the remote entity configuration. pub trait CheckTimer: Debug { fn has_expired(&self) -> bool; fn reset(&mut self); @@ -50,19 +78,14 @@ pub trait CheckTimer: Debug { /// A generic trait which allows CFDP entities to create check timers which are required to /// implement special procedures in unacknowledged transmission mode, as specified in 4.6.3.2 /// and 4.6.3.3. The [CheckTimer] documentation provides more information about the purpose of the -/// check timer. +/// check timer in the context of CFDP. /// -/// This trait also allows the creation of different check timers depending on -/// the ID of the local entity, the ID of the remote entity for a given transaction, and the -/// type of entity. +/// This trait also allows the creation of different check timers depending on context and purpose +/// of the timer, the runtime environment (e.g. standard clock timer vs. timer using a RTC) or +/// other factors. #[cfg(feature = "alloc")] pub trait CheckTimerCreator { - fn get_check_timer_provider( - &self, - local_id: &UnsignedByteField, - remote_id: &UnsignedByteField, - entity_type: EntityType, - ) -> Box; + fn get_check_timer_provider(&self, timer_context: TimerContext) -> Box; } /// Simple implementation of the [CheckTimerCreator] trait assuming a standard runtime.