CFDP extracted to library #201

Closed
muellerr wants to merge 18 commits from continue-cfsp-source-handler into main
6 changed files with 148 additions and 52 deletions
Showing only changes of commit 35b24ba9de - Show all commits

View File

@ -4,16 +4,19 @@ use std::{
};
use log::info;
use satrs::tmtc::{PacketAsVec, PacketInPool, SharedPacketPool};
use satrs::{
pool::PoolProvider,
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
seq_count::CcsdsSimpleSeqCountProvider,
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
};
use satrs::{
seq_count::SequenceCountProvider,
tmtc::{PacketAsVec, PacketInPool, SharedPacketPool},
};
use crate::interface::tcp::SyncTcpTmSource;

View File

@ -54,6 +54,7 @@ pub enum TransactionStep {
SendingFinishedPdu = 6,
}
// This contains transfer state parameters for destination transaction.
#[derive(Debug)]
struct TransferState<CheckTimer: CountdownProvider> {
transaction_id: Option<TransactionId>,
@ -87,6 +88,7 @@ impl<CheckTimer: CountdownProvider> Default for TransferState<CheckTimer> {
}
}
// This contains parameters for destination transaction.
#[derive(Debug)]
struct TransactionParams<CheckTimer: CountdownProvider> {
tstate: TransferState<CheckTimer>,
@ -380,9 +382,7 @@ impl<
let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
self.tparams.reset();
self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params();
let remote_cfg = self
.remote_cfg_table
.get_remote_config(metadata_pdu.source_id().value());
let remote_cfg = self.remote_cfg_table.get(metadata_pdu.source_id().value());
if remote_cfg.is_none() {
return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id()));
}
@ -847,7 +847,7 @@ mod tests {
tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestFaultHandler},
user::OwnedMetadataRecvdParams,
CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig,
RemoteEntityConfig, StdRemoteEntityConfigProvider, CRC_32,
StdRemoteEntityConfigProvider, CRC_32,
};
use super::*;

View File

