add more error handling
Some checks failed
Rust/cfdp/pipeline/head There was a failure building this commit
Some checks failed
Rust/cfdp/pipeline/head There was a failure building this commit
This commit is contained in:
269
src/source.rs
269
src/source.rs
@@ -12,11 +12,11 @@
|
||||
//! The [SourceHandler::state_machine] will generally perform the following steps after a valid
|
||||
//! put request was received through the [SourceHandler::put_request] method:
|
||||
//!
|
||||
//! 1. Generate the Metadata PDU to be sent to a remote CFDP entity. You can use the
|
||||
//! 1. Generate the Metadata PDU to be sent to a remote CFDP entity. You can use the
|
||||
//! [spacepackets::cfdp::pdu::metadata::MetadataPduReader] to inspect the generated PDU.
|
||||
//! 2. Generate all File Data PDUs to be sent to a remote CFDP entity if applicable (file not
|
||||
//! 2. Generate all File Data PDUs to be sent to a remote CFDP entity if applicable (file not
|
||||
//! empty). The PDU(s) can be inspected using the [spacepackets::cfdp::pdu::file_data::FileDataPdu] reader.
|
||||
//! 3. Generate an EOF PDU to be sent to a remote CFDP entity. The PDU can be inspected using
|
||||
//! 3. Generate an EOF PDU to be sent to a remote CFDP entity. The PDU can be inspected using
|
||||
//! the [spacepackets::cfdp::pdu::eof::EofPdu] reader.
|
||||
//!
|
||||
//! If this is an unacknowledged transfer with no transaction closure, the file transfer will be
|
||||
@@ -24,16 +24,16 @@
|
||||
//!
|
||||
//! ### Unacknowledged transfer with requested closure
|
||||
//!
|
||||
//! 4. A Finished PDU will be awaited, for example one generated using
|
||||
//! 4. A Finished PDU will be awaited, for example one generated using
|
||||
//! [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
|
||||
//!
|
||||
//! ### Acknowledged transfer (*not implemented yet*)
|
||||
//!
|
||||
//! 4. A EOF ACK packet will be awaited, for example one generated using
|
||||
//! 4. A EOF ACK packet will be awaited, for example one generated using
|
||||
//! [spacepackets::cfdp::pdu::ack::AckPdu].
|
||||
//! 5. A Finished PDU will be awaited, for example one generated using
|
||||
//! 5. A Finished PDU will be awaited, for example one generated using
|
||||
//! [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
|
||||
//! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity.
|
||||
//! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity.
|
||||
//! The [spacepackets::cfdp::pdu::finished::FinishedPduReader] can be used to inspect the
|
||||
//! generated PDU.
|
||||
use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error};
|
||||
@@ -60,7 +60,10 @@ use spacepackets::{
|
||||
|
||||
use spacepackets::seq_count::SequenceCountProvider;
|
||||
|
||||
use crate::{DummyPduProvider, GenericSendError, PduProvider};
|
||||
use crate::{
|
||||
time::CountdownProvider, DummyPduProvider, EntityType, GenericSendError, PduProvider,
|
||||
TimerCreatorProvider,
|
||||
};
|
||||
|
||||
use super::{
|
||||
filestore::{FilestoreError, VirtualFilestore},
|
||||
@@ -208,6 +211,8 @@ pub struct SourceHandler<
|
||||
UserFaultHook: UserFaultHookProvider,
|
||||
Vfs: VirtualFilestore,
|
||||
RemoteCfgTable: RemoteEntityConfigProvider,
|
||||
TimerCreator: TimerCreatorProvider<Countdown = Countdown>,
|
||||
Countdown: CountdownProvider,
|
||||
SeqCountProvider: SequenceCountProvider,
|
||||
> {
|
||||
local_cfg: LocalEntityConfig<UserFaultHook>,
|
||||
@@ -223,6 +228,8 @@ pub struct SourceHandler<
|
||||
fparams: FileParams,
|
||||
// PDU configuration is cached so it can be re-used for all PDUs generated for file transfers.
|
||||
pdu_conf: CommonPduConfig,
|
||||
countdown: Option<Countdown>,
|
||||
timer_creator: TimerCreator,
|
||||
seq_count_provider: SeqCountProvider,
|
||||
}
|
||||
|
||||
@@ -231,8 +238,19 @@ impl<
|
||||
UserFaultHook: UserFaultHookProvider,
|
||||
Vfs: VirtualFilestore,
|
||||
RemoteCfgTable: RemoteEntityConfigProvider,
|
||||
CheckTimerCreator: TimerCreatorProvider<Countdown = CheckTimerProvider>,
|
||||
CheckTimerProvider: CountdownProvider,
|
||||
SeqCountProvider: SequenceCountProvider,
|
||||
> SourceHandler<PduSender, UserFaultHook, Vfs, RemoteCfgTable, SeqCountProvider>
|
||||
>
|
||||
SourceHandler<
|
||||
PduSender,
|
||||
UserFaultHook,
|
||||
Vfs,
|
||||
RemoteCfgTable,
|
||||
CheckTimerCreator,
|
||||
CheckTimerProvider,
|
||||
SeqCountProvider,
|
||||
>
|
||||
{
|
||||
/// Creates a new instance of a source handler.
|
||||
///
|
||||
@@ -251,8 +269,12 @@ impl<
|
||||
/// example 2048 or 4096 bytes.
|
||||
/// * `remote_cfg_table` - The [RemoteEntityConfigProvider] used to look up remote
|
||||
/// entities and target specific configuration for file copy operations.
|
||||
/// * `check_timer_creator` - [CheckTimerProviderCreator] used by the CFDP handler to generate
|
||||
/// timers required by various tasks. This allows to use this handler for embedded systems
|
||||
/// where the standard time APIs might not be available.
|
||||
/// * `seq_count_provider` - The [SequenceCountProvider] used to generate the [TransactionId]
|
||||
/// which contains an incrementing counter.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
cfg: LocalEntityConfig<UserFaultHook>,
|
||||
pdu_sender: PduSender,
|
||||
@@ -260,6 +282,7 @@ impl<
|
||||
put_request_cacher: StaticPutRequestCacher,
|
||||
pdu_and_cksum_buf_size: usize,
|
||||
remote_cfg_table: RemoteCfgTable,
|
||||
check_timer_creator: CheckTimerCreator,
|
||||
seq_count_provider: SeqCountProvider,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -273,6 +296,8 @@ impl<
|
||||
tstate: Default::default(),
|
||||
fparams: Default::default(),
|
||||
pdu_conf: Default::default(),
|
||||
countdown: None,
|
||||
timer_creator: check_timer_creator,
|
||||
seq_count_provider,
|
||||
}
|
||||
}
|
||||
@@ -296,9 +321,9 @@ impl<
|
||||
pub fn state_machine(
|
||||
&mut self,
|
||||
cfdp_user: &mut impl CfdpUser,
|
||||
packet_to_insert: Option<&impl PduProvider>,
|
||||
pdu: Option<&impl PduProvider>,
|
||||
) -> Result<u32, SourceError> {
|
||||
if let Some(packet) = packet_to_insert {
|
||||
if let Some(packet) = pdu {
|
||||
self.insert_packet(cfdp_user, packet)?;
|
||||
}
|
||||
match self.state_helper.state {
|
||||
@@ -306,7 +331,7 @@ impl<
|
||||
// TODO: In acknowledged mode, add timer handling.
|
||||
Ok(0)
|
||||
}
|
||||
super::State::Busy => self.fsm_busy(cfdp_user),
|
||||
super::State::Busy => self.fsm_busy(cfdp_user, pdu),
|
||||
super::State::Suspended => {
|
||||
// There is now way to suspend the handler currently anyway.
|
||||
Ok(0)
|
||||
@@ -426,6 +451,34 @@ impl<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This functions models the Cancel.request CFDP primitive and is the recommended way to
|
||||
/// cancel a transaction.
|
||||
///
|
||||
/// This method will cause a Notice of Cancellation at this entity if a transaction is active
|
||||
/// and the passed transaction ID matches the currently active transaction ID. Please note
|
||||
/// that the state machine might still be active because a cancelled transfer might still
|
||||
/// require some packets to be sent to the remote receiver entity.
|
||||
///
|
||||
/// If not unexpected errors occur, this method returns [true] if the transfer was cancelled
|
||||
/// propery and [false] if there is no transaction active or the passed transaction ID and the
|
||||
/// active ID do not match.
|
||||
pub fn cancel_request(&mut self, transaction_id: &TransactionId) -> Result<bool, SourceError> {
|
||||
if self.state_helper.state == super::State::Idle {
|
||||
return Ok(false);
|
||||
}
|
||||
if let Some(active_id) = self.transaction_id() {
|
||||
if active_id == *transaction_id {
|
||||
self.declare_fault(ConditionCode::CancelRequestReceived)?;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub fn transaction_id(&self) -> Option<TransactionId> {
|
||||
self.tstate.as_ref().map(|v| v.transaction_id)
|
||||
}
|
||||
|
||||
fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
|
||||
let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
|
||||
&PduHeader::new_no_file_data(self.pdu_conf, 0),
|
||||
@@ -447,7 +500,11 @@ impl<
|
||||
self.tstate.as_ref().map(|v| v.transmission_mode)
|
||||
}
|
||||
|
||||
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> {
|
||||
fn fsm_busy(
|
||||
&mut self,
|
||||
cfdp_user: &mut impl CfdpUser,
|
||||
pdu: Option<&impl PduProvider>,
|
||||
) -> Result<u32, SourceError> {
|
||||
let mut sent_packets = 0;
|
||||
if self.state_helper.step == TransactionStep::Idle {
|
||||
self.state_helper.step = TransactionStep::TransactionStart;
|
||||
@@ -473,31 +530,7 @@ impl<
|
||||
sent_packets += 1;
|
||||
}
|
||||
if self.state_helper.step == TransactionStep::WaitingForFinished {
|
||||
/*
|
||||
def _handle_wait_for_finish(self):
|
||||
if (
|
||||
self.transmission_mode == TransmissionMode.ACKNOWLEDGED
|
||||
and self.__handle_retransmission()
|
||||
):
|
||||
return
|
||||
if (
|
||||
self._inserted_pdu.pdu is None
|
||||
or self._inserted_pdu.pdu_directive_type is None
|
||||
or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU
|
||||
):
|
||||
if self._params.check_timer is not None:
|
||||
if self._params.check_timer.timed_out():
|
||||
self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED)
|
||||
return
|
||||
finished_pdu = self._inserted_pdu.to_finished_pdu()
|
||||
self._inserted_pdu.pdu = None
|
||||
self._params.finished_params = finished_pdu.finished_params
|
||||
if self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
|
||||
self._prepare_finished_ack_packet(finished_pdu.condition_code)
|
||||
self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED
|
||||
else:
|
||||
self.states.step = TransactionStep.NOTICE_OF_COMPLETION
|
||||
*/
|
||||
self.handle_wait_for_finished_pdu(pdu)?;
|
||||
}
|
||||
if self.state_helper.step == TransactionStep::NoticeOfCompletion {
|
||||
self.notice_of_completion(cfdp_user);
|
||||
@@ -506,11 +539,65 @@ impl<
|
||||
Ok(sent_packets)
|
||||
}
|
||||
|
||||
fn handle_wait_for_finished_pdu(
|
||||
&mut self,
|
||||
packet: Option<&impl PduProvider>,
|
||||
) -> Result<(), SourceError> {
|
||||
if let Some(packet) = packet {
|
||||
if let Some(FileDirectiveType::FinishedPdu) = packet.file_directive_type() {
|
||||
let finished_pdu = FinishedPduReader::new(packet.pdu())?;
|
||||
self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams {
|
||||
condition_code: finished_pdu.condition_code(),
|
||||
delivery_code: finished_pdu.delivery_code(),
|
||||
file_status: finished_pdu.file_status(),
|
||||
});
|
||||
if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged {
|
||||
// TODO: Ack packet handling
|
||||
self.state_helper.step = TransactionStep::NoticeOfCompletion;
|
||||
} else {
|
||||
self.state_helper.step = TransactionStep::NoticeOfCompletion;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
// If we reach this state, countdown is definitely valid instance.
|
||||
if self.countdown.as_ref().unwrap().has_expired() {
|
||||
self.declare_fault(ConditionCode::CheckLimitReached)?;
|
||||
}
|
||||
/*
|
||||
def _handle_wait_for_finish(self):
|
||||
if (
|
||||
self.transmission_mode == TransmissionMode.ACKNOWLEDGED
|
||||
and self.__handle_retransmission()
|
||||
):
|
||||
return
|
||||
if (
|
||||
self._inserted_pdu.pdu is None
|
||||
or self._inserted_pdu.pdu_directive_type is None
|
||||
or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU
|
||||
):
|
||||
if self._params.check_timer is not None:
|
||||
if self._params.check_timer.timed_out():
|
||||
self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED)
|
||||
return
|
||||
finished_pdu = self._inserted_pdu.to_finished_pdu()
|
||||
self._inserted_pdu.pdu = None
|
||||
self._params.finished_params = finished_pdu.finished_params
|
||||
if self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
|
||||
self._prepare_finished_ack_packet(finished_pdu.condition_code)
|
||||
self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED
|
||||
else:
|
||||
self.states.step = TransactionStep.NOTICE_OF_COMPLETION
|
||||
*/
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> {
|
||||
let tstate = self.tstate.as_ref().unwrap();
|
||||
let checksum = self.vfs.calculate_checksum(
|
||||
self.put_request_cacher.source_file().unwrap(),
|
||||
tstate.remote_cfg.default_crc_type,
|
||||
self.fparams.file_size,
|
||||
self.pdu_and_cksum_buffer.get_mut(),
|
||||
)?;
|
||||
self.prepare_and_send_eof_pdu(checksum)?;
|
||||
@@ -520,7 +607,13 @@ impl<
|
||||
}
|
||||
if tstate.transmission_mode == TransmissionMode::Unacknowledged {
|
||||
if tstate.closure_requested {
|
||||
// TODO: Check timer handling.
|
||||
self.countdown = Some(self.timer_creator.create_countdown(
|
||||
crate::TimerContext::CheckLimit {
|
||||
local_id: self.local_cfg.id,
|
||||
remote_id: tstate.remote_cfg.entity_id,
|
||||
entity_type: EntityType::Sending,
|
||||
},
|
||||
));
|
||||
self.state_helper.step = TransactionStep::WaitingForFinished;
|
||||
} else {
|
||||
self.state_helper.step = TransactionStep::NoticeOfCompletion;
|
||||
@@ -874,6 +967,97 @@ impl<
|
||||
&self.local_cfg
|
||||
}
|
||||
|
||||
fn declare_fault(&mut self, cond: ConditionCode) -> Result<(), SourceError> {
|
||||
let fh = self.local_cfg.fault_handler.get_fault_handler(cond);
|
||||
match fh {
|
||||
spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => {
|
||||
if let ControlFlow::Break(_) = self.notice_of_cancellation(cond)? {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => {
|
||||
self.notice_of_suspension();
|
||||
}
|
||||
spacepackets::cfdp::FaultHandlerCode::IgnoreError => (),
|
||||
spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
|
||||
}
|
||||
let tstate = self.tstate.as_ref().unwrap();
|
||||
self.local_cfg.fault_handler.report_fault(
|
||||
tstate.transaction_id,
|
||||
cond,
|
||||
self.fparams.progress,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn notice_of_cancellation(
|
||||
&mut self,
|
||||
condition_code: ConditionCode,
|
||||
) -> Result<ControlFlow<()>, SourceError> {
|
||||
// CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
|
||||
// the EOF (cancel) PDU must result in abandonment of the transaction.
|
||||
if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof {
|
||||
if cond_code_eof != ConditionCode::NoError {
|
||||
let tstate = self.tstate.as_ref().unwrap();
|
||||
// Still call the abandonment callback to ensure the fault is logged.
|
||||
self.local_cfg
|
||||
.fault_handler
|
||||
.user_hook
|
||||
.get_mut()
|
||||
.abandoned_cb(tstate.transaction_id, cond_code_eof, self.fparams.progress);
|
||||
self.abandon_transaction();
|
||||
return Ok(ControlFlow::Break(()));
|
||||
}
|
||||
}
|
||||
|
||||
let tstate = self.tstate.as_mut().unwrap();
|
||||
tstate.cond_code_eof = Some(condition_code);
|
||||
// As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply
|
||||
// the checksum for the file copy progress sent so far.
|
||||
let checksum = self.vfs.calculate_checksum(
|
||||
self.put_request_cacher.source_file().unwrap(),
|
||||
tstate.remote_cfg.default_crc_type,
|
||||
self.fparams.progress,
|
||||
self.pdu_and_cksum_buffer.get_mut(),
|
||||
)?;
|
||||
self.prepare_and_send_eof_pdu(checksum)?;
|
||||
Ok(ControlFlow::Continue(()))
|
||||
}
|
||||
|
||||
fn notice_of_suspension(&mut self) {}
|
||||
|
||||
fn abandon_transaction(&mut self) {
|
||||
// I guess an abandoned transaction just stops whatever the handler is doing and resets
|
||||
// it to a clean state.. The implementation for this is quite easy.
|
||||
self.reset();
|
||||
}
|
||||
|
||||
/*
|
||||
def _notice_of_cancellation(self, condition_code: ConditionCode) -> bool:
|
||||
"""Returns whether the fault declaration handler can returns prematurely."""
|
||||
# CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
|
||||
# the EOF (cancel) PDU must result in abandonment of the transaction.
|
||||
if (
|
||||
self._params.cond_code_eof is not None
|
||||
and self._params.cond_code_eof != ConditionCode.NO_ERROR
|
||||
):
|
||||
assert self._params.transaction_id is not None
|
||||
# We still call the abandonment callback to ensure the fault is logged.
|
||||
self.cfg.default_fault_handlers.abandoned_cb(
|
||||
self._params.transaction_id,
|
||||
self._params.cond_code_eof,
|
||||
self._params.fp.progress,
|
||||
)
|
||||
self._abandon_transaction()
|
||||
return False
|
||||
self._params.cond_code_eof = condition_code
|
||||
# As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply
|
||||
# the checksum for the file copy progress sent so far.
|
||||
self._prepare_eof_pdu(self._checksum_calculation(self._params.fp.progress))
|
||||
self.states.step = TransactionStep.SENDING_EOF
|
||||
return True
|
||||
*/
|
||||
|
||||
/// This function is public to allow completely resetting the handler, but it is explicitely
|
||||
/// discouraged to do this. CFDP has mechanism to detect issues and errors on itself.
|
||||
/// Resetting the handler might interfere with these mechanisms and lead to unexpected
|
||||
@@ -882,6 +1066,7 @@ impl<
|
||||
self.state_helper = Default::default();
|
||||
self.tstate = None;
|
||||
self.fparams = Default::default();
|
||||
self.countdown = None;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -907,7 +1092,8 @@ mod tests {
|
||||
filestore::NativeFilestore,
|
||||
request::PutRequestOwned,
|
||||
tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler},
|
||||
FaultHandler, IndicationConfig, PduRawWithInfo, StdRemoteEntityConfigProvider, CRC_32,
|
||||
FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown,
|
||||
StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32,
|
||||
};
|
||||
use spacepackets::seq_count::SeqCountProviderSimple;
|
||||
|
||||
@@ -927,6 +1113,8 @@ mod tests {
|
||||
TestFaultHandler,
|
||||
NativeFilestore,
|
||||
StdRemoteEntityConfigProvider,
|
||||
StdTimerCreator,
|
||||
StdCountdown,
|
||||
SeqCountProviderSimple<u16>,
|
||||
>;
|
||||
|
||||
@@ -967,6 +1155,7 @@ mod tests {
|
||||
max_packet_len,
|
||||
crc_on_transmission_by_default,
|
||||
),
|
||||
StdTimerCreator::new(1),
|
||||
SeqCountProviderSimple::default(),
|
||||
),
|
||||
srcfile_handle,
|
||||
|
||||
Reference in New Issue
Block a user