add check limit handling
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
Robin Müller 2023-12-14 12:10:22 +01:00
parent 0ecb718416
commit fbd05a4a25
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 186 additions and 117 deletions

View File

@ -1,17 +1,12 @@
use core::str::{from_utf8, Utf8Error};
use std::{
fs::{metadata, File},
io::{Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use crate::cfdp::user::TransactionFinishedParams;
use core::str::{from_utf8, Utf8Error};
use std::path::{Path, PathBuf};
use super::{
filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, MetadataReceivedParams},
CheckTimerCreator, LocalEntityConfig, PacketInfo, PacketTarget, RemoteEntityConfig,
RemoteEntityConfigProvider, State, TransactionId, TransactionStep,
CheckTimer, CheckTimerCreator, EntityType, LocalEntityConfig, PacketInfo, PacketTarget,
RemoteEntityConfig, RemoteEntityConfigProvider, State, TransactionId, TransactionStep,
};
use alloc::boxed::Box;
use smallvec::SmallVec;
@ -60,22 +55,28 @@ struct FileProperties {
#[derive(Debug)]
struct TransferState {
transaction_id: Option<TransactionId>,
metadata_params: MetadataGenericParams,
progress: u64,
condition_code: ConditionCode,
delivery_code: DeliveryCode,
file_status: FileStatus,
metadata_params: MetadataGenericParams,
checksum: u32,
current_check_count: u32,
current_check_timer: Option<Box<dyn CheckTimer>>,
}
impl Default for TransferState {
fn default() -> Self {
Self {
transaction_id: None,
metadata_params: Default::default(),
progress: Default::default(),
condition_code: ConditionCode::NoError,
delivery_code: DeliveryCode::Incomplete,
file_status: FileStatus::Unreported,
metadata_params: Default::default(),
checksum: 0,
current_check_count: 0,
current_check_timer: None,
}
}
}
@ -137,6 +138,7 @@ impl TransactionParams {
fn reset(&mut self) {
self.tstate.condition_code = ConditionCode::NoError;
self.tstate.delivery_code = DeliveryCode::Incomplete;
self.tstate.file_status = FileStatus::Unreported;
}
}
@ -160,10 +162,12 @@ pub enum DestError {
Pdu(#[from] PduError),
#[error("io error {0}")]
Io(#[from] std::io::Error),
#[error("file store error {0}")]
Filestore(#[from] FilestoreError),
#[error("path conversion error {0}")]
PathConversion(#[from] Utf8Error),
#[error("error building dest path from source file name and dest folder")]
PathConcatError,
PathConcat,
#[error("no remote entity configuration found for {0:?}")]
NoRemoteCfgFound(UnsignedByteField),
}
@ -241,25 +245,26 @@ impl DestinationHandler {
return Ok(None);
}
let directive = self.packets_to_send_ctx.directive.unwrap();
let tstate = self.tstate();
let written_size = match directive {
FileDirectiveType::FinishedPdu => {
let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
let finished_pdu = if self.tparams.tstate.condition_code == ConditionCode::NoError
|| self.tparams.tstate.condition_code == ConditionCode::UnsupportedChecksumType
let finished_pdu = if tstate.condition_code == ConditionCode::NoError
|| tstate.condition_code == ConditionCode::UnsupportedChecksumType
{
FinishedPduCreator::new_default(
pdu_header,
self.tparams.tstate.delivery_code,
self.tparams.tstate.file_status,
tstate.delivery_code,
tstate.file_status,
)
} else {
// TODO: Are there cases where this ID is actually the source entity ID?
let entity_id = EntityIdTlv::new(self.local_cfg.id);
FinishedPduCreator::new_with_error(
pdu_header,
self.tparams.tstate.condition_code,
self.tparams.tstate.delivery_code,
self.tparams.tstate.file_status,
tstate.condition_code,
tstate.delivery_code,
tstate.file_status,
entity_id,
)
};
@ -354,11 +359,14 @@ impl DestinationHandler {
return Err(DestError::WrongStateForFileDataAndEof);
}
let fd_pdu = FileDataPdu::from_bytes(raw_packet)?;
let mut dest_file = File::options()
.write(true)
.open(&self.tparams.file_properties.dest_path_buf)?;
dest_file.seek(SeekFrom::Start(fd_pdu.offset()))?;
dest_file.write_all(fd_pdu.file_data())?;
if let Err(e) = self.vfs.write_data(
self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
fd_pdu.offset(),
fd_pdu.file_data(),
) {
self.declare_fault(ConditionCode::FilestoreRejection);
return Err(e.into());
}
Ok(())
}
@ -375,50 +383,57 @@ impl DestinationHandler {
// Unwrap is okay here, application logic ensures that transaction ID is valid here.
cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap());
}
if eof_pdu.condition_code() == ConditionCode::NoError {
self.handle_no_error_eof_pdu(&eof_pdu)?;
let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError {
self.handle_no_error_eof_pdu(&eof_pdu)?
} else {
todo!("implement cancel request handling");
};
if regular_transfer_finish {
self.file_transfer_complete_transition();
}
Ok(())
}
/// Returns whether the transfer can be completed regularly.
fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
// CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
if self.tparams.tstate.progress > eof_pdu.file_size()
&& self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError
{
return Ok(true);
}
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged
&& !self.checksum_verify(eof_pdu.file_checksum())
return Ok(false);
} else if (self.tparams.tstate.progress < eof_pdu.file_size())
&& self.tparams.transmission_mode() == TransmissionMode::Acknowledged
{
self.start_check_limit_handling()
// CFDP 4.6.4.3.1: The end offset of the last received file segment and the file
// size as stated in the EOF PDU is not the same, so we need to add that segment to
// the lost segments for the deferred lost segment detection procedure.
// TODO: Proper lost segment handling.
// self._params.acked_params.lost_seg_tracker.add_lost_segment(
// (self._params.fp.progress, self._params.fp.file_size_eof)
// )
}
// TODO: Continue here based on Python implementation.
// For a standard disk based file system, which is assumed to be used for now, the file
// will always be retained. This might change in the future.
/*
self.tparams.tstate.file_status = FileStatus::Retained;
if self.checksum_verify(eof_pdu.file_checksum())? {
self.tparams.tstate.condition_code = ConditionCode::NoError;
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
} else {
self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure;
self.tparams.tstate.checksum = eof_pdu.file_checksum();
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged
&& !self.checksum_verify(self.tparams.tstate.checksum)
{
if self.declare_fault(ConditionCode::FileChecksumFailure)
!= FaultHandlerCode::IgnoreError
{
return Ok(false);
}
self.start_check_limit_handling();
}
*/
// TODO: Check progress, and implement transfer completion timer as specified in the
// standard. This timer protects against out of order arrival of packets.
// if self.tparams.tstate.progress != self.tparams.file_size() {}
Ok(true)
}
fn file_transfer_complete_transition(&mut self) {
if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged {
self.step = TransactionStep::TransferCompletion;
} else {
// TODO: Prepare ACK PDU somehow.
self.step = TransactionStep::SendingAckPdu;
}
Ok(false)
}
fn checksum_verify(&mut self, checksum: u32) -> bool {
@ -430,7 +445,7 @@ impl DestinationHandler {
) {
Ok(checksum_success) => checksum_success,
Err(e) => match e {
FilestoreError::ChecksumTypeNotImplemented(checksum) => {
FilestoreError::ChecksumTypeNotImplemented(_) => {
self.declare_fault(ConditionCode::UnsupportedChecksumType);
// For this case, the applicable algorithm shall the the null checksum, which
// is always succesful.
@ -445,7 +460,43 @@ impl DestinationHandler {
}
}
fn start_check_limit_handling(&mut self) {}
fn start_check_limit_handling(&mut self) {
self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling;
self.tparams.tstate.current_check_timer =
Some(self.check_timer_creator.get_check_timer_provider(
&self.local_cfg.id,
&self.tparams.remote_cfg.unwrap().entity_id,
EntityType::Receiving,
));
self.tparams.tstate.current_check_count = 0;
}
fn check_limit_handling(&mut self) {
if self.tparams.tstate.current_check_timer.is_none() {
return;
}
let check_timer = self.tparams.tstate.current_check_timer.as_ref().unwrap();
if check_timer.has_expired() {
if self.checksum_verify(self.tparams.tstate.checksum) {
self.file_transfer_complete_transition();
return;
}
if self.tparams.tstate.current_check_count + 1
>= self.tparams.remote_cfg.unwrap().check_limit
{
self.declare_fault(ConditionCode::CheckLimitReached);
} else {
self.tparams.tstate.current_check_count += 1;
self.tparams
.tstate
.current_check_timer
.as_mut()
.unwrap()
.reset();
}
}
}
pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> {
todo!();
}
@ -457,6 +508,9 @@ impl DestinationHandler {
if self.step == TransactionStep::TransferCompletion {
self.transfer_completion(cfdp_user)?;
}
if self.step == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
self.check_limit_handling();
}
if self.step == TransactionStep::SendingAckPdu {
todo!("no support for acknowledged mode yet");
}
@ -515,39 +569,43 @@ impl DestinationHandler {
self.tparams.tstate.transaction_id = Some(id);
cfdp_user.metadata_recvd_indication(&metadata_recvd_params);
if dest_path.exists() {
let dest_metadata = metadata(dest_path)?;
if dest_metadata.is_dir() {
// Create new destination path by concatenating the last part of the source source
// name and the destination folder. For example, for a source file of /tmp/hello.txt
// and a destination name of /home/test, the resulting file name should be
// /home/test/hello.txt
let source_path = Path::new(from_utf8(
&self.tparams.file_properties.src_file_name
[..self.tparams.file_properties.src_file_name_len],
)?);
let source_name = source_path.file_name();
if source_name.is_none() {
return Err(DestError::PathConcatError);
}
let source_name = source_name.unwrap();
self.tparams.file_properties.dest_path_buf.push(source_name);
// TODO: This is the only remaining function which uses std.. the easiest way would
// probably be to use a static pre-allocated dest path buffer to store any concatenated
// paths.
if dest_path.exists() && self.vfs.is_dir(dest_path.to_str().unwrap()) {
// Create new destination path by concatenating the last part of the source source
// name and the destination folder. For example, for a source file of /tmp/hello.txt
// and a destination name of /home/test, the resulting file name should be
// /home/test/hello.txt
let source_path = Path::new(from_utf8(
&self.tparams.file_properties.src_file_name
[..self.tparams.file_properties.src_file_name_len],
)?);
let source_name = source_path.file_name();
if source_name.is_none() {
return Err(DestError::PathConcat);
}
let source_name = source_name.unwrap();
self.tparams.file_properties.dest_path_buf.push(source_name);
}
// This function does exactly what we require: Create a new file if it does not exist yet
// and trucate an existing one.
File::create(&self.tparams.file_properties.dest_path_buf)?;
let dest_path_str = self.tparams.file_properties.dest_path_buf.to_str().unwrap();
if self.vfs.exists(dest_path_str) {
self.vfs.truncate_file(dest_path_str)?;
} else {
self.vfs.create_file(dest_path_str)?;
}
self.tparams.tstate.file_status = FileStatus::Retained;
self.step = TransactionStep::ReceivingFileDataPdus;
Ok(())
}
fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
let tstate = self.tstate();
let transaction_finished_params = TransactionFinishedParams {
id: self.tparams.tstate.transaction_id.unwrap(),
condition_code: self.tparams.tstate.condition_code,
delivery_code: self.tparams.tstate.delivery_code,
file_status: self.tparams.tstate.file_status,
id: tstate.transaction_id.unwrap(),
condition_code: tstate.condition_code,
delivery_code: tstate.delivery_code,
file_status: tstate.file_status,
};
cfdp_user.transaction_finished_indication(&transaction_finished_params);
// This function should never be called with metadata parameters not set
@ -575,10 +633,11 @@ impl DestinationHandler {
FaultHandlerCode::IgnoreError => todo!(),
FaultHandlerCode::AbandonTransaction => todo!(),
}
let tstate = self.tstate();
self.local_cfg.default_fault_handler.report_fault(
self.tparams.tstate.transaction_id.unwrap(),
tstate.transaction_id.unwrap(),
condition_code,
self.tparams.tstate.progress,
tstate.progress,
)
}
@ -598,19 +657,20 @@ impl DestinationHandler {
self.step = TransactionStep::SendingFinishedPdu;
Ok(())
}
fn tstate(&self) -> &TransferState {
&self.tparams.tstate
}
}
#[cfg(test)]
mod tests {
use core::{
cell::Cell,
sync::atomic::{AtomicU8, Ordering},
};
use core::cell::Cell;
use std::fs;
#[allow(unused_imports)]
use std::println;
use std::{env::temp_dir, fs};
use alloc::{format, string::String};
use alloc::{collections::VecDeque, string::String};
use rand::Rng;
use spacepackets::{
cfdp::{
@ -649,10 +709,10 @@ mod tests {
#[derive(Default)]
struct TestFaultHandler {
notice_of_suspension_count: u32,
notice_of_cancellation_count: u32,
abandoned_count: u32,
ignored_count: u32,
notice_of_suspension_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
notice_of_cancellation_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
abandoned_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
ignored_queue: VecDeque<(TransactionId, ConditionCode, u64)>,
}
impl UserFaultHandler for TestFaultHandler {
@ -662,7 +722,8 @@ mod tests {
cond: ConditionCode,
progress: u64,
) {
self.notice_of_suspension_count += 1;
self.notice_of_suspension_queue
.push_back((transaction_id, cond, progress))
}
fn notice_of_cancellation_cb(
@ -671,7 +732,8 @@ mod tests {
cond: ConditionCode,
progress: u64,
) {
self.notice_of_cancellation_count += 1;
self.notice_of_cancellation_queue
.push_back((transaction_id, cond, progress))
}
fn abandoned_cb(
@ -680,11 +742,13 @@ mod tests {
cond: ConditionCode,
progress: u64,
) {
self.abandoned_count += 1;
self.abandoned_queue
.push_back((transaction_id, cond, progress))
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
self.ignored_count += 1;
self.ignored_queue
.push_back((transaction_id, cond, progress))
}
}
@ -763,6 +827,7 @@ mod tests {
}
}
#[derive(Debug)]
struct TestCheckTimer {
counter: Cell<u32>,
expiry_count: u32,
@ -777,6 +842,9 @@ mod tests {
self.counter.set(current_counter + 1);
false
}
fn reset(&mut self) {
self.counter.set(0);
}
}
impl TestCheckTimer {
@ -901,7 +969,8 @@ mod tests {
.expect("writing metadata PDU failed");
PacketInfo::new(&buf[..written_len]).expect("generating packet info failed")
}
fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader, buf: &mut [u8]) -> EofPdu {
fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader) -> EofPdu {
let mut digest = CRC_32.digest();
digest.update(file_data);
let crc32 = digest.finalize();
@ -935,7 +1004,7 @@ mod tests {
assert_ne!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus);
let eof_pdu = create_no_error_eof(&[], &pdu_header, &mut buf);
let eof_pdu = create_no_error_eof(&[], &pdu_header);
let packet_info = create_packet_info(&eof_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
@ -989,7 +1058,7 @@ mod tests {
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
let eof_pdu = create_no_error_eof(&file_data, &pdu_header, &mut buf);
let eof_pdu = create_no_error_eof(file_data, &pdu_header);
let packet_info = create_packet_info(&eof_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
@ -1066,7 +1135,7 @@ mod tests {
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
let eof_pdu = create_no_error_eof(&random_data, &pdu_header, &mut buf);
let eof_pdu = create_no_error_eof(&random_data, &pdu_header);
let packet_info = create_packet_info(&eof_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());

View File

@ -1,4 +1,4 @@
use core::hash::Hash;
use core::{cell::RefCell, fmt::Debug, hash::Hash};
use crc::{Crc, CRC_32_CKSUM};
use hashbrown::HashMap;
@ -39,8 +39,9 @@ pub enum EntityType {
/// For the receiving entity, this timer determines the expiry period for incrementing a check
/// counter after an EOF PDU is received for an incomplete file transfer. This allows out-of-order
/// reception of file data PDUs and EOF PDUs. Also see 4.6.3.3 of the CFDP standard.
pub trait CheckTimer {
pub trait CheckTimer: Debug {
fn has_expired(&self) -> bool;
fn reset(&mut self);
}
/// A generic trait which allows CFDP entities to create check timers which are required to
@ -64,6 +65,7 @@ pub trait CheckTimerCreator {
/// Simple implementation of the [CheckTimerProvider] trait assuming a standard runtime.
/// It also assumes that a second accuracy of the check timer period is sufficient.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct StdCheckTimer {
expiry_time_seconds: u64,
start_time: std::time::Instant,
@ -88,6 +90,10 @@ impl CheckTimer for StdCheckTimer {
}
false
}
fn reset(&mut self) {
self.start_time = std::time::Instant::now();
}
}
#[derive(Debug, Copy, Clone)]
@ -186,7 +192,9 @@ pub trait UserFaultHandler {
pub struct DefaultFaultHandler {
handler_array: [FaultHandlerCode; 10],
user_fault_handler: Box<dyn UserFaultHandler + Send>,
// Could also change the user fault handler trait to have non mutable methods, but that limits
// flexbility on the user side..
user_fault_handler: RefCell<Box<dyn UserFaultHandler + Send>>,
}
impl DefaultFaultHandler {
@ -227,7 +235,7 @@ impl DefaultFaultHandler {
.unwrap()] = FaultHandlerCode::IgnoreError;
Self {
handler_array: init_array,
user_fault_handler,
user_fault_handler: RefCell::new(user_fault_handler),
}
}
@ -240,7 +248,7 @@ impl DefaultFaultHandler {
}
pub fn report_fault(
&mut self,
&self,
transaction_id: TransactionId,
condition: ConditionCode,
progress: u64,
@ -250,28 +258,19 @@ impl DefaultFaultHandler {
return FaultHandlerCode::IgnoreError;
}
let fh_code = self.handler_array[array_idx.unwrap()];
let mut handler_mut = self.user_fault_handler.borrow_mut();
match fh_code {
FaultHandlerCode::NoticeOfCancellation => {
self.user_fault_handler.notice_of_cancellation_cb(
transaction_id,
condition,
progress,
);
handler_mut.notice_of_cancellation_cb(transaction_id, condition, progress);
}
FaultHandlerCode::NoticeOfSuspension => {
self.user_fault_handler.notice_of_suspension_cb(
transaction_id,
condition,
progress,
);
handler_mut.notice_of_suspension_cb(transaction_id, condition, progress);
}
FaultHandlerCode::IgnoreError => {
self.user_fault_handler
.ignore_cb(transaction_id, condition, progress);
handler_mut.ignore_cb(transaction_id, condition, progress);
}
FaultHandlerCode::AbandonTransaction => {
self.user_fault_handler
.abandoned_cb(transaction_id, condition, progress);
handler_mut.abandoned_cb(transaction_id, condition, progress);
}
}
fh_code
@ -347,9 +346,10 @@ pub enum TransactionStep {
Idle = 0,
TransactionStart = 1,
ReceivingFileDataPdus = 2,
SendingAckPdu = 3,
TransferCompletion = 4,
SendingFinishedPdu = 5,
ReceivingFileDataPdusWithCheckLimitHandling = 3,
SendingAckPdu = 4,
TransferCompletion = 5,
SendingFinishedPdu = 6,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]