Continue CFDP handlers #90

Merged
muellerr merged 34 commits from continue_cfdp_handlers into main 2024-01-29 23:42:04 +01:00
2 changed files with 154 additions and 60 deletions
Showing only changes of commit 1b1bef2958 - Show all commits

View File

@ -1,17 +1,17 @@
use core::str::{from_utf8, Utf8Error}; use core::str::{from_utf8, Utf8Error};
use std::{ use std::{
fs::{metadata, File}, fs::{metadata, File},
io::{BufReader, Read, Seek, SeekFrom, Write}, io::{Seek, SeekFrom, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use crate::cfdp::user::TransactionFinishedParams; use crate::cfdp::user::TransactionFinishedParams;
use super::{ use super::{
filestore::{NativeFilestore, VirtualFilestore}, filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, MetadataReceivedParams}, user::{CfdpUser, MetadataReceivedParams},
CheckTimerCreator, DefaultFaultHandler, PacketInfo, PacketTarget, RemoteEntityConfig, CheckTimerCreator, LocalEntityConfig, PacketInfo, PacketTarget, RemoteEntityConfig,
RemoteEntityConfigProvider, State, TransactionId, TransactionStep, CRC_32, RemoteEntityConfigProvider, State, TransactionId, TransactionStep,
}; };
use alloc::boxed::Box; use alloc::boxed::Box;
use smallvec::SmallVec; use smallvec::SmallVec;
@ -32,13 +32,12 @@ use spacepackets::{
use thiserror::Error; use thiserror::Error;
pub struct DestinationHandler { pub struct DestinationHandler {
id: UnsignedByteField, local_cfg: LocalEntityConfig,
step: TransactionStep, step: TransactionStep,
state: State, state: State,
tparams: TransactionParams, tparams: TransactionParams,
packets_to_send_ctx: PacketsToSendContext, packets_to_send_ctx: PacketsToSendContext,
vfs: Box<dyn VirtualFilestore>, vfs: Box<dyn VirtualFilestore>,
default_fault_handler: DefaultFaultHandler,
remote_cfg_table: Box<dyn RemoteEntityConfigProvider>, remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
check_timer_creator: Box<dyn CheckTimerCreator>, check_timer_creator: Box<dyn CheckTimerCreator>,
} }
@ -61,7 +60,7 @@ struct FileProperties {
#[derive(Debug)] #[derive(Debug)]
struct TransferState { struct TransferState {
transaction_id: Option<TransactionId>, transaction_id: Option<TransactionId>,
progress: usize, progress: u64,
condition_code: ConditionCode, condition_code: ConditionCode,
delivery_code: DeliveryCode, delivery_code: DeliveryCode,
file_status: FileStatus, file_status: FileStatus,
@ -111,8 +110,8 @@ impl Default for FileProperties {
} }
impl TransactionParams { impl TransactionParams {
fn file_size(&self) -> usize { fn file_size(&self) -> u64 {
self.tstate.metadata_params.file_size as usize self.tstate.metadata_params.file_size
} }
fn metadata_params(&self) -> &MetadataGenericParams { fn metadata_params(&self) -> &MetadataGenericParams {
@ -171,20 +170,18 @@ pub enum DestError {
impl DestinationHandler { impl DestinationHandler {
pub fn new( pub fn new(
entity_id: impl Into<UnsignedByteField>, local_cfg: LocalEntityConfig,
vfs: Box<dyn VirtualFilestore>, vfs: Box<dyn VirtualFilestore>,
default_fault_handler: DefaultFaultHandler,
remote_cfg_table: Box<dyn RemoteEntityConfigProvider>, remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
check_timer_creator: Box<dyn CheckTimerCreator>, check_timer_creator: Box<dyn CheckTimerCreator>,
) -> Self { ) -> Self {
Self { Self {
id: entity_id.into(), local_cfg,
step: TransactionStep::Idle, step: TransactionStep::Idle,
state: State::Idle, state: State::Idle,
tparams: Default::default(), tparams: Default::default(),
packets_to_send_ctx: Default::default(), packets_to_send_ctx: Default::default(),
vfs, vfs,
default_fault_handler,
remote_cfg_table, remote_cfg_table,
check_timer_creator, check_timer_creator,
} }
@ -196,7 +193,7 @@ impl DestinationHandler {
packet_to_insert: Option<&PacketInfo>, packet_to_insert: Option<&PacketInfo>,
) -> Result<(), DestError> { ) -> Result<(), DestError> {
if let Some(packet) = packet_to_insert { if let Some(packet) = packet_to_insert {
self.insert_packet(packet)?; self.insert_packet(cfdp_user, packet)?;
} }
match self.state { match self.state {
State::Idle => todo!(), State::Idle => todo!(),
@ -205,7 +202,11 @@ impl DestinationHandler {
} }
} }
fn insert_packet(&mut self, packet_info: &PacketInfo) -> Result<(), DestError> { fn insert_packet(
&mut self,
cfdp_user: &mut impl CfdpUser,
packet_info: &PacketInfo,
) -> Result<(), DestError> {
if packet_info.target() != PacketTarget::DestEntity { if packet_info.target() != PacketTarget::DestEntity {
// Unwrap is okay here, a PacketInfo for a file data PDU should always have the // Unwrap is okay here, a PacketInfo for a file data PDU should always have the
// destination as the target. // destination as the target.
@ -219,6 +220,7 @@ impl DestinationHandler {
return Err(DestError::DirectiveExpected); return Err(DestError::DirectiveExpected);
} }
self.handle_file_directive( self.handle_file_directive(
cfdp_user,
packet_info.pdu_directive.unwrap(), packet_info.pdu_directive.unwrap(),
packet_info.raw_packet, packet_info.raw_packet,
) )
@ -252,7 +254,7 @@ impl DestinationHandler {
) )
} else { } else {
// TODO: Are there cases where this ID is actually the source entity ID? // TODO: Are there cases where this ID is actually the source entity ID?
let entity_id = EntityIdTlv::new(self.id); let entity_id = EntityIdTlv::new(self.local_cfg.id);
FinishedPduCreator::new_with_error( FinishedPduCreator::new_with_error(
pdu_header, pdu_header,
self.tparams.tstate.condition_code, self.tparams.tstate.condition_code,
@ -276,11 +278,12 @@ impl DestinationHandler {
pub fn handle_file_directive( pub fn handle_file_directive(
&mut self, &mut self,
cfdp_user: &mut impl CfdpUser,
pdu_directive: FileDirectiveType, pdu_directive: FileDirectiveType,
raw_packet: &[u8], raw_packet: &[u8],
) -> Result<(), DestError> { ) -> Result<(), DestError> {
match pdu_directive { match pdu_directive {
FileDirectiveType::EofPdu => self.handle_eof_pdu(raw_packet)?, FileDirectiveType::EofPdu => self.handle_eof_pdu(cfdp_user, raw_packet)?,
FileDirectiveType::FinishedPdu FileDirectiveType::FinishedPdu
| FileDirectiveType::NakPdu | FileDirectiveType::NakPdu
| FileDirectiveType::KeepAlivePdu => { | FileDirectiveType::KeepAlivePdu => {
@ -359,11 +362,19 @@ impl DestinationHandler {
Ok(()) Ok(())
} }
pub fn handle_eof_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> { pub fn handle_eof_pdu(
&mut self,
cfdp_user: &mut impl CfdpUser,
raw_packet: &[u8],
) -> Result<(), DestError> {
if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus { if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus {
return Err(DestError::WrongStateForFileDataAndEof); return Err(DestError::WrongStateForFileDataAndEof);
} }
let eof_pdu = EofPdu::from_bytes(raw_packet)?; let eof_pdu = EofPdu::from_bytes(raw_packet)?;
if self.local_cfg.indication_cfg.eof_recv {
// Unwrap is okay here, application logic ensures that transaction ID is valid here.
cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap());
}
if eof_pdu.condition_code() == ConditionCode::NoError { if eof_pdu.condition_code() == ConditionCode::NoError {
self.handle_no_error_eof_pdu(&eof_pdu)?; self.handle_no_error_eof_pdu(&eof_pdu)?;
} else { } else {
@ -372,49 +383,73 @@ impl DestinationHandler {
Ok(()) Ok(())
} }
fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<(), DestError> { fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
// CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
if self.tparams.tstate.progress > eof_pdu.file_size()
&& self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError
{
return Ok(true);
}
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged
&& !self.checksum_verify(eof_pdu.file_checksum())
{
self.start_check_limit_handling()
}
// TODO: Continue here based on Python implementation.
// For a standard disk based file system, which is assumed to be used for now, the file // For a standard disk based file system, which is assumed to be used for now, the file
// will always be retained. This might change in the future. // will always be retained. This might change in the future.
/*
self.tparams.tstate.file_status = FileStatus::Retained; self.tparams.tstate.file_status = FileStatus::Retained;
if self.checksum_check(eof_pdu.file_checksum())? { if self.checksum_verify(eof_pdu.file_checksum())? {
self.tparams.tstate.condition_code = ConditionCode::NoError; self.tparams.tstate.condition_code = ConditionCode::NoError;
self.tparams.tstate.delivery_code = DeliveryCode::Complete; self.tparams.tstate.delivery_code = DeliveryCode::Complete;
} else { } else {
self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure; self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure;
} }
*/
// TODO: Check progress, and implement transfer completion timer as specified in the // TODO: Check progress, and implement transfer completion timer as specified in the
// standard. This timer protects against out of order arrival of packets. // standard. This timer protects against out of order arrival of packets.
if self.tparams.tstate.progress != self.tparams.file_size() {} // if self.tparams.tstate.progress != self.tparams.file_size() {}
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged { if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged {
self.step = TransactionStep::TransferCompletion; self.step = TransactionStep::TransferCompletion;
} else { } else {
self.step = TransactionStep::SendingAckPdu; self.step = TransactionStep::SendingAckPdu;
} }
Ok(()) Ok(false)
} }
fn checksum_verify(&mut self, checksum: u32) -> bool {
match self.vfs.checksum_verify(
self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
self.tparams.metadata_params().checksum_type,
checksum,
&mut self.tparams.cksum_buf,
) {
Ok(checksum_success) => checksum_success,
Err(e) => match e {
FilestoreError::ChecksumTypeNotImplemented(checksum) => {
self.declare_fault(ConditionCode::UnsupportedChecksumType);
// For this case, the applicable algorithm shall the the null checksum, which
// is always succesful.
true
}
_ => {
self.declare_fault(ConditionCode::FilestoreRejection);
// Treat this equivalent to a failed checksum procedure.
false
}
},
}
}
fn start_check_limit_handling(&mut self) {}
pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> { pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> {
todo!(); todo!();
} }
fn checksum_check(&mut self, expected_checksum: u32) -> Result<bool, DestError> {
// TODO: Implement this using the new virtual filestore abstraction.
let mut digest = CRC_32.digest();
let file_to_check = File::open(&self.tparams.file_properties.dest_path_buf)?;
let mut buf_reader = BufReader::new(file_to_check);
loop {
let bytes_read = buf_reader.read(&mut self.tparams.cksum_buf)?;
if bytes_read == 0 {
break;
}
digest.update(&self.tparams.cksum_buf[0..bytes_read]);
}
if digest.finalize() == expected_checksum {
return Ok(true);
}
Ok(false)
}
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
if self.step == TransactionStep::TransactionStart { if self.step == TransactionStep::TransactionStart {
self.transaction_start(cfdp_user)?; self.transaction_start(cfdp_user)?;
@ -472,7 +507,7 @@ impl DestinationHandler {
let metadata_recvd_params = MetadataReceivedParams { let metadata_recvd_params = MetadataReceivedParams {
id, id,
source_id, source_id,
file_size: self.tparams.tstate.metadata_params.file_size, file_size: self.tparams.file_size(),
src_file_name: src_name, src_file_name: src_name,
dest_file_name: dest_name, dest_file_name: dest_name,
msgs_to_user: &msgs_to_user[..num_msgs_to_user], msgs_to_user: &msgs_to_user[..num_msgs_to_user],
@ -528,9 +563,27 @@ impl DestinationHandler {
} }
fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode { fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode {
todo!("implement this. requires cached fault handler abstraction"); let fh_code = self
FaultHandlerCode::IgnoreError .local_cfg
.default_fault_handler
.get_fault_handler(condition_code);
match fh_code {
FaultHandlerCode::NoticeOfCancellation => {
self.notice_of_cancellation();
} }
FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(),
FaultHandlerCode::IgnoreError => todo!(),
FaultHandlerCode::AbandonTransaction => todo!(),
}
self.local_cfg.default_fault_handler.report_fault(
self.tparams.tstate.transaction_id.unwrap(),
condition_code,
self.tparams.tstate.progress,
)
}
fn notice_of_cancellation(&mut self) {}
fn notice_of_suspension(&mut self) {}
fn reset(&mut self) { fn reset(&mut self) {
self.step = TransactionStep::Idle; self.step = TransactionStep::Idle;
@ -569,8 +622,9 @@ mod tests {
}; };
use crate::cfdp::{ use crate::cfdp::{
CheckTimer, CheckTimerCreator, RemoteEntityConfig, StdRemoteEntityConfigProvider, filestore::NativeFilestore, CheckTimer, CheckTimerCreator, DefaultFaultHandler,
UserFaultHandler, IndicationConfig, RemoteEntityConfig, StdRemoteEntityConfigProvider, UserFaultHandler,
CRC_32,
}; };
use super::*; use super::*;
@ -806,10 +860,14 @@ mod tests {
fn default_dest_handler() -> DestinationHandler { fn default_dest_handler() -> DestinationHandler {
let test_fault_handler = TestFaultHandler::default(); let test_fault_handler = TestFaultHandler::default();
let local_entity_cfg = LocalEntityConfig {
id: REMOTE_ID.into(),
indication_cfg: IndicationConfig::default(),
default_fault_handler: DefaultFaultHandler::new(Box::new(test_fault_handler)),
};
DestinationHandler::new( DestinationHandler::new(
REMOTE_ID, local_entity_cfg,
Box::new(NativeFilestore::default()), Box::new(NativeFilestore::default()),
DefaultFaultHandler::new(Box::new(test_fault_handler)),
Box::new(basic_remote_cfg_table()), Box::new(basic_remote_cfg_table()),
Box::new(TestCheckTimerCreator::new(2, 2)), Box::new(TestCheckTimerCreator::new(2, 2)),
) )
@ -936,10 +994,6 @@ mod tests {
.write_to_bytes(&mut buf) .write_to_bytes(&mut buf)
.expect("writing file data PDU failed"); .expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info)); let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok()); assert!(result.is_ok());
@ -1003,10 +1057,6 @@ mod tests {
.write_to_bytes(&mut buf) .write_to_bytes(&mut buf)
.expect("writing file data PDU failed"); .expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info)); let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok()); assert!(result.is_ok());
@ -1021,10 +1071,6 @@ mod tests {
.write_to_bytes(&mut buf) .write_to_bytes(&mut buf)
.expect("writing file data PDU failed"); .expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info)); let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok()); assert!(result.is_ok());

View File

@ -206,6 +206,18 @@ impl DefaultFaultHandler {
}) })
} }
pub fn set_fault_handler(
&mut self,
condition_code: ConditionCode,
fault_handler: FaultHandlerCode,
) {
let array_idx = Self::condition_code_to_array_index(condition_code);
if array_idx.is_none() {
return;
}
self.handler_array[array_idx.unwrap()] = fault_handler;
}
pub fn new(user_fault_handler: Box<dyn UserFaultHandler + Send>) -> Self { pub fn new(user_fault_handler: Box<dyn UserFaultHandler + Send>) -> Self {
let mut init_array = [FaultHandlerCode::NoticeOfCancellation; 10]; let mut init_array = [FaultHandlerCode::NoticeOfCancellation; 10];
init_array init_array
@ -219,7 +231,15 @@ impl DefaultFaultHandler {
} }
} }
fn report_fault( pub fn get_fault_handler(&self, condition_code: ConditionCode) -> FaultHandlerCode {
let array_idx = Self::condition_code_to_array_index(condition_code);
if array_idx.is_none() {
return FaultHandlerCode::IgnoreError;
}
self.handler_array[array_idx.unwrap()]
}
pub fn report_fault(
&mut self, &mut self,
transaction_id: TransactionId, transaction_id: TransactionId,
condition: ConditionCode, condition: ConditionCode,
@ -258,6 +278,34 @@ impl DefaultFaultHandler {
} }
} }
pub struct IndicationConfig {
pub eof_sent: bool,
pub eof_recv: bool,
pub file_segment_recv: bool,
pub transaction_finished: bool,
pub suspended: bool,
pub resumed: bool,
}
impl Default for IndicationConfig {
fn default() -> Self {
Self {
eof_sent: true,
eof_recv: true,
file_segment_recv: true,
transaction_finished: true,
suspended: true,
resumed: true,
}
}
}
pub struct LocalEntityConfig {
pub id: UnsignedByteField,
pub indication_cfg: IndicationConfig,
pub default_fault_handler: DefaultFaultHandler,
}
#[derive(Debug, Eq, Copy, Clone)] #[derive(Debug, Eq, Copy, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TransactionId { pub struct TransactionId {