From 8a48a624cd6c2105d9d7487464a6347b578af17f Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 11 Jun 2024 09:45:55 +0200 Subject: [PATCH] introduce put request cacher --- satrs-shared/Cargo.toml | 2 +- satrs/Cargo.toml | 2 +- satrs/src/cfdp/request.rs | 430 ++++++++++++++++++++++++++++++++++++-- satrs/src/cfdp/source.rs | 8 +- 4 files changed, 417 insertions(+), 25 deletions(-) diff --git a/satrs-shared/Cargo.toml b/satrs-shared/Cargo.toml index 3a2520b..628ffa1 100644 --- a/satrs-shared/Cargo.toml +++ b/satrs-shared/Cargo.toml @@ -25,7 +25,7 @@ optional = true version = ">0.9" default-features = false git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets" -branch = "cfdp-tlv-owned-type" +branch = "all-cfdp-updates" [features] serde = ["dep:serde", "spacepackets/serde"] diff --git a/satrs/Cargo.toml b/satrs/Cargo.toml index 5c9fd0d..3890dbd 100644 --- a/satrs/Cargo.toml +++ b/satrs/Cargo.toml @@ -31,7 +31,7 @@ default-features = false version = "0.12" default-features = false git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets" -branch = "cfdp-tlv-owned-type" +branch = "all-cfdp-updates" [dependencies.cobs] git = "https://github.com/robamu/cobs.rs.git" diff --git a/satrs/src/cfdp/request.rs b/satrs/src/cfdp/request.rs index ce87c04..7fc6e0a 100644 --- a/satrs/src/cfdp/request.rs +++ b/satrs/src/cfdp/request.rs @@ -6,24 +6,35 @@ use spacepackets::{ util::UnsignedByteField, }; -trait ReadablePutRequest { +#[cfg(feature = "alloc")] +pub use alloc_mod::*; + +#[derive(Debug, PartialEq, Eq)] +pub struct FilePathTooLarge(pub usize); + +/// This trait is an abstraction for different Put Request structures which can be used +/// by Put Request consumers. +pub trait ReadablePutRequest { fn destination_id(&self) -> UnsignedByteField; fn source_file(&self) -> Option<&str>; fn dest_file(&self) -> Option<&str>; fn trans_mode(&self) -> Option; fn closure_requested(&self) -> Option; fn seg_ctrl(&self) -> Option; + fn has_msgs_to_user(&self) -> bool; fn msgs_to_user(&self, f: impl FnMut(&Tlv)); + fn has_fault_handler_overrides(&self) -> bool; fn fault_handler_overrides(&self, f: impl FnMut(&Tlv)); fn flow_label(&self) -> Option; + fn has_fs_requests(&self) -> bool; fn fs_requests(&self, f: impl FnMut(&Tlv)); } #[derive(Debug, PartialEq, Eq)] pub struct PutRequest<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_label, 'fs_requests> { pub destination_id: UnsignedByteField, - pub source_file: Option<&'src_file str>, - pub dest_file: Option<&'dest_file str>, + source_file: Option<&'src_file str>, + dest_file: Option<&'dest_file str>, pub trans_mode: Option, pub closure_requested: Option, pub seg_ctrl: Option, @@ -33,6 +44,38 @@ pub struct PutRequest<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_lab pub fs_requests: Option<&'fs_requests [Tlv<'fs_requests>]>, } +impl<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_label, 'fs_requests> + PutRequest<'src_file, 'dest_file, 'msgs_to_user, 'fh_ovrds, 'flow_label, 'fs_requests> +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + destination_id: UnsignedByteField, + source_file: Option<&'src_file str>, + dest_file: Option<&'dest_file str>, + trans_mode: Option, + closure_requested: Option, + seg_ctrl: Option, + msgs_to_user: Option<&'msgs_to_user [Tlv<'msgs_to_user>]>, + fault_handler_overrides: Option<&'fh_ovrds [Tlv<'fh_ovrds>]>, + flow_label: Option>, + fs_requests: Option<&'fs_requests [Tlv<'fs_requests>]>, + ) -> Result { + generic_path_checks(source_file, dest_file)?; + Ok(Self { + destination_id, + source_file, + dest_file, + trans_mode, + closure_requested, + seg_ctrl, + msgs_to_user, + fault_handler_overrides, + flow_label, + fs_requests, + }) + } +} + impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> { fn destination_id(&self) -> UnsignedByteField { self.destination_id @@ -58,6 +101,10 @@ impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> { self.seg_ctrl } + fn has_msgs_to_user(&self) -> bool { + self.msgs_to_user.is_some() && self.msgs_to_user.unwrap().is_empty() + } + fn msgs_to_user(&self, mut f: impl FnMut(&Tlv)) { if let Some(msgs_to_user) = self.msgs_to_user { for msg_to_user in msgs_to_user { @@ -66,6 +113,10 @@ impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> { } } + fn has_fault_handler_overrides(&self) -> bool { + self.fault_handler_overrides.is_some() && self.fault_handler_overrides.unwrap().is_empty() + } + fn fault_handler_overrides(&self, mut f: impl FnMut(&Tlv)) { if let Some(fh_overrides) = self.fault_handler_overrides { for fh_override in fh_overrides { @@ -78,6 +129,10 @@ impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> { self.flow_label } + fn has_fs_requests(&self) -> bool { + self.fs_requests.is_some() && self.fs_requests.unwrap().is_empty() + } + fn fs_requests(&self, mut f: impl FnMut(&Tlv)) { if let Some(fs_requests) = self.fs_requests { for fs_request in fs_requests { @@ -87,6 +142,23 @@ impl ReadablePutRequest for PutRequest<'_, '_, '_, '_, '_, '_> { } } +pub fn generic_path_checks( + source_file: Option<&str>, + dest_file: Option<&str>, +) -> Result<(), FilePathTooLarge> { + if let Some(src_file) = source_file { + if src_file.len() > u8::MAX as usize { + return Err(FilePathTooLarge(src_file.len())); + } + } + if let Some(dest_file) = dest_file { + if dest_file.len() > u8::MAX as usize { + return Err(FilePathTooLarge(dest_file.len())); + } + } + Ok(()) +} + impl<'src_file, 'dest_file> PutRequest<'src_file, 'dest_file, 'static, 'static, 'static, 'static> { pub fn new_regular_request( dest_id: UnsignedByteField, @@ -94,8 +166,9 @@ impl<'src_file, 'dest_file> PutRequest<'src_file, 'dest_file, 'static, 'static, dest_file: &'dest_file str, trans_mode: Option, closure_requested: Option, - ) -> Self { - Self { + ) -> Result { + generic_path_checks(Some(source_file), Some(dest_file))?; + Ok(Self { destination_id: dest_id, source_file: Some(source_file), dest_file: Some(dest_file), @@ -106,7 +179,7 @@ impl<'src_file, 'dest_file> PutRequest<'src_file, 'dest_file, 'static, 'static, fault_handler_overrides: None, flow_label: None, fs_requests: None, - } + }) } } @@ -135,14 +208,12 @@ impl<'msgs_to_user> PutRequest<'static, 'static, 'msgs_to_user, 'static, 'static /// Uses [generic_tlv_list_type_check] to check the TLV type validity of all TLV fields. pub fn check_tlv_type_validities(&self) -> bool { generic_tlv_list_type_check(self.msgs_to_user, TlvType::MsgToUser); - if let Some(msgs_to_user) = self.msgs_to_user { - for msg_to_user in msgs_to_user { - if msg_to_user.tlv_type().is_none() { - return false; - } - if msg_to_user.tlv_type().unwrap() != TlvType::MsgToUser { - return false; - } + if let Some(flow_label) = &self.flow_label { + if flow_label.tlv_type().is_none() { + return false; + } + if flow_label.tlv_type().unwrap() != TlvType::FlowLabel { + return false; } } generic_tlv_list_type_check(self.fault_handler_overrides, TlvType::FaultHandler); @@ -151,7 +222,10 @@ impl<'msgs_to_user> PutRequest<'static, 'static, 'msgs_to_user, 'static, 'static } } -pub fn generic_tlv_list_type_check(opt_tlvs: Option<&[Tlv<'_>]>, tlv_type: TlvType) -> bool { +pub fn generic_tlv_list_type_check( + opt_tlvs: Option<&[TlvProvider]>, + tlv_type: TlvType, +) -> bool { if let Some(tlvs) = opt_tlvs { for tlv in tlvs { if tlv.tlv_type().is_none() { @@ -167,16 +241,22 @@ pub fn generic_tlv_list_type_check(opt_tlvs: Option<&[Tlv<'_>]>, tlv_type: TlvTy #[cfg(feature = "alloc")] pub mod alloc_mod { + use core::str::Utf8Error; + use super::*; use alloc::string::ToString; - use spacepackets::cfdp::tlv::{msg_to_user::MsgToUserTlv, TlvOwned}; + use spacepackets::{ + cfdp::tlv::{msg_to_user::MsgToUserTlv, ReadableTlv, TlvOwned, WritableTlv}, + ByteConversionError, + }; + /// Owned variant of [PutRequest] with no lifetimes which is also [Clone]able. #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct PutRequestOwned { pub destination_id: UnsignedByteField, - pub source_file: Option, - pub dest_file: Option, + source_file: Option, + dest_file: Option, pub trans_mode: Option, pub closure_requested: Option, pub seg_ctrl: Option, @@ -193,8 +273,14 @@ pub mod alloc_mod { dest_file: &str, trans_mode: Option, closure_requested: Option, - ) -> Self { - Self { + ) -> Result { + if source_file.len() > u8::MAX as usize { + return Err(FilePathTooLarge(source_file.len())); + } + if dest_file.len() > u8::MAX as usize { + return Err(FilePathTooLarge(dest_file.len())); + } + Ok(Self { destination_id: dest_id, source_file: Some(source_file.to_string()), dest_file: Some(dest_file.to_string()), @@ -205,7 +291,7 @@ pub mod alloc_mod { fault_handler_overrides: None, flow_label: None, fs_requests: None, - } + }) } pub fn new_msgs_to_user_only( @@ -225,6 +311,25 @@ pub mod alloc_mod { fs_requests: None, }) } + + /// Uses [generic_tlv_list_type_check] to check the TLV type validity of all TLV fields. + pub fn check_tlv_type_validities(&self) -> bool { + generic_tlv_list_type_check(self.msgs_to_user.as_deref(), TlvType::MsgToUser); + if let Some(flow_label) = &self.flow_label { + if flow_label.tlv_type().is_none() { + return false; + } + if flow_label.tlv_type().unwrap() != TlvType::FlowLabel { + return false; + } + } + generic_tlv_list_type_check( + self.fault_handler_overrides.as_deref(), + TlvType::FaultHandler, + ); + generic_tlv_list_type_check(self.fs_requests.as_deref(), TlvType::FilestoreRequest); + true + } } impl From> for PutRequestOwned { @@ -304,5 +409,288 @@ pub mod alloc_mod { } } } + + fn has_msgs_to_user(&self) -> bool { + self.msgs_to_user.is_some() && !self.msgs_to_user.as_ref().unwrap().is_empty() + } + + fn has_fault_handler_overrides(&self) -> bool { + self.fault_handler_overrides.is_some() + && !self.fault_handler_overrides.as_ref().unwrap().is_empty() + } + + fn has_fs_requests(&self) -> bool { + self.fs_requests.is_some() && !self.fs_requests.as_ref().unwrap().is_empty() + } + } + + pub struct StaticPutRequestFields { + pub destination_id: UnsignedByteField, + /// Static buffer to store source file path. + pub source_file_buf: [u8; u8::MAX as usize], + /// Current source path length. + pub source_file_len: usize, + /// Static buffer to store dest file path. + pub dest_file_buf: [u8; u8::MAX as usize], + /// Current destination path length. + pub dest_file_len: usize, + pub trans_mode: Option, + pub closure_requested: Option, + pub seg_ctrl: Option, + } + + impl Default for StaticPutRequestFields { + fn default() -> Self { + Self { + destination_id: UnsignedByteField::new(0, 0), + source_file_buf: [0; u8::MAX as usize], + source_file_len: Default::default(), + dest_file_buf: [0; u8::MAX as usize], + dest_file_len: Default::default(), + trans_mode: Default::default(), + closure_requested: Default::default(), + seg_ctrl: Default::default(), + } + } + } + + impl StaticPutRequestFields { + pub fn clear(&mut self) { + self.destination_id = UnsignedByteField::new(0, 0); + self.source_file_len = 0; + self.dest_file_len = 0; + self.trans_mode = None; + self.closure_requested = None; + self.seg_ctrl = None; + } + } + + /// This is a put request cache structure which can be used to cache [ReadablePutRequest]s + /// without requiring run-time allocation. The user must specify the static buffer sizes used + /// to store TLVs or list of TLVs. + pub struct StaticPutRequestCacher { + pub static_fields: StaticPutRequestFields, + /// Static buffer to store file store requests. + pub fs_requests: alloc::vec::Vec, + /// Current total length of stored filestore requests. + pub fs_requests_len: usize, + } + + pub struct PutRequestCacheConfig { + pub max_msgs_to_user_storage: usize, + pub max_fault_handler_overrides_storage: usize, + pub max_flow_label_storage: usize, + pub max_fs_requests_storage: usize, + } + + impl StaticPutRequestCacher { + pub fn new(cfg: PutRequestCacheConfig) -> Self { + Self { + static_fields: StaticPutRequestFields::default(), + fs_requests: alloc::vec![0; cfg.max_fs_requests_storage], + fs_requests_len: 0, + } + } + + pub fn set( + &mut self, + put_request: &impl ReadablePutRequest, + ) -> Result<(), ByteConversionError> { + self.static_fields.destination_id = put_request.destination_id(); + if let Some(source_file) = put_request.source_file() { + if source_file.len() > u8::MAX as usize { + return Err(ByteConversionError::ToSliceTooSmall { + found: self.static_fields.source_file_buf.len(), + expected: source_file.len(), + }); + } + self.static_fields.source_file_buf[..source_file.len()] + .copy_from_slice(source_file.as_bytes()); + self.static_fields.source_file_len = source_file.len(); + } + if let Some(dest_file) = put_request.dest_file() { + if dest_file.len() > u8::MAX as usize { + return Err(ByteConversionError::ToSliceTooSmall { + found: self.static_fields.source_file_buf.len(), + expected: dest_file.len(), + }); + } + self.static_fields.dest_file_buf[..dest_file.len()] + .copy_from_slice(dest_file.as_bytes()); + self.static_fields.dest_file_len = dest_file.len(); + } + self.static_fields.trans_mode = put_request.trans_mode(); + self.static_fields.closure_requested = put_request.closure_requested(); + self.static_fields.seg_ctrl = put_request.seg_ctrl(); + let mut current_idx = 0; + let mut error_if_too_large = None; + let mut store_fs_requests = |tlv: &Tlv| { + if current_idx + tlv.len_full() > self.fs_requests.len() { + error_if_too_large = Some(ByteConversionError::ToSliceTooSmall { + found: self.fs_requests.len(), + expected: current_idx + tlv.len_full(), + }); + return; + } + // We checked the buffer lengths, so this should never fail. + tlv.write_to_bytes( + &mut self.fs_requests[current_idx..current_idx + tlv.len_full()], + ) + .unwrap(); + current_idx += tlv.len_full(); + }; + put_request.fs_requests(&mut store_fs_requests); + if let Some(err) = error_if_too_large { + return Err(err); + } + self.fs_requests_len = current_idx; + Ok(()) + } + + pub fn source_file(&self) -> Result<&str, Utf8Error> { + core::str::from_utf8( + &self.static_fields.source_file_buf[0..self.static_fields.source_file_len], + ) + } + + pub fn dest_file(&self) -> Result<&str, Utf8Error> { + core::str::from_utf8( + &self.static_fields.dest_file_buf[0..self.static_fields.dest_file_len], + ) + } + + /// This clears the cacher structure. This is a cheap operation because it only + /// sets [Option]al values to [None] and the length of stores TLVs to 0. + /// + /// Please note that this method will not set the values in the buffer to 0. + pub fn clear(&mut self) { + self.static_fields.clear(); + self.fs_requests_len = 0; + } + } +} + +#[cfg(test)] +mod tests { + use spacepackets::util::UbfU16; + + use super::*; + + pub const DEST_ID: UbfU16 = UbfU16::new(5); + + #[test] + fn test_put_request_basic() { + let src_file = "/tmp/hello.txt"; + let dest_file = "/tmp/hello2.txt"; + let put_request = + PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None) + .unwrap(); + assert_eq!(put_request.source_file(), Some(src_file)); + assert_eq!(put_request.dest_file(), Some(dest_file)); + assert_eq!(put_request.destination_id(), DEST_ID.into()); + assert_eq!(put_request.seg_ctrl(), None); + assert_eq!(put_request.closure_requested(), None); + assert_eq!(put_request.trans_mode(), None); + assert!(!put_request.has_fs_requests()); + let dummy = |_tlv: &Tlv| { + panic!("should not be called"); + }; + put_request.fs_requests(&dummy); + assert!(!put_request.has_msgs_to_user()); + put_request.msgs_to_user(&dummy); + assert!(!put_request.has_fault_handler_overrides()); + put_request.fault_handler_overrides(&dummy); + assert!(put_request.flow_label().is_none()); + } + + #[test] + fn test_put_request_owned_basic() { + let src_file = "/tmp/hello.txt"; + let dest_file = "/tmp/hello2.txt"; + let put_request = + PutRequestOwned::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None) + .unwrap(); + assert_eq!(put_request.source_file(), Some(src_file)); + assert_eq!(put_request.dest_file(), Some(dest_file)); + assert_eq!(put_request.destination_id(), DEST_ID.into()); + assert_eq!(put_request.seg_ctrl(), None); + assert_eq!(put_request.closure_requested(), None); + assert_eq!(put_request.trans_mode(), None); + assert!(!put_request.has_fs_requests()); + let dummy = |_tlv: &Tlv| { + panic!("should not be called"); + }; + put_request.fs_requests(&dummy); + assert!(!put_request.has_msgs_to_user()); + put_request.msgs_to_user(&dummy); + assert!(!put_request.has_fault_handler_overrides()); + put_request.fault_handler_overrides(&dummy); + assert!(put_request.flow_label().is_none()); + let put_request_cloned = put_request.clone(); + assert_eq!(put_request, put_request_cloned); + } + + #[test] + fn test_put_request_cacher_basic() { + let cacher_cfg = PutRequestCacheConfig { + max_msgs_to_user_storage: 512, + max_fault_handler_overrides_storage: 128, + max_flow_label_storage: 128, + max_fs_requests_storage: 512, + }; + let put_request_cached = StaticPutRequestCacher::new(cacher_cfg); + assert_eq!(put_request_cached.static_fields.source_file_len, 0); + assert_eq!(put_request_cached.static_fields.dest_file_len, 0); + assert_eq!(put_request_cached.fs_requests_len, 0); + assert_eq!(put_request_cached.fs_requests.len(), 512); + } + + #[test] + fn test_put_request_cacher_set() { + let cacher_cfg = PutRequestCacheConfig { + max_msgs_to_user_storage: 512, + max_fault_handler_overrides_storage: 128, + max_flow_label_storage: 128, + max_fs_requests_storage: 512, + }; + let mut put_request_cached = StaticPutRequestCacher::new(cacher_cfg); + let src_file = "/tmp/hello.txt"; + let dest_file = "/tmp/hello2.txt"; + let put_request = + PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None) + .unwrap(); + put_request_cached.set(&put_request).unwrap(); + assert_eq!( + put_request_cached.static_fields.source_file_len, + src_file.len() + ); + assert_eq!( + put_request_cached.static_fields.dest_file_len, + dest_file.len() + ); + assert_eq!(put_request_cached.source_file().unwrap(), src_file); + assert_eq!(put_request_cached.dest_file().unwrap(), dest_file); + assert_eq!(put_request_cached.fs_requests_len, 0); + } + + #[test] + fn test_put_request_cacher_set_and_clear() { + let cacher_cfg = PutRequestCacheConfig { + max_msgs_to_user_storage: 512, + max_fault_handler_overrides_storage: 128, + max_flow_label_storage: 128, + max_fs_requests_storage: 512, + }; + let mut put_request_cached = StaticPutRequestCacher::new(cacher_cfg); + let src_file = "/tmp/hello.txt"; + let dest_file = "/tmp/hello2.txt"; + let put_request = + PutRequest::new_regular_request(DEST_ID.into(), src_file, dest_file, None, None) + .unwrap(); + put_request_cached.set(&put_request).unwrap(); + put_request_cached.clear(); + assert_eq!(put_request_cached.static_fields.source_file_len, 0); + assert_eq!(put_request_cached.static_fields.dest_file_len, 0); + assert_eq!(put_request_cached.fs_requests_len, 0); } } diff --git a/satrs/src/cfdp/source.rs b/satrs/src/cfdp/source.rs index 250bc59..ca81be4 100644 --- a/satrs/src/cfdp/source.rs +++ b/satrs/src/cfdp/source.rs @@ -1,8 +1,8 @@ use spacepackets::cfdp::{pdu::FileDirectiveType, PduType}; use super::{ - filestore::VirtualFilestore, user::CfdpUser, LocalEntityConfig, PacketInfo, PacketTarget, - PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider, + filestore::VirtualFilestore, request::ReadablePutRequest, user::CfdpUser, LocalEntityConfig, + PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider, }; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -155,6 +155,10 @@ impl< Ok(()) } + fn put_request(&mut self, put_request: &impl ReadablePutRequest) -> Result<(), SourceError> { + Ok(()) + } + fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { Ok(0) }