CFDP extracted to library #201

Closed
muellerr wants to merge 18 commits from continue-cfsp-source-handler into main
3 changed files with 506 additions and 241 deletions
Showing only changes of commit 7649a73740 - Show all commits

View File

@ -6,9 +6,9 @@ use super::{
filestore::{FilestoreError, NativeFilestore, VirtualFilestore},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
CheckTimerProviderCreator, CountdownProvider, EntityType, LocalEntityConfig, PacketInfo,
PacketTarget, RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCheckTimer,
StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext, TransactionId,
TransactionStep,
PacketTarget, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State,
StdCheckTimer, StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext,
TransactionId, UserFaultHookProvider,
};
use smallvec::SmallVec;
use spacepackets::{
@ -42,6 +42,18 @@ enum CompletionDisposition {
Cancelled = 1,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionStep {
Idle = 0,
TransactionStart = 1,
ReceivingFileDataPdus = 2,
ReceivingFileDataPdusWithCheckLimitHandling = 3,
SendingAckPdu = 4,
TransferCompletion = 5,
SendingFinishedPdu = 6,
}
#[derive(Debug)]
struct TransferState<CheckTimer: CountdownProvider> {
transaction_id: Option<TransactionId>,
@ -140,9 +152,12 @@ impl<CheckTimer: CountdownProvider> TransactionParams<CheckTimer> {
pub enum DestError {
/// File directive expected, but none specified
#[error("expected file directive")]
DirectiveExpected,
#[error("can not process packet type {0:?}")]
CantProcessPacketType(FileDirectiveType),
DirectiveFieldEmpty,
#[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
CantProcessPacketType {
pdu_type: PduType,
directive_type: Option<FileDirectiveType>,
},
#[error("can not process file data PDUs in current state")]
WrongStateForFileDataAndEof,
// Received new metadata PDU while being already being busy with a file transfer.
@ -168,15 +183,6 @@ pub enum DestError {
NoRemoteCfgFound(UnsignedByteField),
}
pub trait CfdpPacketSender: Send {
fn send_pdu(
&self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), PduError>;
}
/// This is the primary CFDP destination handler. It models the CFDP destination entity, which is
/// primarily responsible for receiving files sent from another CFDP entity. It performs the
/// reception side of File Copy Operations.
@ -192,13 +198,14 @@ pub trait CfdpPacketSender: Send {
/// user and passed as a constructor parameter. The number of generated packets is returned
/// by the state machine call.
pub struct DestinationHandler<
PduSender: CfdpPacketSender,
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
CheckTimerCreator: CheckTimerProviderCreator<CheckTimer = CheckTimerProvider>,
CheckTimerProvider: CountdownProvider,
> {
local_cfg: LocalEntityConfig,
local_cfg: LocalEntityConfig<UserFaultHook>,
step: TransactionStep,
state: State,
tparams: TransactionParams<CheckTimerProvider>,
@ -210,8 +217,9 @@ pub struct DestinationHandler<
}
#[cfg(feature = "std")]
pub type StdDestinationHandler<PduSender> = DestinationHandler<
pub type StdDestinationHandler<PduSender, UserFaultHook> = DestinationHandler<
PduSender,
UserFaultHook,
NativeFilestore,
StdRemoteEntityConfigProvider,
StdCheckTimerCreator,
@ -219,12 +227,21 @@ pub type StdDestinationHandler<PduSender> = DestinationHandler<
>;
impl<
PduSender: CfdpPacketSender,
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
CheckTimerCreator: CheckTimerProviderCreator<CheckTimer = CheckTimerProvider>,
CheckTimerProvider: CountdownProvider,
> DestinationHandler<PduSender, Vfs, RemoteCfgTable, CheckTimerCreator, CheckTimerProvider>
>
DestinationHandler<
PduSender,
UserFaultHook,
Vfs,
RemoteCfgTable,
CheckTimerCreator,
CheckTimerProvider,
>
{
/// Constructs a new destination handler.
///
@ -246,9 +263,9 @@ impl<
/// * `check_timer_creator` - This is used by the CFDP handler to generate timers required
/// by various tasks.
pub fn new(
local_cfg: LocalEntityConfig,
local_cfg: LocalEntityConfig<UserFaultHook>,
max_packet_len: usize,
packet_sender: PduSender,
pdu_sender: PduSender,
vfs: Vfs,
remote_cfg_table: RemoteCfgTable,
check_timer_creator: CheckTimerCreator,
@ -259,7 +276,7 @@ impl<
state: State::Idle,
tparams: Default::default(),
packet_buf: alloc::vec![0; max_packet_len],
pdu_sender: packet_sender,
pdu_sender,
vfs,
remote_cfg_table,
check_timer_creator,
@ -272,6 +289,8 @@ impl<
/// The state machine should either be called if a packet with the appropriate destination ID
/// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example
/// checking for timeouts or missed file segments.
///
/// The function returns the number of sent PDU packets on success.
pub fn state_machine(
&mut self,
cfdp_user: &mut impl CfdpUser,
@ -308,14 +327,15 @@ impl<
if packet_info.target() != PacketTarget::DestEntity {
// Unwrap is okay here, a PacketInfo for a file data PDU should always have the
// destination as the target.
return Err(DestError::CantProcessPacketType(
packet_info.pdu_directive().unwrap(),
));
return Err(DestError::CantProcessPacketType {
pdu_type: packet_info.pdu_type(),
directive_type: packet_info.pdu_directive(),
});
}
match packet_info.pdu_type {
PduType::FileDirective => {
if packet_info.pdu_directive.is_none() {
return Err(DestError::DirectiveExpected);
return Err(DestError::DirectiveFieldEmpty);
}
self.handle_file_directive(
cfdp_user,
@ -338,12 +358,13 @@ impl<
FileDirectiveType::FinishedPdu
| FileDirectiveType::NakPdu
| FileDirectiveType::KeepAlivePdu => {
return Err(DestError::CantProcessPacketType(pdu_directive));
return Err(DestError::CantProcessPacketType {
pdu_type: PduType::FileDirective,
directive_type: Some(pdu_directive),
});
}
FileDirectiveType::AckPdu => {
todo!(
"check whether ACK pdu handling is applicable by checking the acked directive field"
)
todo!("acknowledged mode not implemented yet")
}
FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?,
FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?,
@ -730,7 +751,7 @@ impl<
let progress = self.tstate().progress;
let fh_code = self
.local_cfg
.default_fault_handler
.fault_handler
.get_fault_handler(condition_code);
match fh_code {
FaultHandlerCode::NoticeOfCancellation => {
@ -741,7 +762,7 @@ impl<
FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
}
self.local_cfg
.default_fault_handler
.fault_handler
.report_fault(transaction_id, condition_code, progress)
}
@ -804,15 +825,12 @@ impl<
#[cfg(test)]
mod tests {
use core::{
cell::{Cell, RefCell},
sync::atomic::AtomicBool,
};
use core::{cell::Cell, sync::atomic::AtomicBool};
use std::fs;
#[allow(unused_imports)]
use std::println;
use std::{fs, sync::Mutex};
use alloc::{boxed::Box, collections::VecDeque, string::String, sync::Arc, vec::Vec};
use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec};
use rand::Rng;
use spacepackets::{
cfdp::{
@ -824,9 +842,11 @@ mod tests {
};
use crate::cfdp::{
filestore::NativeFilestore, user::OwnedMetadataRecvdParams, CheckTimerProviderCreator,
CountdownProvider, DefaultFaultHandler, IndicationConfig, RemoteEntityConfig,
StdRemoteEntityConfigProvider, UserFaultHandler, CRC_32,
filestore::NativeFilestore,
tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestFaultHandler},
user::OwnedMetadataRecvdParams,
CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig,
RemoteEntityConfig, StdRemoteEntityConfigProvider, CRC_32,
};
use super::*;
@ -840,41 +860,6 @@ mod tests {
pub length: usize,
}
struct SentPdu {
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: Vec<u8>,
}
#[derive(Default)]
struct TestCfdpSender {
packet_queue: RefCell<VecDeque<SentPdu>>,
}
impl CfdpPacketSender for TestCfdpSender {
fn send_pdu(
&self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), PduError> {
self.packet_queue.borrow_mut().push_back(SentPdu {
pdu_type,
file_directive_type,
raw_pdu: raw_pdu.to_vec(),
});
Ok(())
}
}
impl TestCfdpSender {
pub fn retrieve_next_pdu(&self) -> Option<SentPdu> {
self.packet_queue.borrow_mut().pop_front()
}
pub fn queue_empty(&self) -> bool {
self.packet_queue.borrow_mut().is_empty()
}
}
#[derive(Default)]
struct TestCfdpUser {
next_expected_seq_num: u64,
@ -1000,81 +985,6 @@ mod tests {
}
}
#[derive(Default, Clone)]
struct TestFaultHandler {
notice_of_suspension_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
notice_of_cancellation_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
abandoned_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
ignored_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
}
impl UserFaultHandler for TestFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.notice_of_suspension_queue.lock().unwrap().push_back((
transaction_id,
cond,
progress,
))
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.notice_of_cancellation_queue
.lock()
.unwrap()
.push_back((transaction_id, cond, progress))
}
fn abandoned_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.abandoned_queue
.lock()
.unwrap()
.push_back((transaction_id, cond, progress))
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
self.ignored_queue
.lock()
.unwrap()
.push_back((transaction_id, cond, progress))
}
}
impl TestFaultHandler {
fn suspension_queue_empty(&self) -> bool {
self.notice_of_suspension_queue.lock().unwrap().is_empty()
}
fn cancellation_queue_empty(&self) -> bool {
self.notice_of_cancellation_queue.lock().unwrap().is_empty()
}
fn ignored_queue_empty(&self) -> bool {
self.ignored_queue.lock().unwrap().is_empty()
}
fn abandoned_queue_empty(&self) -> bool {
self.abandoned_queue.lock().unwrap().is_empty()
}
fn all_queues_empty(&self) -> bool {
self.suspension_queue_empty()
&& self.cancellation_queue_empty()
&& self.ignored_queue_empty()
&& self.abandoned_queue_empty()
}
}
#[derive(Debug)]
struct TestCheckTimer {
counter: Cell<u32>,
@ -1128,6 +1038,7 @@ mod tests {
type TestDestHandler = DestinationHandler<
TestCfdpSender,
TestFaultHandler,
NativeFilestore,
StdRemoteEntityConfigProvider,
TestCheckTimerCreator,
@ -1177,6 +1088,14 @@ mod tests {
&self.dest_path
}
fn all_fault_queues_empty(&self) -> bool {
self.handler
.local_cfg
.user_fault_hook()
.borrow()
.all_queues_empty()
}
#[allow(dead_code)]
fn indication_cfg_mut(&mut self) -> &mut IndicationConfig {
&mut self.handler.local_cfg.indication_cfg
@ -1295,36 +1214,15 @@ mod tests {
)
}
fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider {
let mut table = StdRemoteEntityConfigProvider::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
UnsignedByteFieldU16::new(1).into(),
1024,
1024,
true,
true,
TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
table.add_config(&remote_entity_cfg);
table
}
fn default_dest_handler(
test_fault_handler: TestFaultHandler,
test_packet_sender: TestCfdpSender,
check_timer_expired: Arc<AtomicBool>,
) -> DestinationHandler<
TestCfdpSender,
NativeFilestore,
StdRemoteEntityConfigProvider,
TestCheckTimerCreator,
TestCheckTimer,
> {
) -> TestDestHandler {
let local_entity_cfg = LocalEntityConfig {
id: REMOTE_ID.into(),
indication_cfg: IndicationConfig::default(),
default_fault_handler: DefaultFaultHandler::new(Box::new(test_fault_handler)),
fault_handler: FaultHandler::new(test_fault_handler),
};
DestinationHandler::new(
local_entity_cfg,
@ -1390,15 +1288,20 @@ mod tests {
fn test_basic() {
let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default();
let dest_handler = default_dest_handler(fault_handler.clone(), test_sender, Arc::default());
let dest_handler = default_dest_handler(fault_handler, test_sender, Arc::default());
assert!(dest_handler.transmission_mode().is_none());
assert!(fault_handler.all_queues_empty());
assert!(dest_handler
.local_cfg
.fault_handler
.user_hook
.borrow()
.all_queues_empty());
}
#[test]
fn test_empty_file_transfer_not_acked_no_closure() {
let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler.clone(), false);
let mut testbench = DestHandlerTester::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(0);
testbench
.generic_transfer_init(&mut test_user, 0)
@ -1407,7 +1310,7 @@ mod tests {
testbench
.generic_eof_no_error(&mut test_user, Vec::new())
.expect("EOF no error insertion failed");
assert!(fault_handler.all_queues_empty());
assert!(testbench.all_fault_queues_empty());
assert!(testbench.handler.pdu_sender.queue_empty());
testbench.state_check(State::Idle, TransactionStep::Idle);
}
@ -1419,7 +1322,7 @@ mod tests {
let file_size = file_data.len() as u64;
let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler.clone(), false);
let mut testbench = DestHandlerTester::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size);
testbench
.generic_transfer_init(&mut test_user, file_size)
@ -1431,7 +1334,7 @@ mod tests {
testbench
.generic_eof_no_error(&mut test_user, file_data.to_vec())
.expect("EOF no error insertion failed");
assert!(fault_handler.all_queues_empty());
assert!(testbench.all_fault_queues_empty());
assert!(testbench.handler.pdu_sender.queue_empty());
testbench.state_check(State::Idle, TransactionStep::Idle);
}
@ -1445,7 +1348,7 @@ mod tests {
let segment_len = 256;
let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler.clone(), false);
let mut testbench = DestHandlerTester::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size);
testbench
.generic_transfer_init(&mut test_user, file_size)
@ -1464,7 +1367,7 @@ mod tests {
testbench
.generic_eof_no_error(&mut test_user, random_data.to_vec())
.expect("EOF no error insertion failed");
assert!(fault_handler.all_queues_empty());
assert!(testbench.all_fault_queues_empty());
assert!(testbench.handler.pdu_sender.queue_empty());
testbench.state_check(State::Idle, TransactionStep::Idle);
}
@ -1478,7 +1381,7 @@ mod tests {
let segment_len = 256;
let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler.clone(), false);
let mut testbench = DestHandlerTester::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size);
let transaction_id = testbench
.generic_transfer_init(&mut test_user, file_size)
@ -1507,10 +1410,10 @@ mod tests {
.handler
.state_machine(&mut test_user, None)
.expect("fsm failure");
let fault_handler = testbench.handler.local_cfg.fault_handler.user_hook.borrow();
let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
assert_eq!(ignored_queue.len(), 1);
let cancelled = *ignored_queue.front().unwrap();
assert_eq!(fault_handler.ignored_queue.len(), 1);
let cancelled = fault_handler.ignored_queue.front().unwrap();
assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
assert_eq!(cancelled.2, segment_len as u64);
@ -1527,7 +1430,7 @@ mod tests {
let segment_len = 256;
let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler.clone(), false);
let mut testbench = DestHandlerTester::new(fault_handler, false);
let mut test_user = testbench.test_user_from_cached_paths(file_size);
let transaction_id = testbench
.generic_transfer_init(&mut test_user, file_size)
@ -1560,27 +1463,25 @@ mod tests {
.expect("fsm error");
testbench.state_check(State::Idle, TransactionStep::Idle);
assert!(fault_handler
.notice_of_suspension_queue
.lock()
.unwrap()
.is_empty());
let fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow();
let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
assert!(fault_hook.notice_of_suspension_queue.is_empty());
let ignored_queue = &fault_hook.ignored_queue;
assert_eq!(ignored_queue.len(), 1);
let cancelled = *ignored_queue.front().unwrap();
let cancelled = ignored_queue.front().unwrap();
assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
assert_eq!(cancelled.2, segment_len as u64);
let cancelled_queue = fault_handler.notice_of_cancellation_queue.lock().unwrap();
let fault_hook = testbench.handler.local_cfg.user_fault_hook().borrow();
let cancelled_queue = &fault_hook.notice_of_cancellation_queue;
assert_eq!(cancelled_queue.len(), 1);
let cancelled = *cancelled_queue.front().unwrap();
assert_eq!(cancelled.0, transaction_id);
assert_eq!(cancelled.1, ConditionCode::CheckLimitReached);
assert_eq!(cancelled.2, segment_len as u64);
drop(cancelled_queue);
drop(fault_hook);
assert!(testbench.handler.pdu_sender.queue_empty());
@ -1610,7 +1511,7 @@ mod tests {
#[test]
fn test_file_transfer_with_closure() {
let fault_handler = TestFaultHandler::default();
let mut testbench = DestHandlerTester::new(fault_handler.clone(), true);
let mut testbench = DestHandlerTester::new(fault_handler, true);
let mut test_user = testbench.test_user_from_cached_paths(0);
testbench
.generic_transfer_init(&mut test_user, 0)
@ -1620,7 +1521,7 @@ mod tests {
.generic_eof_no_error(&mut test_user, Vec::new())
.expect("EOF no error insertion failed");
assert_eq!(sent_packets, 1);
assert!(fault_handler.all_queues_empty());
assert!(testbench.all_fault_queues_empty());
// The Finished PDU was sent, so the state machine is done.
testbench.state_check(State::Idle, TransactionStep::Idle);
assert!(!testbench.handler.pdu_sender.queue_empty());

View File

@ -12,8 +12,6 @@ use spacepackets::{
util::{UnsignedByteField, UnsignedEnum},
};
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
@ -311,9 +309,9 @@ impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider {
/// implement some CFDP features like fault handler logging, which would not be possible
/// generically otherwise.
///
/// For each error reported by the [DefaultFaultHandler], the appropriate fault handler callback
/// For each error reported by the [FaultHandler], the appropriate fault handler callback
/// will be called depending on the [FaultHandlerCode].
pub trait UserFaultHandler {
pub trait UserFaultHookProvider {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
@ -333,6 +331,37 @@ pub trait UserFaultHandler {
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64);
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
struct DummyFaultHook {}
impl UserFaultHookProvider for DummyFaultHook {
fn notice_of_suspension_cb(
&mut self,
_transaction_id: TransactionId,
_cond: ConditionCode,
_progress: u64,
) {
}
fn notice_of_cancellation_cb(
&mut self,
_transaction_id: TransactionId,
_cond: ConditionCode,
_progress: u64,
) {
}
fn abandoned_cb(
&mut self,
_transaction_id: TransactionId,
_cond: ConditionCode,
_progress: u64,
) {
}
fn ignore_cb(&mut self, _transaction_id: TransactionId, _cond: ConditionCode, _progress: u64) {}
}
/// This structure is used to implement the fault handling as specified in chapter 4.8 of the CFDP
/// standard.
///
@ -353,14 +382,14 @@ pub trait UserFaultHandler {
/// These defaults can be overriden by using the [Self::set_fault_handler] method.
/// Please note that in any case, fault handler overrides can be specified by the sending CFDP
/// entity.
pub struct DefaultFaultHandler {
pub struct FaultHandler<UserHandler: UserFaultHookProvider> {
handler_array: [FaultHandlerCode; 10],
// Could also change the user fault handler trait to have non mutable methods, but that limits
// flexbility on the user side..
user_fault_handler: RefCell<Box<dyn UserFaultHandler + Send>>,
pub user_hook: RefCell<UserHandler>,
}
impl DefaultFaultHandler {
impl<UserHandler: UserFaultHookProvider> FaultHandler<UserHandler> {
fn condition_code_to_array_index(conditon_code: ConditionCode) -> Option<usize> {
Some(match conditon_code {
ConditionCode::PositiveAckLimitReached => 0,
@ -389,7 +418,7 @@ impl DefaultFaultHandler {
self.handler_array[array_idx.unwrap()] = fault_handler;
}
pub fn new(user_fault_handler: Box<dyn UserFaultHandler + Send>) -> Self {
pub fn new(user_fault_handler: UserHandler) -> Self {
let mut init_array = [FaultHandlerCode::NoticeOfCancellation; 10];
init_array
[Self::condition_code_to_array_index(ConditionCode::FileChecksumFailure).unwrap()] =
@ -398,7 +427,7 @@ impl DefaultFaultHandler {
.unwrap()] = FaultHandlerCode::IgnoreError;
Self {
handler_array: init_array,
user_fault_handler: RefCell::new(user_fault_handler),
user_hook: RefCell::new(user_fault_handler),
}
}
@ -421,7 +450,7 @@ impl DefaultFaultHandler {
return FaultHandlerCode::IgnoreError;
}
let fh_code = self.handler_array[array_idx.unwrap()];
let mut handler_mut = self.user_fault_handler.borrow_mut();
let mut handler_mut = self.user_hook.borrow_mut();
match fh_code {
FaultHandlerCode::NoticeOfCancellation => {
handler_mut.notice_of_cancellation_cb(transaction_id, condition, progress);
@ -462,10 +491,29 @@ impl Default for IndicationConfig {
}
}
pub struct LocalEntityConfig {
pub struct LocalEntityConfig<UserFaultHook: UserFaultHookProvider> {
pub id: UnsignedByteField,
pub indication_cfg: IndicationConfig,
pub default_fault_handler: DefaultFaultHandler,
pub fault_handler: FaultHandler<UserFaultHook>,
}
impl<UserFaultHook: UserFaultHookProvider> LocalEntityConfig<UserFaultHook> {
pub fn user_fault_hook_mut(&mut self) -> &mut RefCell<UserFaultHook> {
&mut self.fault_handler.user_hook
}
pub fn user_fault_hook(&self) -> &RefCell<UserFaultHook> {
&self.fault_handler.user_hook
}
}
pub trait PduSendProvider {
fn send_pdu(
&self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), PduError>;
}
/// The CFDP transaction ID of a CFDP transaction consists of the source entity ID and the sequence
@ -505,18 +553,6 @@ impl PartialEq for TransactionId {
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TransactionStep {
Idle = 0,
TransactionStart = 1,
ReceivingFileDataPdus = 2,
ReceivingFileDataPdusWithCheckLimitHandling = 3,
SendingAckPdu = 4,
TransferCompletion = 5,
SendingFinishedPdu = 6,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum State {
@ -627,21 +663,147 @@ impl<'raw> PacketInfo<'raw> {
}
#[cfg(test)]
mod tests {
use spacepackets::cfdp::{
pub(crate) mod tests {
use core::cell::RefCell;
use alloc::{collections::VecDeque, vec::Vec};
use spacepackets::{
cfdp::{
lv::Lv,
pdu::{
eof::EofPdu,
file_data::FileDataPdu,
metadata::{MetadataGenericParams, MetadataPduCreator},
CommonPduConfig, FileDirectiveType, PduHeader, WritablePduPacket,
CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
},
PduType,
ChecksumType, ConditionCode, PduType, TransmissionMode,
},
util::UnsignedByteFieldU16,
};
use crate::cfdp::PacketTarget;
use super::PacketInfo;
use super::{
PacketInfo, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider,
StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider,
};
#[derive(Default)]
pub(crate) struct TestFaultHandler {
pub notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub abandoned_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
pub ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
}
impl UserFaultHookProvider for TestFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.notice_of_suspension_queue
.push_back((transaction_id, cond, progress))
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.notice_of_cancellation_queue
.push_back((transaction_id, cond, progress))
}
fn abandoned_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
self.abandoned_queue
.push_back((transaction_id, cond, progress))
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
self.ignored_queue
.push_back((transaction_id, cond, progress))
}
}
impl TestFaultHandler {
pub(crate) fn suspension_queue_empty(&self) -> bool {
self.notice_of_suspension_queue.is_empty()
}
pub(crate) fn cancellation_queue_empty(&self) -> bool {
self.notice_of_cancellation_queue.is_empty()
}
pub(crate) fn ignored_queue_empty(&self) -> bool {
self.ignored_queue.is_empty()
}
pub(crate) fn abandoned_queue_empty(&self) -> bool {
self.abandoned_queue.is_empty()
}
pub(crate) fn all_queues_empty(&self) -> bool {
self.suspension_queue_empty()
&& self.cancellation_queue_empty()
&& self.ignored_queue_empty()
&& self.abandoned_queue_empty()
}
}
pub struct SentPdu {
pub pdu_type: PduType,
pub file_directive_type: Option<FileDirectiveType>,
pub raw_pdu: Vec<u8>,
}
#[derive(Default)]
pub struct TestCfdpSender {
pub packet_queue: RefCell<VecDeque<SentPdu>>,
}
impl PduSendProvider for TestCfdpSender {
fn send_pdu(
&self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), PduError> {
self.packet_queue.borrow_mut().push_back(SentPdu {
pdu_type,
file_directive_type,
raw_pdu: raw_pdu.to_vec(),
});
Ok(())
}
}
impl TestCfdpSender {
pub fn retrieve_next_pdu(&self) -> Option<SentPdu> {
self.packet_queue.borrow_mut().pop_front()
}
pub fn queue_empty(&self) -> bool {
self.packet_queue.borrow_mut().is_empty()
}
}
pub fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider {
let mut table = StdRemoteEntityConfigProvider::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
UnsignedByteFieldU16::new(1).into(),
1024,
1024,
true,
true,
TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
table.add_config(&remote_entity_cfg);
table
}
fn generic_pdu_header() -> PduHeader {
let pdu_conf = CommonPduConfig::default();

View File

@ -1,15 +1,217 @@
#![allow(dead_code)]
use spacepackets::util::UnsignedByteField;
use spacepackets::cfdp::{pdu::FileDirectiveType, PduType};
pub struct SourceHandler {
id: UnsignedByteField,
use super::{
filestore::VirtualFilestore, user::CfdpUser, LocalEntityConfig, PacketInfo, PacketTarget,
PduSendProvider, RemoteEntityConfigProvider, UserFaultHookProvider,
};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionStep {
Idle = 0,
TransactionStart = 1,
SendingMetadata = 3,
SendingFileData = 4,
/// Re-transmitting missing packets in acknowledged mode6
Retransmitting = 5,
SendingEof = 6,
WaitingForEofAck = 7,
WaitingForFinished = 8,
SendingAckOfFinished = 9,
NoticeOfCompletion = 10,
}
impl SourceHandler {
pub fn new(id: impl Into<UnsignedByteField>) -> Self {
Self { id: id.into() }
pub struct FileParams {
pub progress: usize,
pub segment_len: usize,
pub crc32: Option<[u8; 4]>,
pub metadata_only: bool,
pub file_size: usize,
pub no_eof: bool,
}
pub struct StateHelper {
state: super::State,
step: TransactionStep,
num_packets_ready: u32,
}
impl Default for StateHelper {
fn default() -> Self {
Self {
state: super::State::Idle,
step: TransactionStep::Idle,
num_packets_ready: 0,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum SourceError {
#[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
CantProcessPacketType {
pdu_type: PduType,
directive_type: Option<FileDirectiveType>,
},
#[error("unexpected file data PDU")]
UnexpectedFileDataPdu,
}
pub struct SourceHandler<
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
> {
local_cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
remote_cfg_table: RemoteCfgTable,
vfs: Vfs,
state_helper: StateHelper,
}
impl<
PduSender: PduSendProvider,
UserFaultHook: UserFaultHookProvider,
Vfs: VirtualFilestore,
RemoteCfgTable: RemoteEntityConfigProvider,
> SourceHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable>
{
pub fn new(
cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
vfs: Vfs,
remote_cfg_table: RemoteCfgTable,
) -> Self {
Self {
local_cfg: cfg,
remote_cfg_table,
vfs,
pdu_sender,
state_helper: Default::default(),
}
}
/// This is the core function to drive the source handler. It is also used to insert
/// packets into the source handler.
///
/// The state machine should either be called if a packet with the appropriate destination ID
/// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example
/// checking for timeouts or missed file segments.
///
/// The function returns the number of sent PDU packets on success.
pub fn state_machine(
&mut self,
cfdp_user: &mut impl CfdpUser,
packet_to_insert: Option<&PacketInfo>,
) -> Result<u32, SourceError> {
if let Some(packet) = packet_to_insert {
self.insert_packet(cfdp_user, packet)?;
}
match self.state_helper.state {
super::State::Idle => todo!(),
super::State::Busy => self.fsm_busy(cfdp_user),
super::State::Suspended => todo!(),
}
}
fn insert_packet(
&mut self,
cfdp_user: &mut impl CfdpUser,
packet_info: &PacketInfo,
) -> Result<(), SourceError> {
if packet_info.target() != PacketTarget::SourceEntity {
// Unwrap is okay here, a PacketInfo for a file data PDU should always have the
// destination as the target.
return Err(SourceError::CantProcessPacketType {
pdu_type: packet_info.pdu_type(),
directive_type: packet_info.pdu_directive(),
});
}
if packet_info.pdu_type() == PduType::FileData {
// The [PacketInfo] API should ensure that file data PDUs can not be passed
// into a source entity, so this should never happen.
return Err(SourceError::UnexpectedFileDataPdu);
}
// Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is
// always a valid value.
match packet_info
.pdu_directive()
.expect("PDU directive type unexpectedly not set")
{
FileDirectiveType::FinishedPdu => self.handle_finished_pdu(),
FileDirectiveType::NakPdu => self.handle_nak_pdu(),
FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
FileDirectiveType::AckPdu => todo!("acknowledged mode not implemented yet"),
FileDirectiveType::EofPdu
| FileDirectiveType::PromptPdu
| FileDirectiveType::MetadataPdu => {
return Err(SourceError::CantProcessPacketType {
pdu_type: packet_info.pdu_type(),
directive_type: packet_info.pdu_directive(),
});
}
}
Ok(())
}
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> {
Ok(0)
}
fn handle_finished_pdu(&mut self) {}
fn handle_nak_pdu(&mut self) {}
fn handle_keep_alive_pdu(&mut self) {}
}
#[cfg(test)]
mod tests {}
mod tests {
use alloc::sync::Arc;
use spacepackets::util::UnsignedByteFieldU16;
use super::*;
use crate::cfdp::{
filestore::NativeFilestore,
tests::{basic_remote_cfg_table, TestCfdpSender, TestFaultHandler},
FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider,
};
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
type TestSourceHandler = SourceHandler<
TestCfdpSender,
TestFaultHandler,
NativeFilestore,
StdRemoteEntityConfigProvider,
>;
fn default_source_handler(
test_fault_handler: TestFaultHandler,
test_packet_sender: TestCfdpSender,
) -> TestSourceHandler {
let local_entity_cfg = LocalEntityConfig {
id: REMOTE_ID.into(),
indication_cfg: IndicationConfig::default(),
fault_handler: FaultHandler::new(test_fault_handler),
};
SourceHandler::new(
local_entity_cfg,
test_packet_sender,
NativeFilestore::default(),
basic_remote_cfg_table(),
// TestCheckTimerCreator::new(check_timer_expired),
)
}
#[test]
fn test_basic() {
let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default();
let source_handler = default_source_handler(fault_handler, test_sender);
// assert!(dest_handler.transmission_mode().is_none());
// assert!(fault_handler.all_queues_empty());
}
}