CFDP extracted to library #201

Closed
muellerr wants to merge 18 commits from continue-cfsp-source-handler into main
Showing only changes of commit 643ce9ace1 - Show all commits

View File

@ -6,11 +6,12 @@ use spacepackets::{
pdu::{
eof::EofPdu,
file_data::FileDataPduCreatorWithReservedDatafield,
finished::{DeliveryCode, FileStatus, FinishedPduReader},
metadata::{MetadataGenericParams, MetadataPduCreator},
CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
},
ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl,
NULL_CHECKSUM_U32,
TransmissionMode,
},
util::{UnsignedByteField, UnsignedEnum},
ByteConversionError,
@ -21,9 +22,9 @@ use crate::seq_count::SequenceCountProvider;
use super::{
filestore::{FilestoreError, VirtualFilestore},
request::{ReadablePutRequest, StaticPutRequestCacher},
user::CfdpUser,
user::{CfdpUser, TransactionFinishedParams},
LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig,
RemoteEntityConfigProvider, TransactionId, UserFaultHookProvider,
RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider,
};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -38,7 +39,7 @@ pub enum TransactionStep {
SendingEof = 6,
WaitingForEofAck = 7,
WaitingForFinished = 8,
SendingAckOfFinished = 9,
// SendingAckOfFinished = 9,
NoticeOfCompletion = 10,
}
@ -46,36 +47,33 @@ pub enum TransactionStep {
pub struct FileParams {
pub progress: u64,
pub segment_len: u64,
pub crc32: [u8; 4],
pub crc32: u32,
pub metadata_only: bool,
pub file_size: u64,
pub empty_file: bool,
}
impl FileParams {
pub fn reset(&mut self) {
self.progress = 0;
self.segment_len = 0;
self.crc32 = NULL_CHECKSUM_U32;
self.metadata_only = false;
self.file_size = 0;
self.empty_file = false;
}
}
pub struct StateHelper {
state: super::State,
step: TransactionStep,
num_packets_ready: u32,
}
#[derive(Debug, Copy, Clone, derive_new::new)]
#[derive(Debug)]
pub struct FinishedParams {
condition_code: ConditionCode,
delivery_code: DeliveryCode,
file_status: FileStatus,
}
#[derive(Debug, derive_new::new)]
pub struct TransferState {
transaction_id: TransactionId,
remote_cfg: RemoteEntityConfig,
transmission_mode: super::TransmissionMode,
closure_requested: bool,
cond_code_eof: Option<ConditionCode>,
finished_params: Option<FinishedParams>,
}
impl Default for StateHelper {
@ -95,8 +93,11 @@ pub enum SourceError {
pdu_type: PduType,
directive_type: Option<FileDirectiveType>,
},
#[error("unexpected file data PDU")]
UnexpectedFileDataPdu,
#[error("unexpected PDU")]
UnexpectedPdu {
pdu_type: PduType,
directive_type: Option<FileDirectiveType>,
},
#[error("source handler is already busy with put request")]
PutRequestAlreadyActive,
#[error("error caching put request")]
@ -128,7 +129,7 @@ pub struct SourceHandler<
> {
local_cfg: LocalEntityConfig<UserFaultHook>,
pdu_sender: PduSender,
pdu_buffer: RefCell<alloc::vec::Vec<u8>>,
pdu_and_cksum_buffer: RefCell<alloc::vec::Vec<u8>>,
put_request_cacher: StaticPutRequestCacher,
remote_cfg_table: RemoteCfgTable,
vfs: Vfs,
@ -155,7 +156,7 @@ impl<
pdu_sender: PduSender,
vfs: Vfs,
put_request_cacher: StaticPutRequestCacher,
max_pdu_len: usize,
pdu_and_cksum_buf_size: usize,
remote_cfg_table: RemoteCfgTable,
seq_count_provider: SeqCountProvider,
) -> Self {
@ -163,7 +164,7 @@ impl<
local_cfg: cfg,
remote_cfg_table,
pdu_sender,
pdu_buffer: RefCell::new(alloc::vec![0; max_pdu_len]),
pdu_and_cksum_buffer: RefCell::new(alloc::vec![0; pdu_and_cksum_buf_size]),
vfs,
put_request_cacher,
state_helper: Default::default(),
@ -213,7 +214,10 @@ impl<
if packet_info.pdu_type() == PduType::FileData {
// The [PacketInfo] API should ensure that file data PDUs can not be passed
// into a source entity, so this should never happen.
return Err(SourceError::UnexpectedFileDataPdu);
return Err(SourceError::UnexpectedPdu {
pdu_type: PduType::FileData,
directive_type: None,
});
}
// Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is
// always a valid value.
@ -221,7 +225,7 @@ impl<
.pdu_directive()
.expect("PDU directive type unexpectedly not set")
{
FileDirectiveType::FinishedPdu => self.handle_finished_pdu(),
FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_info)?,
FileDirectiveType::NakPdu => self.handle_nak_pdu(),
FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
FileDirectiveType::AckPdu => todo!("acknowledged mode not implemented yet"),
@ -287,13 +291,14 @@ impl<
transmission_mode,
closure_requested,
None,
None,
));
Ok(())
}
#[inline]
pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
self.tstate.map(|v| v.transmission_mode)
self.tstate.as_ref().map(|v| v.transmission_mode)
}
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, SourceError> {
@ -315,18 +320,114 @@ impl<
}
}
if self.state_helper.step == TransactionStep::SendingEof {
// TODO: Checksum calculation using VFS.
self.prepare_and_send_eof_pdu(0)?;
self.eof_fsm(cfdp_user)?;
return Ok(1);
}
if self.state_helper.step == TransactionStep::WaitingForFinished {
/*
def _handle_wait_for_finish(self):
if (
self.transmission_mode == TransmissionMode.ACKNOWLEDGED
and self.__handle_retransmission()
):
return
if (
self._inserted_pdu.pdu is None
or self._inserted_pdu.pdu_directive_type is None
or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU
):
if self._params.check_timer is not None:
if self._params.check_timer.timed_out():
self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED)
return
finished_pdu = self._inserted_pdu.to_finished_pdu()
self._inserted_pdu.pdu = None
self._params.finished_params = finished_pdu.finished_params
if self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
self._prepare_finished_ack_packet(finished_pdu.condition_code)
self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED
else:
self.states.step = TransactionStep.NOTICE_OF_COMPLETION
*/
}
if self.state_helper.step == TransactionStep::NoticeOfCompletion {
self.notice_of_completion(cfdp_user);
/*
def _notice_of_completion(self):
if self.cfg.indication_cfg.transaction_finished_indication_required:
assert self._params.transaction_id is not None
# This happens for unacknowledged file copy operation with no closure.
if self._params.finished_params is None:
self._params.finished_params = FinishedParams(
condition_code=ConditionCode.NO_ERROR,
delivery_code=DeliveryCode.DATA_COMPLETE,
file_status=FileStatus.FILE_STATUS_UNREPORTED,
)
indication_params = TransactionFinishedParams(
transaction_id=self._params.transaction_id,
finished_params=self._params.finished_params,
)
self.user.transaction_finished_indication(indication_params)
# Transaction finished
self.reset()
*/
}
Ok(0)
}
fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> {
let tstate = self.tstate.as_ref().unwrap();
let checksum = self.vfs.calculate_checksum(
self.put_request_cacher.source_file().unwrap(),
tstate.remote_cfg.default_crc_type,
self.pdu_and_cksum_buffer.get_mut(),
)?;
self.prepare_and_send_eof_pdu(checksum)?;
let tstate = self.tstate.as_ref().unwrap();
if self.local_cfg.indication_cfg.eof_sent {
cfdp_user.eof_sent_indication(&tstate.transaction_id);
}
if tstate.transmission_mode == TransmissionMode::Unacknowledged {
if tstate.closure_requested {
// TODO: Check timer handling.
self.state_helper.step = TransactionStep::WaitingForFinished;
} else {
self.state_helper.step = TransactionStep::NoticeOfCompletion;
}
} else {
// TODO: Start positive ACK procedure.
}
/*
if self.cfg.indication_cfg.eof_sent_indication_required:
assert self._params.transaction_id is not None
self.user.eof_sent_indication(self._params.transaction_id)
if self.transmission_mode == TransmissionMode.UNACKNOWLEDGED:
if self._params.closure_requested:
assert self._params.remote_cfg is not None
self._params.check_timer = (
self.check_timer_provider.provide_check_timer(
local_entity_id=self.cfg.local_entity_id,
remote_entity_id=self._params.remote_cfg.entity_id,
entity_type=EntityType.SENDING,
)
)
self.states.step = TransactionStep.WAITING_FOR_FINISHED
else:
self.states.step = TransactionStep.NOTICE_OF_COMPLETION
else:
self._start_positive_ack_procedure()
*/
Ok(())
}
fn handle_transaction_start(
&mut self,
cfdp_user: &mut impl CfdpUser,
) -> Result<(), SourceError> {
self.fparams.reset();
let tstate = &self.tstate.expect("transfer state unexpectedly empty");
let tstate = self
.tstate
.as_ref()
.expect("transfer state unexpectedly empty");
if !self.put_request_cacher.has_source_file() {
self.fparams.metadata_only = true;
self.fparams.empty_file = true;
@ -435,6 +536,30 @@ impl<
Ok(ControlFlow::Continue(()))
}
fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
let tstate = self.tstate.as_ref().unwrap();
if self.local_cfg.indication_cfg.transaction_finished {
// The first case happens for unacknowledged file copy operation with no closure.
let finished_params = if tstate.finished_params.is_none() {
TransactionFinishedParams {
id: tstate.transaction_id,
condition_code: ConditionCode::NoError,
delivery_code: DeliveryCode::Complete,
file_status: FileStatus::Unreported,
}
} else {
let finished_params = tstate.finished_params.as_ref().unwrap();
TransactionFinishedParams {
id: tstate.transaction_id,
condition_code: finished_params.condition_code,
delivery_code: finished_params.delivery_code,
file_status: finished_params.file_status,
}
};
cfdp_user.transaction_finished_indication(&finished_params);
}
}
fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
// Should never be called, but use defensive programming here.
if self.fparams.progress >= self.fparams.file_size {
@ -447,7 +572,6 @@ impl<
} else {
self.fparams.segment_len
};
// TODO: Send File Data PDU.
let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata(
PduHeader::new_for_file_data(
self.pdu_conf,
@ -458,7 +582,8 @@ impl<
self.fparams.progress,
read_len,
);
let mut unwritten_pdu = pdu_creator.write_to_bytes_partially(self.pdu_buffer.get_mut())?;
let mut unwritten_pdu =
pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?;
self.vfs.read_data(
self.put_request_cacher.source_file().unwrap(),
self.fparams.progress,
@ -469,7 +594,7 @@ impl<
self.pdu_sender.send_pdu(
PduType::FileData,
None,
&self.pdu_buffer.borrow()[0..written_len],
&self.pdu_and_cksum_buffer.borrow()[0..written_len],
)?;
self.fparams.progress += read_len;
/*
@ -521,7 +646,6 @@ impl<
.tstate
.as_ref()
.expect("transfer state unexpectedly empty");
//let checksum_u32 = u32::from_be_bytes(self.fparams.crc32);
let eof_pdu = EofPdu::new(
PduHeader::new_no_file_data(self.pdu_conf, 0),
tstate.cond_code_eof.unwrap_or(ConditionCode::NoError),
@ -530,23 +654,11 @@ impl<
None,
);
self.pdu_send_helper(&eof_pdu)?;
/*
*
assert self._params.cond_code_eof is not None
self._add_packet_to_be_sent(
EofPdu(
file_checksum=checksum,
file_size=self._params.fp.progress,
pdu_conf=self._params.pdu_conf,
condition_code=self._params.cond_code_eof,
)
)
*/
Ok(())
}
fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), PduError> {
let mut pdu_buffer_mut = self.pdu_buffer.borrow_mut();
let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut();
let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?;
self.pdu_sender.send_pdu(
PduType::FileDirective,
@ -556,11 +668,52 @@ impl<
Ok(())
}
fn handle_finished_pdu(&mut self) {}
fn handle_finished_pdu(&mut self, packet_info: &PacketInfo) -> Result<(), SourceError> {
// Ignore this packet when we are idle.
if self.state_helper.state == State::Idle {
return Ok(());
}
if self.state_helper.step != TransactionStep::WaitingForFinished {
return Err(SourceError::UnexpectedPdu {
pdu_type: PduType::FileDirective,
directive_type: Some(FileDirectiveType::FinishedPdu),
});
}
let finished_pdu = FinishedPduReader::new(packet_info.raw_packet())?;
// Unwrapping should be fine here, the transfer state is valid when we are not in IDLE
// mode.
self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams {
condition_code: finished_pdu.condition_code(),
delivery_code: finished_pdu.delivery_code(),
file_status: finished_pdu.file_status(),
});
if self.tstate.as_ref().unwrap().transmission_mode == TransmissionMode::Acknowledged {
// TODO: Send ACK packet here immediately and continue.
//self.state_helper.step = TransactionStep::SendingAckOfFinished;
}
self.state_helper.step = TransactionStep::NoticeOfCompletion;
/*
if self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
self._prepare_finished_ack_packet(finished_pdu.condition_code)
self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED
else:
self.states.step = TransactionStep.NOTICE_OF_COMPLETION
*/
Ok(())
}
fn handle_nak_pdu(&mut self) {}
fn handle_keep_alive_pdu(&mut self) {}
/// This function is public to allow completely resetting the handler, but it is explicitely
/// discouraged to do this. CFDP has mechanism to detect issues and errors on itself.
pub fn reset(&mut self) {
self.state_helper = Default::default();
self.tstate = None;
self.fparams = Default::default();
}
}
#[cfg(test)]