diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 5eb0421..4b75576 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -17,10 +17,16 @@ delegate = ">0.7, <=0.10" paste = "1" embed-doc-image = "0.1" +[dependencies.smallvec] +version = "1" + [dependencies.num_enum] version = ">0.5, <=0.7" default-features = false +[dependencies.crc] +version = "3" + [dependencies.dyn-clone] version = "1" optional = true @@ -83,6 +89,7 @@ serde = "1" zerocopy = "0.7" once_cell = "1.13" serde_json = "1" +rand = "0.8" [dev-dependencies.postcard] version = "1" diff --git a/satrs-core/src/cfdp/dest.rs b/satrs-core/src/cfdp/dest.rs new file mode 100644 index 0000000..b66365a --- /dev/null +++ b/satrs-core/src/cfdp/dest.rs @@ -0,0 +1,871 @@ +use core::str::{from_utf8, Utf8Error}; +use std::{ + fs::{metadata, File}, + io::{BufReader, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, +}; + +use crate::cfdp::user::TransactionFinishedParams; + +use super::{ + user::{CfdpUser, MetadataReceivedParams}, + PacketInfo, PacketTarget, State, TransactionId, TransactionStep, CRC_32, +}; +use smallvec::SmallVec; +use spacepackets::{ + cfdp::{ + pdu::{ + eof::EofPdu, + file_data::FileDataPdu, + finished::{DeliveryCode, FileStatus, FinishedPdu}, + metadata::{MetadataGenericParams, MetadataPdu}, + CommonPduConfig, FileDirectiveType, PduError, PduHeader, + }, + tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, TlvType}, + ConditionCode, PduType, TransmissionMode, + }, + util::UnsignedByteField, +}; +use thiserror::Error; + +pub struct DestinationHandler { + id: UnsignedByteField, + step: TransactionStep, + state: State, + tparams: TransactionParams, + packets_to_send_ctx: PacketsToSendContext, +} + +#[derive(Debug, Default)] +struct PacketsToSendContext { + packet_available: bool, + directive: Option, +} + +#[derive(Debug)] +struct FileProperties { + src_file_name: [u8; u8::MAX as usize], + src_file_name_len: usize, + dest_file_name: [u8; u8::MAX as usize], + dest_file_name_len: usize, + dest_path_buf: PathBuf, +} + +#[derive(Debug)] +struct TransferState { + transaction_id: Option, + progress: usize, + condition_code: ConditionCode, + delivery_code: DeliveryCode, + file_status: FileStatus, + metadata_params: MetadataGenericParams, +} + +impl Default for TransferState { + fn default() -> Self { + Self { + transaction_id: None, + progress: Default::default(), + condition_code: ConditionCode::NoError, + delivery_code: DeliveryCode::Incomplete, + file_status: FileStatus::Unreported, + metadata_params: Default::default(), + } + } +} +#[derive(Debug)] +struct TransactionParams { + tstate: TransferState, + pdu_conf: CommonPduConfig, + file_properties: FileProperties, + cksum_buf: [u8; 1024], + msgs_to_user_size: usize, + msgs_to_user_buf: [u8; 1024], +} + +impl Default for FileProperties { + fn default() -> Self { + Self { + src_file_name: [0; u8::MAX as usize], + src_file_name_len: Default::default(), + dest_file_name: [0; u8::MAX as usize], + dest_file_name_len: Default::default(), + dest_path_buf: Default::default(), + } + } +} + +impl TransactionParams { + fn file_size(&self) -> usize { + self.tstate.metadata_params.file_size as usize + } + + fn metadata_params(&self) -> &MetadataGenericParams { + &self.tstate.metadata_params + } +} + +impl Default for TransactionParams { + fn default() -> Self { + Self { + pdu_conf: Default::default(), + cksum_buf: [0; 1024], + msgs_to_user_size: 0, + msgs_to_user_buf: [0; 1024], + tstate: Default::default(), + file_properties: Default::default(), + } + } +} + +impl TransactionParams { + fn reset(&mut self) { + self.tstate.condition_code = ConditionCode::NoError; + self.tstate.delivery_code = DeliveryCode::Incomplete; + } +} + +#[derive(Debug, Error)] +pub enum DestError { + /// File directive expected, but none specified + #[error("expected file directive")] + DirectiveExpected, + #[error("can not process packet type {0:?}")] + CantProcessPacketType(FileDirectiveType), + #[error("can not process file data PDUs in current state")] + WrongStateForFileDataAndEof, + // Received new metadata PDU while being already being busy with a file transfer. + #[error("busy with transfer")] + RecvdMetadataButIsBusy, + #[error("empty source file field")] + EmptySrcFileField, + #[error("empty dest file field")] + EmptyDestFileField, + #[error("pdu error {0}")] + Pdu(#[from] PduError), + #[error("io error {0}")] + Io(#[from] std::io::Error), + #[error("path conversion error {0}")] + PathConversion(#[from] Utf8Error), + #[error("error building dest path from source file name and dest folder")] + PathConcatError, +} + +impl DestinationHandler { + pub fn new(id: impl Into) -> Self { + Self { + id: id.into(), + step: TransactionStep::Idle, + state: State::Idle, + tparams: Default::default(), + packets_to_send_ctx: Default::default(), + } + } + + pub fn state_machine(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + match self.state { + State::Idle => todo!(), + State::BusyClass1Nacked => self.fsm_nacked(cfdp_user), + State::BusyClass2Acked => todo!("acknowledged mode not implemented yet"), + } + } + + pub fn insert_packet(&mut self, packet_info: &PacketInfo) -> Result<(), DestError> { + if packet_info.target() != PacketTarget::DestEntity { + // Unwrap is okay here, a PacketInfo for a file data PDU should always have the + // destination as the target. + return Err(DestError::CantProcessPacketType( + packet_info.pdu_directive().unwrap(), + )); + } + match packet_info.pdu_type { + PduType::FileDirective => { + if packet_info.pdu_directive.is_none() { + return Err(DestError::DirectiveExpected); + } + self.handle_file_directive( + packet_info.pdu_directive.unwrap(), + packet_info.raw_packet, + ) + } + PduType::FileData => self.handle_file_data(packet_info.raw_packet), + } + } + + pub fn packet_to_send_ready(&self) -> bool { + self.packets_to_send_ctx.packet_available + } + + pub fn get_next_packet_to_send( + &self, + buf: &mut [u8], + ) -> Result, DestError> { + if !self.packet_to_send_ready() { + return Ok(None); + } + let directive = self.packets_to_send_ctx.directive.unwrap(); + let written_size = match directive { + FileDirectiveType::FinishedPdu => { + let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0); + let finished_pdu = if self.tparams.tstate.condition_code == ConditionCode::NoError + || self.tparams.tstate.condition_code == ConditionCode::UnsupportedChecksumType + { + FinishedPdu::new_default( + pdu_header, + self.tparams.tstate.delivery_code, + self.tparams.tstate.file_status, + ) + } else { + // TODO: Are there cases where this ID is actually the source entity ID? + let entity_id = EntityIdTlv::new(self.id); + FinishedPdu::new_with_error( + pdu_header, + self.tparams.tstate.condition_code, + self.tparams.tstate.delivery_code, + self.tparams.tstate.file_status, + entity_id, + ) + }; + finished_pdu.write_to_bytes(buf)? + } + FileDirectiveType::AckPdu => todo!(), + FileDirectiveType::NakPdu => todo!(), + FileDirectiveType::KeepAlivePdu => todo!(), + _ => { + // This should never happen and is considered an internal impl error + panic!("invalid file directive {directive:?} for dest handler send packet"); + } + }; + Ok(Some((directive, written_size))) + } + + pub fn handle_file_directive( + &mut self, + pdu_directive: FileDirectiveType, + raw_packet: &[u8], + ) -> Result<(), DestError> { + match pdu_directive { + FileDirectiveType::EofPdu => self.handle_eof_pdu(raw_packet)?, + FileDirectiveType::FinishedPdu + | FileDirectiveType::NakPdu + | FileDirectiveType::KeepAlivePdu => { + return Err(DestError::CantProcessPacketType(pdu_directive)); + } + FileDirectiveType::AckPdu => { + todo!( + "check whether ACK pdu handling is applicable by checking the acked directive field" + ) + } + FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?, + FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?, + }; + Ok(()) + } + + pub fn handle_metadata_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> { + if self.state != State::Idle { + return Err(DestError::RecvdMetadataButIsBusy); + } + let metadata_pdu = MetadataPdu::from_bytes(raw_packet)?; + self.tparams.reset(); + self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params(); + let src_name = metadata_pdu.src_file_name(); + if src_name.is_empty() { + return Err(DestError::EmptySrcFileField); + } + self.tparams.file_properties.src_file_name[..src_name.len_value()] + .copy_from_slice(src_name.value()); + self.tparams.file_properties.src_file_name_len = src_name.len_value(); + let dest_name = metadata_pdu.dest_file_name(); + if dest_name.is_empty() { + return Err(DestError::EmptyDestFileField); + } + self.tparams.file_properties.dest_file_name[..dest_name.len_value()] + .copy_from_slice(dest_name.value()); + self.tparams.file_properties.dest_file_name_len = dest_name.len_value(); + self.tparams.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf(); + self.tparams.msgs_to_user_size = 0; + if metadata_pdu.options().is_some() { + for option_tlv in metadata_pdu.options_iter().unwrap() { + if option_tlv.is_standard_tlv() + && option_tlv.tlv_type().unwrap() == TlvType::MsgToUser + { + self.tparams + .msgs_to_user_buf + .copy_from_slice(option_tlv.raw_data().unwrap()); + self.tparams.msgs_to_user_size += option_tlv.len_full(); + } + } + } + if self.tparams.pdu_conf.trans_mode == TransmissionMode::Unacknowledged { + self.state = State::BusyClass1Nacked; + } else { + self.state = State::BusyClass2Acked; + } + self.step = TransactionStep::TransactionStart; + Ok(()) + } + + pub fn handle_file_data(&mut self, raw_packet: &[u8]) -> Result<(), DestError> { + if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus { + return Err(DestError::WrongStateForFileDataAndEof); + } + let fd_pdu = FileDataPdu::from_bytes(raw_packet)?; + let mut dest_file = File::options() + .write(true) + .open(&self.tparams.file_properties.dest_path_buf)?; + dest_file.seek(SeekFrom::Start(fd_pdu.offset()))?; + dest_file.write_all(fd_pdu.file_data())?; + Ok(()) + } + + #[allow(clippy::needless_if)] + pub fn handle_eof_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> { + if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus { + return Err(DestError::WrongStateForFileDataAndEof); + } + let eof_pdu = EofPdu::from_bytes(raw_packet)?; + let checksum = eof_pdu.file_checksum(); + // For a standard disk based file system, which is assumed to be used for now, the file + // will always be retained. This might change in the future. + self.tparams.tstate.file_status = FileStatus::Retained; + if self.checksum_check(checksum)? { + self.tparams.tstate.condition_code = ConditionCode::NoError; + self.tparams.tstate.delivery_code = DeliveryCode::Complete; + } else { + self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure; + } + // TODO: Check progress, and implement transfer completion timer as specified in the + // standard. This timer protects against out of order arrival of packets. + if self.tparams.tstate.progress != self.tparams.file_size() {} + if self.state == State::BusyClass1Nacked { + self.step = TransactionStep::TransferCompletion; + } else { + self.step = TransactionStep::SendingAckPdu; + } + Ok(()) + } + + pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> { + todo!(); + } + + fn checksum_check(&mut self, expected_checksum: u32) -> Result { + let mut digest = CRC_32.digest(); + let file_to_check = File::open(&self.tparams.file_properties.dest_path_buf)?; + let mut buf_reader = BufReader::new(file_to_check); + loop { + let bytes_read = buf_reader.read(&mut self.tparams.cksum_buf)?; + if bytes_read == 0 { + break; + } + digest.update(&self.tparams.cksum_buf[0..bytes_read]); + } + if digest.finalize() == expected_checksum { + return Ok(true); + } + Ok(false) + } + + fn fsm_nacked(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + if self.step == TransactionStep::TransactionStart { + self.transaction_start(cfdp_user)?; + } + if self.step == TransactionStep::TransferCompletion { + self.transfer_completion(cfdp_user)?; + } + if self.step == TransactionStep::SendingAckPdu { + todo!("no support for acknowledged mode yet"); + } + if self.step == TransactionStep::SendingFinishedPdu { + self.reset(); + } + Ok(()) + } + + /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable. + pub fn step(&self) -> TransactionStep { + self.step + } + + /// Get the step, which denotes whether the CFDP handler is active, and which CFDP class + /// is used if it is active. + pub fn state(&self) -> State { + self.state + } + + fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + let dest_name = from_utf8( + &self.tparams.file_properties.dest_file_name + [..self.tparams.file_properties.dest_file_name_len], + )?; + let dest_path = Path::new(dest_name); + self.tparams.file_properties.dest_path_buf = dest_path.to_path_buf(); + let source_id = self.tparams.pdu_conf.source_id(); + let id = TransactionId::new(source_id, self.tparams.pdu_conf.transaction_seq_num); + let src_name = from_utf8( + &self.tparams.file_properties.src_file_name + [0..self.tparams.file_properties.src_file_name_len], + )?; + let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new(); + let mut num_msgs_to_user = 0; + if self.tparams.msgs_to_user_size > 0 { + let mut index = 0; + while index < self.tparams.msgs_to_user_size { + // This should never panic as the validity of the options was checked beforehand. + let msgs_to_user_tlv = + MsgToUserTlv::from_bytes(&self.tparams.msgs_to_user_buf[index..]) + .expect("message to user creation failed unexpectedly"); + msgs_to_user.push(msgs_to_user_tlv); + index += msgs_to_user_tlv.len_full(); + num_msgs_to_user += 1; + } + } + let metadata_recvd_params = MetadataReceivedParams { + id, + source_id, + file_size: self.tparams.tstate.metadata_params.file_size, + src_file_name: src_name, + dest_file_name: dest_name, + msgs_to_user: &msgs_to_user[..num_msgs_to_user], + }; + self.tparams.tstate.transaction_id = Some(id); + cfdp_user.metadata_recvd_indication(&metadata_recvd_params); + + if dest_path.exists() { + let dest_metadata = metadata(dest_path)?; + if dest_metadata.is_dir() { + // Create new destination path by concatenating the last part of the source source + // name and the destination folder. For example, for a source file of /tmp/hello.txt + // and a destination name of /home/test, the resulting file name should be + // /home/test/hello.txt + let source_path = Path::new(from_utf8( + &self.tparams.file_properties.src_file_name + [..self.tparams.file_properties.src_file_name_len], + )?); + + let source_name = source_path.file_name(); + if source_name.is_none() { + return Err(DestError::PathConcatError); + } + let source_name = source_name.unwrap(); + self.tparams.file_properties.dest_path_buf.push(source_name); + } + } + // This function does exactly what we require: Create a new file if it does not exist yet + // and trucate an existing one. + File::create(&self.tparams.file_properties.dest_path_buf)?; + self.step = TransactionStep::ReceivingFileDataPdus; + Ok(()) + } + + fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { + let transaction_finished_params = TransactionFinishedParams { + id: self.tparams.tstate.transaction_id.unwrap(), + condition_code: self.tparams.tstate.condition_code, + delivery_code: self.tparams.tstate.delivery_code, + file_status: self.tparams.tstate.file_status, + }; + cfdp_user.transaction_finished_indication(&transaction_finished_params); + // This function should never be called with metadata parameters not set + if self.tparams.metadata_params().closure_requested { + self.prepare_finished_pdu()?; + self.step = TransactionStep::SendingFinishedPdu; + } else { + self.reset(); + self.state = State::Idle; + self.step = TransactionStep::Idle; + } + Ok(()) + } + + fn reset(&mut self) { + self.step = TransactionStep::Idle; + self.state = State::Idle; + self.packets_to_send_ctx.packet_available = false; + self.tparams.reset(); + } + + fn prepare_finished_pdu(&mut self) -> Result<(), DestError> { + self.packets_to_send_ctx.packet_available = true; + self.packets_to_send_ctx.directive = Some(FileDirectiveType::FinishedPdu); + self.step = TransactionStep::SendingFinishedPdu; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use core::sync::atomic::{AtomicU8, Ordering}; + #[allow(unused_imports)] + use std::println; + use std::{env::temp_dir, fs}; + + use alloc::{format, string::String}; + use rand::Rng; + use spacepackets::{ + cfdp::{lv::Lv, ChecksumType}, + util::{UbfU16, UnsignedByteFieldU16}, + }; + + use super::*; + + const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); + const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); + + const SRC_NAME: &str = "__cfdp__source-file"; + const DEST_NAME: &str = "__cfdp__dest-file"; + + static ATOMIC_COUNTER: AtomicU8 = AtomicU8::new(0); + + #[derive(Default)] + struct TestCfdpUser { + next_expected_seq_num: u64, + expected_full_src_name: String, + expected_full_dest_name: String, + expected_file_size: usize, + } + + impl TestCfdpUser { + fn generic_id_check(&self, id: &crate::cfdp::TransactionId) { + assert_eq!(id.source_id, LOCAL_ID.into()); + assert_eq!(id.seq_num().value(), self.next_expected_seq_num); + } + } + + impl CfdpUser for TestCfdpUser { + fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) { + self.generic_id_check(id); + } + + fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) { + self.generic_id_check(id); + } + + fn transaction_finished_indication( + &mut self, + finished_params: &crate::cfdp::user::TransactionFinishedParams, + ) { + self.generic_id_check(&finished_params.id); + } + + fn metadata_recvd_indication( + &mut self, + md_recvd_params: &crate::cfdp::user::MetadataReceivedParams, + ) { + self.generic_id_check(&md_recvd_params.id); + assert_eq!( + String::from(md_recvd_params.src_file_name), + self.expected_full_src_name + ); + assert_eq!( + String::from(md_recvd_params.dest_file_name), + self.expected_full_dest_name + ); + assert_eq!(md_recvd_params.msgs_to_user.len(), 0); + assert_eq!(md_recvd_params.source_id, LOCAL_ID.into()); + assert_eq!(md_recvd_params.file_size as usize, self.expected_file_size); + } + + fn file_segment_recvd_indication( + &mut self, + _segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams, + ) { + } + + fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {} + + fn suspended_indication( + &mut self, + _id: &crate::cfdp::TransactionId, + _condition_code: ConditionCode, + ) { + panic!("unexpected suspended indication"); + } + + fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {} + + fn fault_indication( + &mut self, + _id: &crate::cfdp::TransactionId, + _condition_code: ConditionCode, + _progress: u64, + ) { + panic!("unexpected fault indication"); + } + + fn abandoned_indication( + &mut self, + _id: &crate::cfdp::TransactionId, + _condition_code: ConditionCode, + _progress: u64, + ) { + panic!("unexpected abandoned indication"); + } + + fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) { + self.generic_id_check(id); + } + } + + fn init_check(handler: &DestinationHandler) { + assert_eq!(handler.state(), State::Idle); + assert_eq!(handler.step(), TransactionStep::Idle); + } + + fn init_full_filenames() -> (PathBuf, PathBuf) { + let mut file_path = temp_dir(); + let mut src_path = file_path.clone(); + // Atomic counter used to allow concurrent tests. + let unique_counter = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed); + // Create unique test filenames. + let src_name_unique = format!("{SRC_NAME}{}.txt", unique_counter); + let dest_name_unique = format!("{DEST_NAME}{}.txt", unique_counter); + src_path.push(src_name_unique); + file_path.push(dest_name_unique); + (src_path, file_path) + } + + #[test] + fn test_basic() { + let dest_handler = DestinationHandler::new(REMOTE_ID); + init_check(&dest_handler); + } + + fn create_pdu_header(seq_num: impl Into) -> PduHeader { + let mut pdu_conf = + CommonPduConfig::new_with_byte_fields(LOCAL_ID, REMOTE_ID, seq_num).unwrap(); + pdu_conf.trans_mode = TransmissionMode::Unacknowledged; + PduHeader::new_no_file_data(pdu_conf, 0) + } + + fn create_metadata_pdu<'filename>( + pdu_header: &PduHeader, + src_name: &'filename Path, + dest_name: &'filename Path, + file_size: u64, + ) -> MetadataPdu<'filename, 'filename, 'static> { + let metadata_params = MetadataGenericParams::new(false, ChecksumType::Crc32, file_size); + MetadataPdu::new( + *pdu_header, + metadata_params, + Lv::new_from_str(src_name.as_os_str().to_str().unwrap()).unwrap(), + Lv::new_from_str(dest_name.as_os_str().to_str().unwrap()).unwrap(), + None, + ) + } + + fn insert_metadata_pdu( + metadata_pdu: &MetadataPdu, + buf: &mut [u8], + dest_handler: &mut DestinationHandler, + ) { + let written_len = metadata_pdu + .write_to_bytes(buf) + .expect("writing metadata PDU failed"); + let packet_info = + PacketInfo::new(&buf[..written_len]).expect("generating packet info failed"); + let insert_result = dest_handler.insert_packet(&packet_info); + if let Err(e) = insert_result { + panic!("insert result error: {e}"); + } + } + + fn insert_eof_pdu( + file_data: &[u8], + pdu_header: &PduHeader, + buf: &mut [u8], + dest_handler: &mut DestinationHandler, + ) { + let mut digest = CRC_32.digest(); + digest.update(file_data); + let crc32 = digest.finalize(); + let eof_pdu = EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64); + let result = eof_pdu.write_to_bytes(buf); + assert!(result.is_ok()); + let packet_info = PacketInfo::new(&buf).expect("generating packet info failed"); + let result = dest_handler.insert_packet(&packet_info); + assert!(result.is_ok()); + } + + #[test] + fn test_empty_file_transfer() { + let (src_name, dest_name) = init_full_filenames(); + assert!(!Path::exists(&dest_name)); + let mut buf: [u8; 512] = [0; 512]; + let mut test_user = TestCfdpUser { + next_expected_seq_num: 0, + expected_full_src_name: src_name.to_string_lossy().into(), + expected_full_dest_name: dest_name.to_string_lossy().into(), + expected_file_size: 0, + }; + // We treat the destination handler like it is a remote entity. + let mut dest_handler = DestinationHandler::new(REMOTE_ID); + init_check(&dest_handler); + + let seq_num = UbfU16::new(0); + let pdu_header = create_pdu_header(seq_num); + let metadata_pdu = + create_metadata_pdu(&pdu_header, src_name.as_path(), dest_name.as_path(), 0); + insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler); + let result = dest_handler.state_machine(&mut test_user); + if let Err(e) = result { + panic!("dest handler fsm error: {e}"); + } + assert_ne!(dest_handler.state(), State::Idle); + assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus); + + insert_eof_pdu(&[], &pdu_header, &mut buf, &mut dest_handler); + let result = dest_handler.state_machine(&mut test_user); + assert!(result.is_ok()); + assert_eq!(dest_handler.state(), State::Idle); + assert_eq!(dest_handler.step(), TransactionStep::Idle); + assert!(Path::exists(&dest_name)); + let read_content = fs::read(&dest_name).expect("reading back string failed"); + assert_eq!(read_content.len(), 0); + assert!(fs::remove_file(dest_name).is_ok()); + } + + #[test] + fn test_small_file_transfer() { + let (src_name, dest_name) = init_full_filenames(); + assert!(!Path::exists(&dest_name)); + let file_data_str = "Hello World!"; + let file_data = file_data_str.as_bytes(); + let mut buf: [u8; 512] = [0; 512]; + let mut test_user = TestCfdpUser { + next_expected_seq_num: 0, + expected_full_src_name: src_name.to_string_lossy().into(), + expected_full_dest_name: dest_name.to_string_lossy().into(), + expected_file_size: file_data.len(), + }; + // We treat the destination handler like it is a remote entity. + let mut dest_handler = DestinationHandler::new(REMOTE_ID); + init_check(&dest_handler); + + let seq_num = UbfU16::new(0); + let pdu_header = create_pdu_header(seq_num); + let metadata_pdu = create_metadata_pdu( + &pdu_header, + src_name.as_path(), + dest_name.as_path(), + file_data.len() as u64, + ); + insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler); + let result = dest_handler.state_machine(&mut test_user); + if let Err(e) = result { + panic!("dest handler fsm error: {e}"); + } + assert_ne!(dest_handler.state(), State::Idle); + assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus); + + let offset = 0; + let filedata_pdu = FileDataPdu::new_no_seg_metadata(pdu_header, offset, file_data); + filedata_pdu + .write_to_bytes(&mut buf) + .expect("writing file data PDU failed"); + let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); + let result = dest_handler.insert_packet(&packet_info); + if let Err(e) = result { + panic!("destination handler packet insertion error: {e}"); + } + let result = dest_handler.state_machine(&mut test_user); + assert!(result.is_ok()); + + insert_eof_pdu(file_data, &pdu_header, &mut buf, &mut dest_handler); + let result = dest_handler.state_machine(&mut test_user); + assert!(result.is_ok()); + assert_eq!(dest_handler.state(), State::Idle); + assert_eq!(dest_handler.step(), TransactionStep::Idle); + + assert!(Path::exists(&dest_name)); + let read_content = fs::read_to_string(&dest_name).expect("reading back string failed"); + assert_eq!(read_content, file_data_str); + assert!(fs::remove_file(dest_name).is_ok()); + } + + #[test] + fn test_segmented_file_transfer() { + let (src_name, dest_name) = init_full_filenames(); + assert!(!Path::exists(&dest_name)); + let mut rng = rand::thread_rng(); + let mut random_data = [0u8; 512]; + rng.fill(&mut random_data); + let mut buf: [u8; 512] = [0; 512]; + let mut test_user = TestCfdpUser { + next_expected_seq_num: 0, + expected_full_src_name: src_name.to_string_lossy().into(), + expected_full_dest_name: dest_name.to_string_lossy().into(), + expected_file_size: random_data.len(), + }; + + // We treat the destination handler like it is a remote entity. + let mut dest_handler = DestinationHandler::new(REMOTE_ID); + init_check(&dest_handler); + + let seq_num = UbfU16::new(0); + let pdu_header = create_pdu_header(seq_num); + let metadata_pdu = create_metadata_pdu( + &pdu_header, + src_name.as_path(), + dest_name.as_path(), + random_data.len() as u64, + ); + insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler); + let result = dest_handler.state_machine(&mut test_user); + if let Err(e) = result { + panic!("dest handler fsm error: {e}"); + } + assert_ne!(dest_handler.state(), State::Idle); + assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus); + + // First file data PDU + let mut offset: usize = 0; + let segment_len = 256; + let filedata_pdu = FileDataPdu::new_no_seg_metadata( + pdu_header, + offset as u64, + &random_data[0..segment_len], + ); + filedata_pdu + .write_to_bytes(&mut buf) + .expect("writing file data PDU failed"); + let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); + let result = dest_handler.insert_packet(&packet_info); + if let Err(e) = result { + panic!("destination handler packet insertion error: {e}"); + } + let result = dest_handler.state_machine(&mut test_user); + assert!(result.is_ok()); + + // Second file data PDU + offset += segment_len; + let filedata_pdu = FileDataPdu::new_no_seg_metadata( + pdu_header, + offset as u64, + &random_data[segment_len..], + ); + filedata_pdu + .write_to_bytes(&mut buf) + .expect("writing file data PDU failed"); + let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); + let result = dest_handler.insert_packet(&packet_info); + if let Err(e) = result { + panic!("destination handler packet insertion error: {e}"); + } + let result = dest_handler.state_machine(&mut test_user); + assert!(result.is_ok()); + + insert_eof_pdu(&random_data, &pdu_header, &mut buf, &mut dest_handler); + let result = dest_handler.state_machine(&mut test_user); + assert!(result.is_ok()); + assert_eq!(dest_handler.state(), State::Idle); + assert_eq!(dest_handler.step(), TransactionStep::Idle); + + // Clean up + assert!(Path::exists(&dest_name)); + let read_content = fs::read(&dest_name).expect("reading back string failed"); + assert_eq!(read_content, random_data); + assert!(fs::remove_file(dest_name).is_ok()); + } +} diff --git a/satrs-core/src/cfdp/mod.rs b/satrs-core/src/cfdp/mod.rs new file mode 100644 index 0000000..dc6e87a --- /dev/null +++ b/satrs-core/src/cfdp/mod.rs @@ -0,0 +1,320 @@ +use crc::{Crc, CRC_32_CKSUM}; +use spacepackets::{ + cfdp::{ + pdu::{FileDirectiveType, PduError, PduHeader}, + ChecksumType, PduType, TransmissionMode, + }, + util::UnsignedByteField, +}; + +#[cfg(feature = "alloc")] +use alloc::boxed::Box; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "std")] +pub mod dest; +#[cfg(feature = "std")] +pub mod source; +pub mod user; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EntityType { + Sending, + Receiving, +} + +/// Generic abstraction for a check timer which has different functionality depending on whether +/// the using entity is the sending entity or the receiving entity for the unacknowledged +/// transmission mode. +/// +/// For the sending entity, this timer determines the expiry period for declaring a check limit +/// fault after sending an EOF PDU with requested closure. This allows a timeout of the transfer. +/// Also see 4.6.3.2 of the CFDP standard. +/// +/// For the receiving entity, this timer determines the expiry period for incrementing a check +/// counter after an EOF PDU is received for an incomplete file transfer. This allows out-of-order +/// reception of file data PDUs and EOF PDUs. Also see 4.6.3.3 of the CFDP standard. +pub trait CheckTimerProvider { + fn has_expired(&self) -> bool; +} + +/// A generic trait which allows CFDP entities to create check timers which are required to +/// implement special procedures in unacknowledged transmission mode, as specified in 4.6.3.2 +/// and 4.6.3.3. The [CheckTimerProvider] provides more information about the purpose of the +/// check timer. +/// +/// This trait also allows the creation of different check timers depending on +/// the ID of the local entity, the ID of the remote entity for a given transaction, and the +/// type of entity. +#[cfg(feature = "alloc")] +pub trait CheckTimerCreator { + fn get_check_timer_provider( + local_id: &UnsignedByteField, + remote_id: &UnsignedByteField, + entity_type: EntityType, + ) -> Box; +} + +/// Simple implementation of the [CheckTimerProvider] trait assuming a standard runtime. +/// It also assumes that a second accuracy of the check timer period is sufficient. +#[cfg(feature = "std")] +pub struct StdCheckTimer { + expiry_time_seconds: u64, + start_time: std::time::Instant, +} + +#[cfg(feature = "std")] +impl StdCheckTimer { + pub fn new(expiry_time_seconds: u64) -> Self { + Self { + expiry_time_seconds, + start_time: std::time::Instant::now(), + } + } +} + +#[cfg(feature = "std")] +impl CheckTimerProvider for StdCheckTimer { + fn has_expired(&self) -> bool { + let elapsed_time = self.start_time.elapsed(); + if elapsed_time.as_secs() > self.expiry_time_seconds { + return true; + } + false + } +} + +#[derive(Debug)] +pub struct RemoteEntityConfig { + pub entity_id: UnsignedByteField, + pub max_file_segment_len: usize, + pub closure_requeted_by_default: bool, + pub crc_on_transmission_by_default: bool, + pub default_transmission_mode: TransmissionMode, + pub default_crc_type: ChecksumType, + pub check_limit: u32, +} + +pub trait RemoteEntityConfigProvider { + fn get_remote_config(&self, remote_id: &UnsignedByteField) -> Option<&RemoteEntityConfig>; +} + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct TransactionId { + source_id: UnsignedByteField, + seq_num: UnsignedByteField, +} + +impl TransactionId { + pub fn new(source_id: UnsignedByteField, seq_num: UnsignedByteField) -> Self { + Self { source_id, seq_num } + } + + pub fn source_id(&self) -> &UnsignedByteField { + &self.source_id + } + + pub fn seq_num(&self) -> &UnsignedByteField { + &self.seq_num + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum TransactionStep { + Idle = 0, + TransactionStart = 1, + ReceivingFileDataPdus = 2, + SendingAckPdu = 3, + TransferCompletion = 4, + SendingFinishedPdu = 5, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum State { + Idle = 0, + BusyClass1Nacked = 2, + BusyClass2Acked = 3, +} + +pub const CRC_32: Crc = Crc::::new(&CRC_32_CKSUM); + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum PacketTarget { + SourceEntity, + DestEntity, +} + +/// This is a helper struct which contains base information about a particular PDU packet. +/// This is also necessary information for CFDP packet routing. For example, some packet types +/// like file data PDUs can only be used by CFDP source entities. +pub struct PacketInfo<'raw_packet> { + pdu_type: PduType, + pdu_directive: Option, + target: PacketTarget, + raw_packet: &'raw_packet [u8], +} + +impl<'raw> PacketInfo<'raw> { + pub fn new(raw_packet: &'raw [u8]) -> Result { + let (pdu_header, header_len) = PduHeader::from_bytes(raw_packet)?; + if pdu_header.pdu_type() == PduType::FileData { + return Ok(Self { + pdu_type: pdu_header.pdu_type(), + pdu_directive: None, + target: PacketTarget::DestEntity, + raw_packet, + }); + } + if pdu_header.pdu_datafield_len() < 1 { + return Err(PduError::FormatError); + } + // Route depending on PDU type and directive type if applicable. Retrieve directive type + // from the raw stream for better performance (with sanity and directive code check). + // The routing is based on section 4.5 of the CFDP standard which specifies the PDU forwarding + // procedure. + let directive = FileDirectiveType::try_from(raw_packet[header_len]).map_err(|_| { + PduError::InvalidDirectiveType { + found: raw_packet[header_len], + expected: None, + } + })?; + let packet_target = match directive { + // Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a. + // the source handler + FileDirectiveType::NakPdu + | FileDirectiveType::FinishedPdu + | FileDirectiveType::KeepAlivePdu => PacketTarget::SourceEntity, + // Section b) of 4.5.3: These PDUs should always be targeted towards the file receiver a.k.a. + // the destination handler + FileDirectiveType::MetadataPdu + | FileDirectiveType::EofPdu + | FileDirectiveType::PromptPdu => PacketTarget::DestEntity, + // Section a): Recipient depends of the type of PDU that is being acknowledged. We can simply + // extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to + // the source handler, for a Finished PDU, it is passed to the destination handler. + FileDirectiveType::AckPdu => { + let acked_directive = FileDirectiveType::try_from(raw_packet[header_len + 1]) + .map_err(|_| PduError::InvalidDirectiveType { + found: raw_packet[header_len], + expected: None, + })?; + if acked_directive == FileDirectiveType::EofPdu { + PacketTarget::SourceEntity + } else if acked_directive == FileDirectiveType::FinishedPdu { + PacketTarget::DestEntity + } else { + // TODO: Maybe a better error? This might be confusing.. + return Err(PduError::InvalidDirectiveType { + found: raw_packet[header_len + 1], + expected: None, + }); + } + } + }; + Ok(Self { + pdu_type: pdu_header.pdu_type(), + pdu_directive: Some(directive), + target: packet_target, + raw_packet, + }) + } + + pub fn pdu_type(&self) -> PduType { + self.pdu_type + } + + pub fn pdu_directive(&self) -> Option { + self.pdu_directive + } + + pub fn target(&self) -> PacketTarget { + self.target + } + + pub fn raw_packet(&self) -> &[u8] { + self.raw_packet + } +} + +#[cfg(test)] +mod tests { + use spacepackets::cfdp::{ + lv::Lv, + pdu::{ + eof::EofPdu, + file_data::FileDataPdu, + metadata::{MetadataGenericParams, MetadataPdu}, + CommonPduConfig, FileDirectiveType, PduHeader, + }, + PduType, + }; + + use crate::cfdp::PacketTarget; + + use super::PacketInfo; + + fn generic_pdu_header() -> PduHeader { + let pdu_conf = CommonPduConfig::default(); + PduHeader::new_no_file_data(pdu_conf, 0) + } + + #[test] + fn test_metadata_pdu_info() { + let mut buf: [u8; 128] = [0; 128]; + let pdu_header = generic_pdu_header(); + let metadata_params = MetadataGenericParams::default(); + let src_file_name = "hello.txt"; + let dest_file_name = "hello-dest.txt"; + let src_lv = Lv::new_from_str(src_file_name).unwrap(); + let dest_lv = Lv::new_from_str(dest_file_name).unwrap(); + let metadata_pdu = MetadataPdu::new(pdu_header, metadata_params, src_lv, dest_lv, None); + metadata_pdu + .write_to_bytes(&mut buf) + .expect("writing metadata PDU failed"); + + let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); + assert_eq!(packet_info.pdu_type(), PduType::FileDirective); + assert!(packet_info.pdu_directive().is_some()); + assert_eq!( + packet_info.pdu_directive().unwrap(), + FileDirectiveType::MetadataPdu + ); + assert_eq!(packet_info.target(), PacketTarget::DestEntity); + } + + #[test] + fn test_filedata_pdu_info() { + let mut buf: [u8; 128] = [0; 128]; + let pdu_header = generic_pdu_header(); + let file_data_pdu = FileDataPdu::new_no_seg_metadata(pdu_header, 0, &[]); + file_data_pdu + .write_to_bytes(&mut buf) + .expect("writing file data PDU failed"); + let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); + assert_eq!(packet_info.pdu_type(), PduType::FileData); + assert!(packet_info.pdu_directive().is_none()); + assert_eq!(packet_info.target(), PacketTarget::DestEntity); + } + + #[test] + fn test_eof_pdu_info() { + let mut buf: [u8; 128] = [0; 128]; + let pdu_header = generic_pdu_header(); + let eof_pdu = EofPdu::new_no_error(pdu_header, 0, 0); + eof_pdu + .write_to_bytes(&mut buf) + .expect("writing file data PDU failed"); + let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); + assert_eq!(packet_info.pdu_type(), PduType::FileDirective); + assert!(packet_info.pdu_directive().is_some()); + assert_eq!( + packet_info.pdu_directive().unwrap(), + FileDirectiveType::EofPdu + ); + } +} diff --git a/satrs-core/src/cfdp/source.rs b/satrs-core/src/cfdp/source.rs new file mode 100644 index 0000000..433f4d2 --- /dev/null +++ b/satrs-core/src/cfdp/source.rs @@ -0,0 +1,15 @@ +#![allow(dead_code)] +use spacepackets::util::UnsignedByteField; + +pub struct SourceHandler { + id: UnsignedByteField, +} + +impl SourceHandler { + pub fn new(id: impl Into) -> Self { + Self { id: id.into() } + } +} + +#[cfg(test)] +mod tests {} diff --git a/satrs-core/src/cfdp/user.rs b/satrs-core/src/cfdp/user.rs new file mode 100644 index 0000000..9047bbd --- /dev/null +++ b/satrs-core/src/cfdp/user.rs @@ -0,0 +1,65 @@ +use spacepackets::{ + cfdp::{ + pdu::{ + file_data::RecordContinuationState, + finished::{DeliveryCode, FileStatus}, + }, + tlv::msg_to_user::MsgToUserTlv, + ConditionCode, + }, + util::UnsignedByteField, +}; + +use super::TransactionId; + +#[derive(Debug, Copy, Clone)] +pub struct TransactionFinishedParams { + pub id: TransactionId, + pub condition_code: ConditionCode, + pub delivery_code: DeliveryCode, + pub file_status: FileStatus, +} + +#[derive(Debug)] +pub struct MetadataReceivedParams<'src_file, 'dest_file, 'msgs_to_user> { + pub id: TransactionId, + pub source_id: UnsignedByteField, + pub file_size: u64, + pub src_file_name: &'src_file str, + pub dest_file_name: &'dest_file str, + pub msgs_to_user: &'msgs_to_user [MsgToUserTlv<'msgs_to_user>], +} + +#[derive(Debug)] +pub struct FileSegmentRecvdParams<'seg_meta> { + pub id: TransactionId, + pub offset: u64, + pub length: usize, + pub rec_cont_state: Option, + pub segment_metadata: Option<&'seg_meta [u8]>, +} + +pub trait CfdpUser { + fn transaction_indication(&mut self, id: &TransactionId); + fn eof_sent_indication(&mut self, id: &TransactionId); + fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams); + fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams); + fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams); + // TODO: The standard does not strictly specify how the report information looks.. + fn report_indication(&mut self, id: &TransactionId); + fn suspended_indication(&mut self, id: &TransactionId, condition_code: ConditionCode); + fn resumed_indication(&mut self, id: &TransactionId, progress: u64); + fn fault_indication( + &mut self, + id: &TransactionId, + condition_code: ConditionCode, + progress: u64, + ); + fn abandoned_indication( + &mut self, + id: &TransactionId, + condition_code: ConditionCode, + progress: u64, + ); + fn eof_recvd_indication(&mut self, id: &TransactionId); +} diff --git a/satrs-core/src/executable.rs b/satrs-core/src/executable.rs index 440f8ee..77ed178 100644 --- a/satrs-core/src/executable.rs +++ b/satrs-core/src/executable.rs @@ -29,7 +29,7 @@ pub trait Executable: Send { fn periodic_op(&mut self, op_code: i32) -> Result; } -/// This function allows executing one task which implements the [Executable][Executable] trait +/// This function allows executing one task which implements the [Executable] trait /// /// # Arguments /// @@ -78,7 +78,7 @@ pub fn exec_sched_single< } /// This function allows executing multiple tasks as long as the tasks implement the -/// [Executable][Executable] trait +/// [Executable] trait /// /// # Arguments /// diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 8ae56e5..22699e0 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -20,6 +20,7 @@ extern crate downcast_rs; #[cfg(any(feature = "std", test))] extern crate std; +pub mod cfdp; pub mod encoding; pub mod error; #[cfg(feature = "alloc")]