Continue CFDP handlers #90

Merged
muellerr merged 34 commits from continue_cfdp_handlers into main 2024-01-29 23:42:04 +01:00
2 changed files with 208 additions and 98 deletions
Showing only changes of commit 7e8be538e0 - Show all commits

View File

@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};
use super::{
filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, MetadataReceivedParams},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
CheckTimer, CheckTimerCreator, EntityType, LocalEntityConfig, PacketInfo, PacketTarget,
RemoteEntityConfig, RemoteEntityConfigProvider, State, TransactionId, TransactionStep,
};
@ -229,7 +229,7 @@ impl DestinationHandler {
packet_info.raw_packet,
)
}
PduType::FileData => self.handle_file_data(packet_info.raw_packet),
PduType::FileData => self.handle_file_data(cfdp_user, packet_info.raw_packet),
}
}
@ -354,7 +354,11 @@ impl DestinationHandler {
Ok(())
}
pub fn handle_file_data(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
pub fn handle_file_data(
&mut self,
user: &mut impl CfdpUser,
raw_packet: &[u8],
) -> Result<(), DestError> {
if self.state == State::Idle
|| (self.step != TransactionStep::ReceivingFileDataPdus
&& self.step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling)
@ -362,6 +366,14 @@ impl DestinationHandler {
return Err(DestError::WrongStateForFileDataAndEof);
}
let fd_pdu = FileDataPdu::from_bytes(raw_packet)?;
if self.local_cfg.indication_cfg.file_segment_recv {
user.file_segment_recvd_indication(&FileSegmentRecvdParams {
id: self.tstate().transaction_id.unwrap(),
offset: fd_pdu.offset(),
length: fd_pdu.file_data().len(),
segment_metadata: fd_pdu.segment_metadata(),
});
}
if let Err(e) = self.vfs.write_data(
self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
fd_pdu.offset(),
@ -689,9 +701,9 @@ mod tests {
};
use crate::cfdp::{
filestore::NativeFilestore, CheckTimer, CheckTimerCreator, DefaultFaultHandler,
IndicationConfig, RemoteEntityConfig, StdRemoteEntityConfigProvider, UserFaultHandler,
CRC_32,
filestore::NativeFilestore, user::OwnedMetadataRecvdParams, CheckTimer, CheckTimerCreator,
DefaultFaultHandler, IndicationConfig, RemoteEntityConfig, StdRemoteEntityConfigProvider,
UserFaultHandler, CRC_32,
};
use super::*;
@ -699,21 +711,137 @@ mod tests {
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
pub struct FileSegmentRecvdParamsNoSegMetadata {
pub id: TransactionId,
pub offset: u64,
pub length: usize,
}
#[derive(Default)]
struct TestCfdpUser {
next_expected_seq_num: u64,
expected_full_src_name: String,
expected_full_dest_name: String,
expected_file_size: u64,
transaction_indication_call_count: u32,
eof_recvd_call_count: u32,
finished_indic_queue: VecDeque<TransactionFinishedParams>,
metadata_recv_queue: VecDeque<OwnedMetadataRecvdParams>,
file_seg_recvd_queue: VecDeque<FileSegmentRecvdParamsNoSegMetadata>,
}
impl TestCfdpUser {
fn new(
next_expected_seq_num: u64,
expected_full_src_name: String,
expected_full_dest_name: String,
expected_file_size: u64,
) -> Self {
Self {
next_expected_seq_num,
expected_full_src_name,
expected_full_dest_name,
expected_file_size,
transaction_indication_call_count: 0,
eof_recvd_call_count: 0,
finished_indic_queue: VecDeque::new(),
metadata_recv_queue: VecDeque::new(),
file_seg_recvd_queue: VecDeque::new(),
}
}
fn generic_id_check(&self, id: &crate::cfdp::TransactionId) {
assert_eq!(id.source_id, LOCAL_ID.into());
assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
}
}
impl CfdpUser for TestCfdpUser {
fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
self.transaction_indication_call_count += 1;
}
fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn transaction_finished_indication(
&mut self,
finished_params: &crate::cfdp::user::TransactionFinishedParams,
) {
self.generic_id_check(&finished_params.id);
self.finished_indic_queue.push_back(*finished_params);
}
fn metadata_recvd_indication(
&mut self,
md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
) {
self.generic_id_check(&md_recvd_params.id);
assert_eq!(
String::from(md_recvd_params.src_file_name),
self.expected_full_src_name
);
assert_eq!(
String::from(md_recvd_params.dest_file_name),
self.expected_full_dest_name
);
assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
assert_eq!(md_recvd_params.file_size, self.expected_file_size);
self.metadata_recv_queue.push_back(md_recvd_params.into());
}
fn file_segment_recvd_indication(
&mut self,
segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
) {
self.generic_id_check(&segment_recvd_params.id);
self.file_seg_recvd_queue
.push_back(FileSegmentRecvdParamsNoSegMetadata {
id: segment_recvd_params.id,
offset: segment_recvd_params.offset,
length: segment_recvd_params.length,
})
}
fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
fn suspended_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
self.eof_recvd_call_count += 1;
}
}
#[derive(Default)]
struct TestFaultHandler {
notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
@ -759,81 +887,6 @@ mod tests {
}
}
impl CfdpUser for TestCfdpUser {
fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn transaction_finished_indication(
&mut self,
finished_params: &crate::cfdp::user::TransactionFinishedParams,
) {
self.generic_id_check(&finished_params.id);
}
fn metadata_recvd_indication(
&mut self,
md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
) {
self.generic_id_check(&md_recvd_params.id);
assert_eq!(
String::from(md_recvd_params.src_file_name),
self.expected_full_src_name
);
assert_eq!(
String::from(md_recvd_params.dest_file_name),
self.expected_full_dest_name
);
assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
assert_eq!(md_recvd_params.file_size, self.expected_file_size);
}
fn file_segment_recvd_indication(
&mut self,
_segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
) {
}
fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
fn suspended_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
}
#[derive(Debug)]
struct TestCheckTimer {
counter: Cell<u32>,
@ -903,8 +956,8 @@ mod tests {
handler: dest_handler,
src_path,
dest_path,
check_dest_file: true,
check_handler_idle_at_drop: true,
check_dest_file: false,
check_handler_idle_at_drop: false,
expected_file_size: 0,
pdu_header: create_pdu_header(UbfU16::new(0)),
expected_full_data: Vec::new(),
@ -914,23 +967,31 @@ mod tests {
handler
}
fn indication_cfg_mut(&mut self) -> &mut IndicationConfig {
&mut self.handler.local_cfg.indication_cfg
}
fn indication_cfg(&mut self) -> &IndicationConfig {
&self.handler.local_cfg.indication_cfg
}
fn set_check_timer_expired(&mut self) {
self.check_timer_expired
.store(true, core::sync::atomic::Ordering::Relaxed);
}
fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser {
TestCfdpUser {
next_expected_seq_num: 0,
expected_full_src_name: self.src_path.to_string_lossy().into(),
expected_full_dest_name: self.dest_path.to_string_lossy().into(),
TestCfdpUser::new(
0,
self.src_path.to_string_lossy().into(),
self.dest_path.to_string_lossy().into(),
expected_file_size,
}
)
}
fn generic_transfer_init(
&mut self,
user: &mut impl CfdpUser,
user: &mut TestCfdpUser,
file_size: u64,
) -> Result<(), DestError> {
self.expected_file_size = file_size;
@ -941,12 +1002,14 @@ mod tests {
file_size,
);
let packet_info = create_packet_info(&metadata_pdu, &mut self.buf);
self.handler.state_machine(user, Some(&packet_info))
let result = self.handler.state_machine(user, Some(&packet_info));
assert_eq!(user.metadata_recv_queue.len(), 1);
result
}
fn generic_file_data_insert(
&mut self,
user: &mut impl CfdpUser,
user: &mut TestCfdpUser,
offset: u64,
file_data_chunk: &[u8],
) -> Result<(), DestError> {
@ -956,18 +1019,33 @@ mod tests {
.write_to_bytes(&mut self.buf)
.expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&self.buf).expect("creating packet info failed");
self.handler.state_machine(user, Some(&packet_info))
let result = self.handler.state_machine(user, Some(&packet_info));
if self.indication_cfg().file_segment_recv {
assert!(!user.file_seg_recvd_queue.is_empty());
assert_eq!(user.file_seg_recvd_queue.back().unwrap().offset, offset);
assert_eq!(
user.file_seg_recvd_queue.back().unwrap().length,
file_data_chunk.len()
);
}
result
}
fn generic_eof_no_error(
&mut self,
user: &mut impl CfdpUser,
user: &mut TestCfdpUser,
expected_full_data: Vec<u8>,
) -> Result<(), DestError> {
self.expected_full_data = expected_full_data;
let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header);
let packet_info = create_packet_info(&eof_pdu, &mut self.buf);
self.handler.state_machine(user, Some(&packet_info))
self.check_handler_idle_at_drop = true;
self.check_dest_file = true;
let result = self.handler.state_machine(user, Some(&packet_info));
if self.indication_cfg().eof_recv {
assert_eq!(user.eof_recvd_call_count, 1);
}
result
}
fn state_check(&self, state: State, step: TransactionStep) {
@ -1159,6 +1237,7 @@ mod tests {
test_obj
.generic_transfer_init(&mut test_user, file_size)
.expect("transfer init failed");
test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
test_obj
.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])

View File

@ -1,10 +1,10 @@
use spacepackets::{
cfdp::{
pdu::{
file_data::RecordContinuationState,
file_data::SegmentMetadata,
finished::{DeliveryCode, FileStatus},
},
tlv::msg_to_user::MsgToUserTlv,
tlv::{msg_to_user::MsgToUserTlv, WritableTlv},
ConditionCode,
},
util::UnsignedByteField,
@ -30,13 +30,44 @@ pub struct MetadataReceivedParams<'src_file, 'dest_file, 'msgs_to_user> {
pub msgs_to_user: &'msgs_to_user [MsgToUserTlv<'msgs_to_user>],
}
#[cfg(feature = "alloc")]
#[derive(Debug)]
pub struct OwnedMetadataRecvdParams {
pub id: TransactionId,
pub source_id: UnsignedByteField,
pub file_size: u64,
pub src_file_name: alloc::string::String,
pub dest_file_name: alloc::string::String,
pub msgs_to_user: alloc::vec::Vec<alloc::vec::Vec<u8>>,
}
#[cfg(feature = "alloc")]
impl From<MetadataReceivedParams<'_, '_, '_>> for OwnedMetadataRecvdParams {
fn from(value: MetadataReceivedParams) -> Self {
Self::from(&value)
}
}
#[cfg(feature = "alloc")]
impl From<&MetadataReceivedParams<'_, '_, '_>> for OwnedMetadataRecvdParams {
fn from(value: &MetadataReceivedParams) -> Self {
Self {
id: value.id,
source_id: value.source_id,
file_size: value.file_size,
src_file_name: value.src_file_name.into(),
dest_file_name: value.dest_file_name.into(),
msgs_to_user: value.msgs_to_user.iter().map(|tlv| tlv.to_vec()).collect(),
}
}
}
#[derive(Debug)]
pub struct FileSegmentRecvdParams<'seg_meta> {
pub id: TransactionId,
pub offset: u64,
pub length: usize,
pub rec_cont_state: Option<RecordContinuationState>,
pub segment_metadata: Option<&'seg_meta [u8]>,
pub segment_metadata: Option<&'seg_meta SegmentMetadata<'seg_meta>>,
}
pub trait CfdpUser {