diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index 0771a79..afc15a0 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -4,16 +4,19 @@ use std::{ }; use log::info; -use satrs::tmtc::{PacketAsVec, PacketInPool, SharedPacketPool}; use satrs::{ pool::PoolProvider, - seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, + seq_count::CcsdsSimpleSeqCountProvider, spacepackets::{ ecss::{tm::PusTmZeroCopyWriter, PusPacket}, time::cds::MIN_CDS_FIELD_LEN, CcsdsPacket, }, }; +use satrs::{ + seq_count::SequenceCountProvider, + tmtc::{PacketAsVec, PacketInPool, SharedPacketPool}, +}; use crate::interface::tcp::SyncTcpTmSource; diff --git a/satrs/src/cfdp/dest.rs b/satrs/src/cfdp/dest.rs index c320bb2..39f3836 100644 --- a/satrs/src/cfdp/dest.rs +++ b/satrs/src/cfdp/dest.rs @@ -54,6 +54,7 @@ pub enum TransactionStep { SendingFinishedPdu = 6, } +// This contains transfer state parameters for destination transaction. #[derive(Debug)] struct TransferState { transaction_id: Option, @@ -87,6 +88,7 @@ impl Default for TransferState { } } +// This contains parameters for destination transaction. #[derive(Debug)] struct TransactionParams { tstate: TransferState, @@ -380,9 +382,7 @@ impl< let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?; self.tparams.reset(); self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params(); - let remote_cfg = self - .remote_cfg_table - .get_remote_config(metadata_pdu.source_id().value()); + let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value()); if remote_cfg.is_none() { return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id())); } @@ -847,7 +847,7 @@ mod tests { tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestFaultHandler}, user::OwnedMetadataRecvdParams, CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig, - RemoteEntityConfig, StdRemoteEntityConfigProvider, CRC_32, + StdRemoteEntityConfigProvider, CRC_32, }; use super::*; diff --git a/satrs/src/cfdp/mod.rs b/satrs/src/cfdp/mod.rs index 514b8ce..322bd81 100644 --- a/satrs/src/cfdp/mod.rs +++ b/satrs/src/cfdp/mod.rs @@ -268,8 +268,8 @@ impl RemoteEntityConfig { pub trait RemoteEntityConfigProvider { /// Retrieve the remote entity configuration for the given remote ID. - fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig>; - fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>; + fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>; + fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>; /// Add a new remote configuration. Return [true] if the configuration was /// inserted successfully, and [false] if a configuration already exists. fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool; @@ -286,10 +286,10 @@ pub struct StdRemoteEntityConfigProvider { #[cfg(feature = "std")] impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider { - fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { + fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> { self.remote_cfg_table.get(&remote_id) } - fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> { + fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> { self.remote_cfg_table.get_mut(&remote_id) } fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool { diff --git a/satrs/src/cfdp/request.rs b/satrs/src/cfdp/request.rs index 7fc6e0a..9254f70 100644 --- a/satrs/src/cfdp/request.rs +++ b/satrs/src/cfdp/request.rs @@ -476,18 +476,11 @@ pub mod alloc_mod { pub fs_requests_len: usize, } - pub struct PutRequestCacheConfig { - pub max_msgs_to_user_storage: usize, - pub max_fault_handler_overrides_storage: usize, - pub max_flow_label_storage: usize, - pub max_fs_requests_storage: usize, - } - impl StaticPutRequestCacher { - pub fn new(cfg: PutRequestCacheConfig) -> Self { + pub fn new(max_fs_requests_storage: usize) -> Self { Self { static_fields: StaticPutRequestFields::default(), - fs_requests: alloc::vec![0; cfg.max_fs_requests_storage], + fs_requests: alloc::vec![0; max_fs_requests_storage], fs_requests_len: 0, } } diff --git a/satrs/src/cfdp/source.rs b/satrs/src/cfdp/source.rs index ca81be4..4e1c4eb 100644 --- a/satrs/src/cfdp/source.rs +++ b/satrs/src/cfdp/source.rs @@ -1,8 +1,17 @@ -use spacepackets::cfdp::{pdu::FileDirectiveType, PduType}; +use spacepackets::{ + cfdp::{pdu::FileDirectiveType, PduType}, + util::UnsignedByteField, + ByteConversionError, +}; + +use crate::seq_count::SequenceCountProvider; use super::{ - filestore::VirtualFilestore, request::ReadablePutRequest, user::CfdpUser, LocalEntityConfig, - PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider, + filestore::VirtualFilestore, + request::{ReadablePutRequest, StaticPutRequestCacher}, + user::CfdpUser, + LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig, + RemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, }; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -12,7 +21,7 @@ pub enum TransactionStep { TransactionStart = 1, SendingMetadata = 3, SendingFileData = 4, - /// Re-transmitting missing packets in acknowledged mode6 + /// Re-transmitting missing packets in acknowledged mode Retransmitting = 5, SendingEof = 6, WaitingForEofAck = 7, @@ -36,6 +45,14 @@ pub struct StateHelper { num_packets_ready: u32, } +#[derive(Debug, Copy, Clone, derive_new::new)] +pub struct TransferState { + transaction_id: TransactionId, + remote_cfg: RemoteEntityConfig, + transmission_mode: super::TransmissionMode, + closure_requested: bool, +} + impl Default for StateHelper { fn default() -> Self { Self { @@ -55,6 +72,18 @@ pub enum SourceError { }, #[error("unexpected file data PDU")] UnexpectedFileDataPdu, + #[error("source handler is already busy with put request")] + PutRequestAlreadyActive, + #[error("error caching put request")] + PutRequestCaching(ByteConversionError), +} + +#[derive(Debug, thiserror::Error)] +pub enum PutRequestError { + #[error("error caching put request: {0}")] + Storage(#[from] ByteConversionError), + #[error("already busy with put request")] + AlreadyBusy, } pub struct SourceHandler< @@ -62,12 +91,16 @@ pub struct SourceHandler< UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, + SeqCountProvider: SequenceCountProvider, > { local_cfg: LocalEntityConfig, pdu_sender: PduSender, + put_request_cacher: StaticPutRequestCacher, remote_cfg_table: RemoteCfgTable, vfs: Vfs, state_helper: StateHelper, + tstate: Option, + seq_count_provider: SeqCountProvider, } impl< @@ -75,20 +108,26 @@ impl< UserFaultHook: UserFaultHookProvider, Vfs: VirtualFilestore, RemoteCfgTable: RemoteEntityConfigProvider, - > SourceHandler + SeqCountProvider: SequenceCountProvider, + > SourceHandler { pub fn new( cfg: LocalEntityConfig, pdu_sender: PduSender, vfs: Vfs, + put_request_cacher: StaticPutRequestCacher, remote_cfg_table: RemoteCfgTable, + seq_count_provider: SeqCountProvider, ) -> Self { Self { local_cfg: cfg, remote_cfg_table, - vfs, pdu_sender, + vfs, + put_request_cacher, state_helper: Default::default(), + tstate: Default::default(), + seq_count_provider, } } @@ -155,10 +194,66 @@ impl< Ok(()) } - fn put_request(&mut self, put_request: &impl ReadablePutRequest) -> Result<(), SourceError> { + pub fn put_request( + &mut self, + put_request: &impl ReadablePutRequest, + ) -> Result<(), PutRequestError> { + if self.state_helper.state != super::State::Idle { + return Err(PutRequestError::AlreadyBusy); + } + self.put_request_cacher.set(put_request)?; + self.state_helper.state = super::State::Busy; + let source_file = self.put_request_cacher.source_file().unwrap(); + if !self.vfs.exists(source_file) { + // TODO: Specific error. + } + let remote_cfg = self.remote_cfg_table.get( + self.put_request_cacher + .static_fields + .destination_id + .value_const(), + ); + if remote_cfg.is_none() { + // TODO: Specific error. + } + let remote_cfg = remote_cfg.unwrap(); + self.state_helper.num_packets_ready = 0; + //self.tstate.remote_cfg = Some(*remote_cfg); + let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() { + self.put_request_cacher.static_fields.trans_mode.unwrap() + } else { + remote_cfg.default_transmission_mode + }; + let closure_requested = if self + .put_request_cacher + .static_fields + .closure_requested + .is_some() + { + self.put_request_cacher + .static_fields + .closure_requested + .unwrap() + } else { + remote_cfg.closure_requested_by_default + }; + self.tstate = Some(TransferState::new( + TransactionId::new( + self.put_request_cacher.static_fields.destination_id, + UnsignedByteField::new( + SeqCountProvider::MAX_BIT_WIDTH / 8, + self.seq_count_provider.get_and_increment().into(), + ), + ), + *remote_cfg, + transmission_mode, + closure_requested, + )); Ok(()) } + pub fn transmission_mode(&self) {} + fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { Ok(0) } @@ -172,14 +267,16 @@ impl< #[cfg(test)] mod tests { - use alloc::sync::Arc; use spacepackets::util::UnsignedByteFieldU16; use super::*; - use crate::cfdp::{ - filestore::NativeFilestore, - tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler}, - FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, + use crate::{ + cfdp::{ + filestore::NativeFilestore, + tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler}, + FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, + }, + seq_count::SeqCountProviderSimple, }; const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); @@ -190,6 +287,7 @@ mod tests { TestFaultHandler, NativeFilestore, StdRemoteEntityConfigProvider, + SeqCountProviderSimple, >; fn default_source_handler( @@ -201,12 +299,14 @@ mod tests { indication_cfg: IndicationConfig::default(), fault_handler: FaultHandler::new(test_fault_handler), }; + let static_put_request_cacher = StaticPutRequestCacher::new(1024); SourceHandler::new( local_entity_cfg, test_packet_sender, NativeFilestore::default(), + static_put_request_cacher, basic_remote_cfg_table(), - // TestCheckTimerCreator::new(check_timer_expired), + SeqCountProviderSimple::default(), ) } diff --git a/satrs/src/seq_count.rs b/satrs/src/seq_count.rs index b4539b0..1dcd17c 100644 --- a/satrs/src/seq_count.rs +++ b/satrs/src/seq_count.rs @@ -1,6 +1,4 @@ use core::cell::Cell; -#[cfg(feature = "alloc")] -use dyn_clone::DynClone; use paste::paste; use spacepackets::MAX_SEQ_COUNT; #[cfg(feature = "std")] @@ -11,27 +9,21 @@ pub use stdmod::*; /// The core functions are not mutable on purpose to allow easier usage with /// static structs when using the interior mutability pattern. This can be achieved by using /// [Cell], [core::cell::RefCell] or atomic types. -pub trait SequenceCountProviderCore { - fn get(&self) -> Raw; +pub trait SequenceCountProvider { + type Raw: Into; + const MAX_BIT_WIDTH: usize; + + fn get(&self) -> Self::Raw; fn increment(&self); - fn get_and_increment(&self) -> Raw { + fn get_and_increment(&self) -> Self::Raw { let val = self.get(); self.increment(); val } } -/// Extension trait which allows cloning a sequence count provider after it was turned into -/// a trait object. -#[cfg(feature = "alloc")] -pub trait SequenceCountProvider: SequenceCountProviderCore + DynClone {} -#[cfg(feature = "alloc")] -dyn_clone::clone_trait_object!(SequenceCountProvider); -#[cfg(feature = "alloc")] -impl SequenceCountProvider for T where T: SequenceCountProviderCore + Clone {} - #[derive(Clone)] pub struct SeqCountProviderSimple { seq_count: Cell, @@ -63,8 +55,11 @@ macro_rules! impl_for_primitives { } } - impl SequenceCountProviderCore<$ty> for SeqCountProviderSimple<$ty> { - fn get(&self) -> $ty { + impl SequenceCountProvider for SeqCountProviderSimple<$ty> { + type Raw = $ty; + const MAX_BIT_WIDTH: usize = core::mem::size_of::() * 8; + + fn get(&self) -> Self::Raw { self.seq_count.get() } @@ -72,7 +67,7 @@ macro_rules! impl_for_primitives { self.get_and_increment(); } - fn get_and_increment(&self) -> $ty { + fn get_and_increment(&self) -> Self::Raw { let curr_count = self.seq_count.get(); if curr_count == self.max_val { @@ -104,7 +99,9 @@ impl Default for CcsdsSimpleSeqCountProvider { } } -impl SequenceCountProviderCore for CcsdsSimpleSeqCountProvider { +impl SequenceCountProvider for CcsdsSimpleSeqCountProvider { + type Raw = u16; + const MAX_BIT_WIDTH: usize = core::mem::size_of::() * 8; delegate::delegate! { to self.provider { fn get(&self) -> u16; @@ -144,7 +141,10 @@ pub mod stdmod { } } } - impl SequenceCountProviderCore<$ty> for [] { + impl SequenceCountProvider for [] { + type Raw = $ty; + const MAX_BIT_WIDTH: usize = core::mem::size_of::() * 8; + fn get(&self) -> $ty { match self.seq_count.lock() { Ok(counter) => *counter, @@ -181,7 +181,7 @@ pub mod stdmod { mod tests { use crate::seq_count::{ CcsdsSimpleSeqCountProvider, SeqCountProviderSimple, SeqCountProviderSyncU8, - SequenceCountProviderCore, + SequenceCountProvider, }; use spacepackets::MAX_SEQ_COUNT;