diff --git a/satrs-core/src/cfdp/dest.rs b/satrs-core/src/cfdp/dest.rs index 1487467..5bbedd7 100644 --- a/satrs-core/src/cfdp/dest.rs +++ b/satrs-core/src/cfdp/dest.rs @@ -1,17 +1,17 @@ use core::str::{from_utf8, Utf8Error}; use std::{ fs::{metadata, File}, - io::{BufReader, Read, Seek, SeekFrom, Write}, + io::{Seek, SeekFrom, Write}, path::{Path, PathBuf}, }; use crate::cfdp::user::TransactionFinishedParams; use super::{ - filestore::{NativeFilestore, VirtualFilestore}, + filestore::{FilestoreError, VirtualFilestore}, user::{CfdpUser, MetadataReceivedParams}, - CheckTimerCreator, DefaultFaultHandler, PacketInfo, PacketTarget, RemoteEntityConfig, - RemoteEntityConfigProvider, State, TransactionId, TransactionStep, CRC_32, + CheckTimerCreator, LocalEntityConfig, PacketInfo, PacketTarget, RemoteEntityConfig, + RemoteEntityConfigProvider, State, TransactionId, TransactionStep, }; use alloc::boxed::Box; use smallvec::SmallVec; @@ -32,13 +32,12 @@ use spacepackets::{ use thiserror::Error; pub struct DestinationHandler { - id: UnsignedByteField, + local_cfg: LocalEntityConfig, step: TransactionStep, state: State, tparams: TransactionParams, packets_to_send_ctx: PacketsToSendContext, vfs: Box, - default_fault_handler: DefaultFaultHandler, remote_cfg_table: Box, check_timer_creator: Box, } @@ -61,7 +60,7 @@ struct FileProperties { #[derive(Debug)] struct TransferState { transaction_id: Option, - progress: usize, + progress: u64, condition_code: ConditionCode, delivery_code: DeliveryCode, file_status: FileStatus, @@ -111,8 +110,8 @@ impl Default for FileProperties { } impl TransactionParams { - fn file_size(&self) -> usize { - self.tstate.metadata_params.file_size as usize + fn file_size(&self) -> u64 { + self.tstate.metadata_params.file_size } fn metadata_params(&self) -> &MetadataGenericParams { @@ -171,20 +170,18 @@ pub enum DestError { impl DestinationHandler { pub fn new( - entity_id: impl Into, + local_cfg: LocalEntityConfig, vfs: Box, - default_fault_handler: DefaultFaultHandler, remote_cfg_table: Box, check_timer_creator: Box, ) -> Self { Self { - id: entity_id.into(), + local_cfg, step: TransactionStep::Idle, state: State::Idle, tparams: Default::default(), packets_to_send_ctx: Default::default(), vfs, - default_fault_handler, remote_cfg_table, check_timer_creator, } @@ -196,7 +193,7 @@ impl DestinationHandler { packet_to_insert: Option<&PacketInfo>, ) -> Result<(), DestError> { if let Some(packet) = packet_to_insert { - self.insert_packet(packet)?; + self.insert_packet(cfdp_user, packet)?; } match self.state { State::Idle => todo!(), @@ -205,7 +202,11 @@ impl DestinationHandler { } } - fn insert_packet(&mut self, packet_info: &PacketInfo) -> Result<(), DestError> { + fn insert_packet( + &mut self, + cfdp_user: &mut impl CfdpUser, + packet_info: &PacketInfo, + ) -> Result<(), DestError> { if packet_info.target() != PacketTarget::DestEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. @@ -219,6 +220,7 @@ impl DestinationHandler { return Err(DestError::DirectiveExpected); } self.handle_file_directive( + cfdp_user, packet_info.pdu_directive.unwrap(), packet_info.raw_packet, ) @@ -252,7 +254,7 @@ impl DestinationHandler { ) } else { // TODO: Are there cases where this ID is actually the source entity ID? - let entity_id = EntityIdTlv::new(self.id); + let entity_id = EntityIdTlv::new(self.local_cfg.id); FinishedPduCreator::new_with_error( pdu_header, self.tparams.tstate.condition_code, @@ -276,11 +278,12 @@ impl DestinationHandler { pub fn handle_file_directive( &mut self, + cfdp_user: &mut impl CfdpUser, pdu_directive: FileDirectiveType, raw_packet: &[u8], ) -> Result<(), DestError> { match pdu_directive { - FileDirectiveType::EofPdu => self.handle_eof_pdu(raw_packet)?, + FileDirectiveType::EofPdu => self.handle_eof_pdu(cfdp_user, raw_packet)?, FileDirectiveType::FinishedPdu | FileDirectiveType::NakPdu | FileDirectiveType::KeepAlivePdu => { @@ -359,11 +362,19 @@ impl DestinationHandler { Ok(()) } - pub fn handle_eof_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> { + pub fn handle_eof_pdu( + &mut self, + cfdp_user: &mut impl CfdpUser, + raw_packet: &[u8], + ) -> Result<(), DestError> { if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus { return Err(DestError::WrongStateForFileDataAndEof); } let eof_pdu = EofPdu::from_bytes(raw_packet)?; + if self.local_cfg.indication_cfg.eof_recv { + // Unwrap is okay here, application logic ensures that transaction ID is valid here. + cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap()); + } if eof_pdu.condition_code() == ConditionCode::NoError { self.handle_no_error_eof_pdu(&eof_pdu)?; } else { @@ -372,49 +383,73 @@ impl DestinationHandler { Ok(()) } - fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<(), DestError> { + fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result { + // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size + if self.tparams.tstate.progress > eof_pdu.file_size() + && self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError + { + return Ok(true); + } + + if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged + && !self.checksum_verify(eof_pdu.file_checksum()) + { + self.start_check_limit_handling() + } + + // TODO: Continue here based on Python implementation. + // For a standard disk based file system, which is assumed to be used for now, the file // will always be retained. This might change in the future. + /* self.tparams.tstate.file_status = FileStatus::Retained; - if self.checksum_check(eof_pdu.file_checksum())? { + if self.checksum_verify(eof_pdu.file_checksum())? { self.tparams.tstate.condition_code = ConditionCode::NoError; self.tparams.tstate.delivery_code = DeliveryCode::Complete; } else { self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure; } + */ // TODO: Check progress, and implement transfer completion timer as specified in the // standard. This timer protects against out of order arrival of packets. - if self.tparams.tstate.progress != self.tparams.file_size() {} + // if self.tparams.tstate.progress != self.tparams.file_size() {} if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged { self.step = TransactionStep::TransferCompletion; } else { self.step = TransactionStep::SendingAckPdu; } - Ok(()) + Ok(false) } + fn checksum_verify(&mut self, checksum: u32) -> bool { + match self.vfs.checksum_verify( + self.tparams.file_properties.dest_path_buf.to_str().unwrap(), + self.tparams.metadata_params().checksum_type, + checksum, + &mut self.tparams.cksum_buf, + ) { + Ok(checksum_success) => checksum_success, + Err(e) => match e { + FilestoreError::ChecksumTypeNotImplemented(checksum) => { + self.declare_fault(ConditionCode::UnsupportedChecksumType); + // For this case, the applicable algorithm shall the the null checksum, which + // is always succesful. + true + } + _ => { + self.declare_fault(ConditionCode::FilestoreRejection); + // Treat this equivalent to a failed checksum procedure. + false + } + }, + } + } + + fn start_check_limit_handling(&mut self) {} pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> { todo!(); } - fn checksum_check(&mut self, expected_checksum: u32) -> Result { - // TODO: Implement this using the new virtual filestore abstraction. - let mut digest = CRC_32.digest(); - let file_to_check = File::open(&self.tparams.file_properties.dest_path_buf)?; - let mut buf_reader = BufReader::new(file_to_check); - loop { - let bytes_read = buf_reader.read(&mut self.tparams.cksum_buf)?; - if bytes_read == 0 { - break; - } - digest.update(&self.tparams.cksum_buf[0..bytes_read]); - } - if digest.finalize() == expected_checksum { - return Ok(true); - } - Ok(false) - } - fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { if self.step == TransactionStep::TransactionStart { self.transaction_start(cfdp_user)?; @@ -472,7 +507,7 @@ impl DestinationHandler { let metadata_recvd_params = MetadataReceivedParams { id, source_id, - file_size: self.tparams.tstate.metadata_params.file_size, + file_size: self.tparams.file_size(), src_file_name: src_name, dest_file_name: dest_name, msgs_to_user: &msgs_to_user[..num_msgs_to_user], @@ -528,10 +563,28 @@ impl DestinationHandler { } fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode { - todo!("implement this. requires cached fault handler abstraction"); - FaultHandlerCode::IgnoreError + let fh_code = self + .local_cfg + .default_fault_handler + .get_fault_handler(condition_code); + match fh_code { + FaultHandlerCode::NoticeOfCancellation => { + self.notice_of_cancellation(); + } + FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(), + FaultHandlerCode::IgnoreError => todo!(), + FaultHandlerCode::AbandonTransaction => todo!(), + } + self.local_cfg.default_fault_handler.report_fault( + self.tparams.tstate.transaction_id.unwrap(), + condition_code, + self.tparams.tstate.progress, + ) } + fn notice_of_cancellation(&mut self) {} + fn notice_of_suspension(&mut self) {} + fn reset(&mut self) { self.step = TransactionStep::Idle; self.state = State::Idle; @@ -569,8 +622,9 @@ mod tests { }; use crate::cfdp::{ - CheckTimer, CheckTimerCreator, RemoteEntityConfig, StdRemoteEntityConfigProvider, - UserFaultHandler, + filestore::NativeFilestore, CheckTimer, CheckTimerCreator, DefaultFaultHandler, + IndicationConfig, RemoteEntityConfig, StdRemoteEntityConfigProvider, UserFaultHandler, + CRC_32, }; use super::*; @@ -806,10 +860,14 @@ mod tests { fn default_dest_handler() -> DestinationHandler { let test_fault_handler = TestFaultHandler::default(); + let local_entity_cfg = LocalEntityConfig { + id: REMOTE_ID.into(), + indication_cfg: IndicationConfig::default(), + default_fault_handler: DefaultFaultHandler::new(Box::new(test_fault_handler)), + }; DestinationHandler::new( - REMOTE_ID, + local_entity_cfg, Box::new(NativeFilestore::default()), - DefaultFaultHandler::new(Box::new(test_fault_handler)), Box::new(basic_remote_cfg_table()), Box::new(TestCheckTimerCreator::new(2, 2)), ) @@ -936,10 +994,6 @@ mod tests { .write_to_bytes(&mut buf) .expect("writing file data PDU failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); - let result = dest_handler.insert_packet(&packet_info); - if let Err(e) = result { - panic!("destination handler packet insertion error: {e}"); - } let result = dest_handler.state_machine(&mut test_user, Some(&packet_info)); assert!(result.is_ok()); @@ -1003,10 +1057,6 @@ mod tests { .write_to_bytes(&mut buf) .expect("writing file data PDU failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); - let result = dest_handler.insert_packet(&packet_info); - if let Err(e) = result { - panic!("destination handler packet insertion error: {e}"); - } let result = dest_handler.state_machine(&mut test_user, Some(&packet_info)); assert!(result.is_ok()); @@ -1021,10 +1071,6 @@ mod tests { .write_to_bytes(&mut buf) .expect("writing file data PDU failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); - let result = dest_handler.insert_packet(&packet_info); - if let Err(e) = result { - panic!("destination handler packet insertion error: {e}"); - } let result = dest_handler.state_machine(&mut test_user, Some(&packet_info)); assert!(result.is_ok()); diff --git a/satrs-core/src/cfdp/mod.rs b/satrs-core/src/cfdp/mod.rs index 82c6f64..436cdd4 100644 --- a/satrs-core/src/cfdp/mod.rs +++ b/satrs-core/src/cfdp/mod.rs @@ -206,6 +206,18 @@ impl DefaultFaultHandler { }) } + pub fn set_fault_handler( + &mut self, + condition_code: ConditionCode, + fault_handler: FaultHandlerCode, + ) { + let array_idx = Self::condition_code_to_array_index(condition_code); + if array_idx.is_none() { + return; + } + self.handler_array[array_idx.unwrap()] = fault_handler; + } + pub fn new(user_fault_handler: Box) -> Self { let mut init_array = [FaultHandlerCode::NoticeOfCancellation; 10]; init_array @@ -219,7 +231,15 @@ impl DefaultFaultHandler { } } - fn report_fault( + pub fn get_fault_handler(&self, condition_code: ConditionCode) -> FaultHandlerCode { + let array_idx = Self::condition_code_to_array_index(condition_code); + if array_idx.is_none() { + return FaultHandlerCode::IgnoreError; + } + self.handler_array[array_idx.unwrap()] + } + + pub fn report_fault( &mut self, transaction_id: TransactionId, condition: ConditionCode, @@ -258,6 +278,34 @@ impl DefaultFaultHandler { } } +pub struct IndicationConfig { + pub eof_sent: bool, + pub eof_recv: bool, + pub file_segment_recv: bool, + pub transaction_finished: bool, + pub suspended: bool, + pub resumed: bool, +} + +impl Default for IndicationConfig { + fn default() -> Self { + Self { + eof_sent: true, + eof_recv: true, + file_segment_recv: true, + transaction_finished: true, + suspended: true, + resumed: true, + } + } +} + +pub struct LocalEntityConfig { + pub id: UnsignedByteField, + pub indication_cfg: IndicationConfig, + pub default_fault_handler: DefaultFaultHandler, +} + #[derive(Debug, Eq, Copy, Clone)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct TransactionId {