@ -268,8 +268,8 @@ impl RemoteEntityConfig {
pub trait RemoteEntityConfigProvider {
/// Retrieve the remote entity configuration for the given remote ID.
fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
/// Add a new remote configuration. Return [true] if the configuration was
/// inserted successfully, and [false] if a configuration already exists.
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool;
@ -286,10 +286,10 @@ pub struct StdRemoteEntityConfigProvider {
#[cfg(feature = "std")]
impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider {
fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
fn get(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.remote_cfg_table.get(&remote_id)
}
fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
fn get_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.remote_cfg_table.get_mut(&remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool {

View File

@ -476,18 +476,11 @@ pub mod alloc_mod {
pub fs_requests_len: usize,
}
pub struct PutRequestCacheConfig {
pub max_msgs_to_user_storage: usize,
pub max_fault_handler_overrides_storage: usize,
pub max_flow_label_storage: usize,
pub max_fs_requests_storage: usize,
}
impl StaticPutRequestCacher {
pub fn new(cfg: PutRequestCacheConfig) -> Self {
pub fn new(max_fs_requests_storage: usize) -> Self {
Self {
static_fields: StaticPutRequestFields::default(),
fs_requests: alloc::vec![0; cfg.max_fs_requests_storage],
fs_requests: alloc::vec![0; max_fs_requests_storage],
fs_requests_len: 0,
}
}

View File

@ -1,8 +1,17 @@
use spacepackets::cfdp::{pdu::FileDirectiveType, PduType};
use spacepackets::{
cfdp::{pdu::FileDirectiveType, PduType},
util::UnsignedByteField,
ByteConversionError,
};
use crate::seq_count::SequenceCountProvider;
use super::{
filestore::VirtualFilestore, request::ReadablePutRequest, user::CfdpUser, LocalEntityConfig,
PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider,
filestore::VirtualFilestore,
request::{ReadablePutRequest, StaticPutRequestCacher},
user::CfdpUser,
LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig,
RemoteEntityConfigProvider, TransactionId, UserFaultHookProvider,
};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -12,7 +21,7 @@ pub enum TransactionStep {
TransactionStart = 1,
SendingMetadata = 3,
SendingFileData = 4,
/// Re-transmitting missing packets in acknowledged mode6
/// Re-transmitting missing packets in acknowledged mode
Retransmitting = 5,
SendingEof = 6,
WaitingForEofAck = 7,
@ -36,6 +45,14 @@ pub struct StateHelper {
num_packets_ready: u32,
}
#[derive(Debug, Copy, Clone, derive_new::new)]
pub struct TransferState {
transaction_id: TransactionId,
remote_cfg: RemoteEntityConfig,
transmission_mode: super::TransmissionMode,
closure_requested: bool,
}
impl Default for StateHelper {
fn default() -> Self {
Self {
@ -55,6 +72,18 @@ pub enum SourceError {
},
#[error("unexpected file data PDU")]
UnexpectedFileDataPdu,
#[error("source handler is already busy with put request")]
PutRequestAlreadyActive,
#[error("error caching put request")]
PutRequestCaching(ByteConversionError),
}
#[derive(Debug, thiserror::Error)]
pub enum PutRequestError {
#[error("error caching put request: {0}")]
Storage(#[from] ByteConversionError),
#[error("already busy with put request")]
AlreadyBusy,
}
pub struct SourceHandler<
@ -62,12 +91,16 @@ pub struct SourceHandler<
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
SeqCountProvider: SequenceCountProvider,
> {
local_cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
put_request_cacher: StaticPutRequestCacher,
remote_cfg_table: RemoteCfgTable,
vfs: Vfs,
state_helper: StateHelper,
tstate: Option<TransferState>,
seq_count_provider: SeqCountProvider,
}
impl<
@ -75,20 +108,26 @@ impl<
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
> SourceHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable>
SeqCountProvider: SequenceCountProvider,
> SourceHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable, SeqCountProvider>
{
pub fn new(
cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
vfs: Vfs,
put_request_cacher: StaticPutRequestCacher,
remote_cfg_table: RemoteCfgTable,
seq_count_provider: SeqCountProvider,
) -> Self {
Self {
local_cfg: cfg,
remote_cfg_table,
vfs,
pdu_sender,
vfs,
put_request_cacher,
state_helper: Default::default(),
tstate: Default::default(),
seq_count_provider,
}
}
@ -155,10 +194,66 @@ impl<
Ok(())
}
fn put_request(&mut self, put_request: &impl ReadablePutRequest) -> Result<(), SourceError> {
pub fn put_request(
&mut self,
put_request: &impl ReadablePutRequest,
) -> Result<(), PutRequestError> {
if self.state_helper.state != super::State::Idle {
return Err(PutRequestError::AlreadyBusy);
}
self.put_request_cacher.set(put_request)?;
self.state_helper.state = super::State::Busy;
let source_file = self.put_request_cacher.source_file().unwrap();
if !self.vfs.exists(source_file) {
// TODO: Specific error.
}
let remote_cfg = self.remote_cfg_table.get(
self.put_request_cacher
.static_fields
.destination_id
.value_const(),
);
if remote_cfg.is_none() {
// TODO: Specific error.
}
let remote_cfg = remote_cfg.unwrap();
self.state_helper.num_packets_ready = 0;
//self.tstate.remote_cfg = Some(*remote_cfg);
let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() {
self.put_request_cacher.static_fields.trans_mode.unwrap()
} else {
remote_cfg.default_transmission_mode
};
let closure_requested = if self
.put_request_cacher
.static_fields
.closure_requested
.is_some()
{
self.put_request_cacher
.static_fields
.closure_requested
.unwrap()
} else {
remote_cfg.closure_requested_by_default
};
self.tstate = Some(TransferState::new(
TransactionId::new(
self.put_request_cacher.static_fields.destination_id,
UnsignedByteField::new(
SeqCountProvider::MAX_BIT_WIDTH / 8,
self.seq_count_provider.get_and_increment().into(),
),
),
*remote_cfg,
transmission_mode,
closure_requested,
));
Ok(())
}
pub fn transmission_mode(&self) {}
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> {
Ok(0)
}
@ -172,14 +267,16 @@ impl<
#[cfg(test)]
mod tests {
use alloc::sync::Arc;
use spacepackets::util::UnsignedByteFieldU16;
use super::*;
use crate::cfdp::{
use crate::{
cfdp::{
filestore::NativeFilestore,
tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler},
FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider,
},
seq_count::SeqCountProviderSimple,
};
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
@ -190,6 +287,7 @@ mod tests {
TestFaultHandler,
NativeFilestore,
StdRemoteEntityConfigProvider,
SeqCountProviderSimple<u16>,
>;
fn default_source_handler(
@ -201,12 +299,14 @@ mod tests {
indication_cfg: IndicationConfig::default(),
fault_handler: FaultHandler::new(test_fault_handler),
};
let static_put_request_cacher = StaticPutRequestCacher::new(1024);
SourceHandler::new(
local_entity_cfg,
test_packet_sender,
NativeFilestore::default(),
static_put_request_cacher,
basic_remote_cfg_table(),
// TestCheckTimerCreator::new(check_timer_expired),
SeqCountProviderSimple::default(),
)
}

View File

@ -1,6 +1,4 @@
use core::cell::Cell;
#[cfg(feature = "alloc")]
use dyn_clone::DynClone;
use paste::paste;
use spacepackets::MAX_SEQ_COUNT;
#[cfg(feature = "std")]
@ -11,27 +9,21 @@ pub use stdmod::*;
/// The core functions are not mutable on purpose to allow easier usage with
/// static structs when using the interior mutability pattern. This can be achieved by using
/// [Cell], [core::cell::RefCell] or atomic types.
pub trait SequenceCountProviderCore<Raw> {
fn get(&self) -> Raw;
pub trait SequenceCountProvider {
type Raw: Into<u64>;
const MAX_BIT_WIDTH: usize;
fn get(&self) -> Self::Raw;
fn increment(&self);
fn get_and_increment(&self) -> Raw {
fn get_and_increment(&self) -> Self::Raw {
let val = self.get();
self.increment();
val
}
}
/// Extension trait which allows cloning a sequence count provider after it was turned into
/// a trait object.
#[cfg(feature = "alloc")]
pub trait SequenceCountProvider<Raw>: SequenceCountProviderCore<Raw> + DynClone {}
#[cfg(feature = "alloc")]
dyn_clone::clone_trait_object!(SequenceCountProvider<u16>);
#[cfg(feature = "alloc")]
impl<T, Raw> SequenceCountProvider<Raw> for T where T: SequenceCountProviderCore<Raw> + Clone {}
#[derive(Clone)]
pub struct SeqCountProviderSimple<T: Copy> {
seq_count: Cell<T>,
@ -63,8 +55,11 @@ macro_rules! impl_for_primitives {
}
}
impl SequenceCountProviderCore<$ty> for SeqCountProviderSimple<$ty> {
fn get(&self) -> $ty {
impl SequenceCountProvider for SeqCountProviderSimple<$ty> {
type Raw = $ty;
const MAX_BIT_WIDTH: usize = core::mem::size_of::<Self::Raw>() * 8;
fn get(&self) -> Self::Raw {
self.seq_count.get()
}
@ -72,7 +67,7 @@ macro_rules! impl_for_primitives {
self.get_and_increment();
}
fn get_and_increment(&self) -> $ty {
fn get_and_increment(&self) -> Self::Raw {
let curr_count = self.seq_count.get();
if curr_count == self.max_val {
@ -104,7 +99,9 @@ impl Default for CcsdsSimpleSeqCountProvider {
}
}
impl SequenceCountProviderCore<u16> for CcsdsSimpleSeqCountProvider {
impl SequenceCountProvider for CcsdsSimpleSeqCountProvider {
type Raw = u16;
const MAX_BIT_WIDTH: usize = core::mem::size_of::<Self::Raw>() * 8;
delegate::delegate! {
to self.provider {
fn get(&self) -> u16;
@ -144,7 +141,10 @@ pub mod stdmod {
}
}
}
impl SequenceCountProviderCore<$ty> for [<SeqCountProviderSync $ty:upper>] {
impl SequenceCountProvider for [<SeqCountProviderSync $ty:upper>] {
type Raw = $ty;
const MAX_BIT_WIDTH: usize = core::mem::size_of::<Self::Raw>() * 8;
fn get(&self) -> $ty {
match self.seq_count.lock() {
Ok(counter) => *counter,
@ -181,7 +181,7 @@ pub mod stdmod {
mod tests {
use crate::seq_count::{
CcsdsSimpleSeqCountProvider, SeqCountProviderSimple, SeqCountProviderSyncU8,
SequenceCountProviderCore,
SequenceCountProvider,
};
use spacepackets::MAX_SEQ_COUNT;