rework timer and packet send handling
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2023-12-22 19:24:48 +01:00
parent 4cf96ce0d5
commit 7776847364
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 144 additions and 103 deletions

View File

@ -6,7 +6,8 @@ use super::{
filestore::{FilestoreError, VirtualFilestore},
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
CheckTimer, CheckTimerCreator, EntityType, LocalEntityConfig, PacketInfo, PacketTarget,
RemoteEntityConfig, RemoteEntityConfigProvider, State, TransactionId, TransactionStep,
RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TransactionId,
TransactionStep,
};
use alloc::boxed::Box;
use smallvec::SmallVec;
@ -26,12 +27,6 @@ use spacepackets::{
};
use thiserror::Error;
#[derive(Debug, Default)]
struct PacketsToSendContext {
packet_available: bool,
directive: Option<FileDirectiveType>,
}
#[derive(Debug)]
struct FileProperties {
src_file_name: [u8; u8::MAX as usize],
@ -171,6 +166,15 @@ pub enum DestError {
NoRemoteCfgFound(UnsignedByteField),
}
pub trait CfdpPacketSender: Send {
fn send_pdu(
&mut self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), PduError>;
}
/// This is the primary CFDP destination handler. It models the CFDP destination entity, which is
/// primarily responsible for receiving files sent from another CFDP entity. It performs the
/// reception side of File Copy Operations.
@ -194,7 +198,8 @@ pub struct DestinationHandler {
step: TransactionStep,
state: State,
tparams: TransactionParams,
packets_to_send_ctx: PacketsToSendContext,
packet_buf: alloc::vec::Vec<u8>,
packet_sender: Box<dyn CfdpPacketSender>,
vfs: Box<dyn VirtualFilestore>,
remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
check_timer_creator: Box<dyn CheckTimerCreator>,
@ -203,6 +208,8 @@ pub struct DestinationHandler {
impl DestinationHandler {
pub fn new(
local_cfg: LocalEntityConfig,
max_packet_len: usize,
packet_sender: Box<dyn CfdpPacketSender>,
vfs: Box<dyn VirtualFilestore>,
remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
check_timer_creator: Box<dyn CheckTimerCreator>,
@ -212,7 +219,8 @@ impl DestinationHandler {
step: TransactionStep::Idle,
state: State::Idle,
tparams: Default::default(),
packets_to_send_ctx: Default::default(),
packet_buf: alloc::vec![0; max_packet_len],
packet_sender,
vfs,
remote_cfg_table,
check_timer_creator,
@ -232,10 +240,7 @@ impl DestinationHandler {
&mut self,
cfdp_user: &mut impl CfdpUser,
packet_to_insert: Option<&PacketInfo>,
) -> Result<(), DestError> {
if self.packet_to_send_ready() {
return Err(DestError::PacketToSendLeft);
}
) -> Result<u32, DestError> {
if let Some(packet) = packet_to_insert {
self.insert_packet(cfdp_user, packet)?;
}
@ -259,54 +264,6 @@ impl DestinationHandler {
self.tstate().transaction_id
}
pub fn packet_to_send_ready(&self) -> bool {
self.packets_to_send_ctx.packet_available
}
pub fn get_next_packet(
&self,
buf: &mut [u8],
) -> Result<Option<(FileDirectiveType, usize)>, DestError> {
if !self.packet_to_send_ready() {
return Ok(None);
}
let directive = self.packets_to_send_ctx.directive.unwrap();
let tstate = self.tstate();
let written_size = match directive {
FileDirectiveType::FinishedPdu => {
let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
let finished_pdu = if tstate.condition_code == ConditionCode::NoError
|| tstate.condition_code == ConditionCode::UnsupportedChecksumType
{
FinishedPduCreator::new_default(
pdu_header,
tstate.delivery_code,
tstate.file_status,
)
} else {
// TODO: Are there cases where this ID is actually the source entity ID?
let entity_id = EntityIdTlv::new(self.local_cfg.id);
FinishedPduCreator::new_with_error(
pdu_header,
tstate.condition_code,
tstate.delivery_code,
tstate.file_status,
entity_id,
)
};
finished_pdu.write_to_bytes(buf)?
}
FileDirectiveType::AckPdu => todo!(),
FileDirectiveType::NakPdu => todo!(),
FileDirectiveType::KeepAlivePdu => todo!(),
_ => {
// This should never happen and is considered an internal impl error
panic!("invalid file directive {directive:?} for dest handler send packet");
}
};
Ok(Some((directive, written_size)))
}
fn insert_packet(
&mut self,
cfdp_user: &mut impl CfdpUser,
@ -532,12 +489,14 @@ impl DestinationHandler {
fn start_check_limit_handling(&mut self) {
self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling;
self.tparams.tstate.current_check_timer =
Some(self.check_timer_creator.get_check_timer_provider(
&self.local_cfg.id,
&self.tparams.remote_cfg.unwrap().entity_id,
EntityType::Receiving,
));
self.tparams.tstate.current_check_timer = Some(
self.check_timer_creator
.get_check_timer_provider(TimerContext::CheckLimit {
local_id: self.local_cfg.id,
remote_id: self.tparams.remote_cfg.unwrap().entity_id,
entity_type: EntityType::Receiving,
}),
);
self.tparams.tstate.current_check_count = 0;
}
@ -571,7 +530,8 @@ impl DestinationHandler {
todo!();
}
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
let mut sent_packets = 0;
if self.step == TransactionStep::TransactionStart {
self.transaction_start(cfdp_user)?;
}
@ -579,7 +539,7 @@ impl DestinationHandler {
self.check_limit_handling();
}
if self.step == TransactionStep::TransferCompletion {
self.transfer_completion(cfdp_user)?;
sent_packets += self.transfer_completion(cfdp_user)?;
}
if self.step == TransactionStep::SendingAckPdu {
todo!("no support for acknowledged mode yet");
@ -587,7 +547,7 @@ impl DestinationHandler {
if self.step == TransactionStep::SendingFinishedPdu {
self.reset();
}
Ok(())
Ok(sent_packets)
}
/// Get the step, which denotes the exact step of a pending CFDP transaction when applicable.
@ -669,17 +629,18 @@ impl DestinationHandler {
Ok(())
}
fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
let mut sent_packets = 0;
self.notice_of_completion(cfdp_user)?;
if self.tparams.transmission_mode() == TransmissionMode::Acknowledged
|| self.tparams.metadata_params().closure_requested
{
self.prepare_finished_pdu()?;
sent_packets += self.send_finished_pdu()?;
self.step = TransactionStep::SendingFinishedPdu;
} else {
self.reset();
}
Ok(())
Ok(sent_packets)
}
fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
@ -745,15 +706,36 @@ impl DestinationHandler {
fn reset(&mut self) {
self.step = TransactionStep::Idle;
self.state = State::Idle;
self.packets_to_send_ctx.packet_available = false;
// self.packets_to_send_ctx.packet_available = false;
self.tparams.reset();
}
fn prepare_finished_pdu(&mut self) -> Result<(), DestError> {
self.packets_to_send_ctx.packet_available = true;
self.packets_to_send_ctx.directive = Some(FileDirectiveType::FinishedPdu);
self.step = TransactionStep::SendingFinishedPdu;
Ok(())
fn send_finished_pdu(&mut self) -> Result<u32, DestError> {
let tstate = self.tstate();
let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
let finished_pdu = if tstate.condition_code == ConditionCode::NoError
|| tstate.condition_code == ConditionCode::UnsupportedChecksumType
{
FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status)
} else {
// TODO: Are there cases where this ID is actually the source entity ID?
let entity_id = EntityIdTlv::new(self.local_cfg.id);
FinishedPduCreator::new_with_error(
pdu_header,
tstate.condition_code,
tstate.delivery_code,
tstate.file_status,
entity_id,
)
};
finished_pdu.write_to_bytes(&mut self.packet_buf)?;
self.packet_sender.send_pdu(
finished_pdu.pdu_type(),
finished_pdu.file_directive_type(),
&self.packet_buf[0..finished_pdu.len_written()],
)?;
Ok(1)
}
fn tstate(&self) -> &TransferState {
@ -800,6 +782,27 @@ mod tests {
pub length: usize,
}
type SharedPduPacketQueue = Arc<Mutex<VecDeque<(PduType, Option<FileDirectiveType>, Vec<u8>)>>>;
#[derive(Default, Clone)]
struct TestCfdpSender {
packet_queue: SharedPduPacketQueue,
}
impl CfdpPacketSender for TestCfdpSender {
fn send_pdu(
&mut self,
pdu_type: PduType,
file_directive_type: Option<FileDirectiveType>,
raw_pdu: &[u8],
) -> Result<(), PduError> {
self.packet_queue.lock().unwrap().push_back((
pdu_type,
file_directive_type,
raw_pdu.to_vec(),
));
Ok(())
}
}
#[derive(Default)]
struct TestCfdpUser {
next_expected_seq_num: u64,
@ -1025,28 +1028,33 @@ mod tests {
}
struct TestCheckTimerCreator {
expired_flag: Arc<AtomicBool>,
check_limit_expired_flag: Arc<AtomicBool>,
}
impl TestCheckTimerCreator {
pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
Self { expired_flag }
Self {
check_limit_expired_flag: expired_flag,
}
}
}
impl CheckTimerCreator for TestCheckTimerCreator {
fn get_check_timer_provider(
&self,
_local_id: &UnsignedByteField,
_remote_id: &UnsignedByteField,
_entity_type: crate::cfdp::EntityType,
) -> Box<dyn CheckTimer> {
Box::new(TestCheckTimer::new(self.expired_flag.clone()))
fn get_check_timer_provider(&self, timer_context: TimerContext) -> Box<dyn CheckTimer> {
match timer_context {
TimerContext::CheckLimit { .. } => {
Box::new(TestCheckTimer::new(self.check_limit_expired_flag.clone()))
}
_ => {
panic!("invalid check timer creator, can only be used for check limit handling")
}
}
}
}
struct DestHandlerTester {
check_timer_expired: Arc<AtomicBool>,
test_sender: TestCfdpSender,
handler: DestinationHandler,
src_path: PathBuf,
dest_path: PathBuf,
@ -1061,11 +1069,17 @@ mod tests {
impl DestHandlerTester {
fn new(fault_handler: TestFaultHandler) -> Self {
let check_timer_expired = Arc::new(AtomicBool::new(false));
let dest_handler = default_dest_handler(fault_handler, check_timer_expired.clone());
let test_sender = TestCfdpSender::default();
let dest_handler = default_dest_handler(
fault_handler,
test_sender.clone(),
check_timer_expired.clone(),
);
let (src_path, dest_path) = init_full_filenames();
assert!(!Path::exists(&dest_path));
let handler = Self {
check_timer_expired,
test_sender,
handler: dest_handler,
src_path,
dest_path,
@ -1134,7 +1148,7 @@ mod tests {
user: &mut TestCfdpUser,
offset: u64,
file_data_chunk: &[u8],
) -> Result<(), DestError> {
) -> Result<u32, DestError> {
let filedata_pdu =
FileDataPdu::new_no_seg_metadata(self.pdu_header, offset, file_data_chunk);
filedata_pdu
@ -1157,7 +1171,7 @@ mod tests {
&mut self,
user: &mut TestCfdpUser,
expected_full_data: Vec<u8>,
) -> Result<(), DestError> {
) -> Result<u32, DestError> {
self.expected_full_data = expected_full_data;
let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header);
let packet_info = create_packet_info(&eof_pdu, &mut self.buf);
@ -1218,6 +1232,7 @@ mod tests {
fn default_dest_handler(
test_fault_handler: TestFaultHandler,
test_packet_sender: TestCfdpSender,
check_timer_expired: Arc<AtomicBool>,
) -> DestinationHandler {
let local_entity_cfg = LocalEntityConfig {
@ -1227,6 +1242,8 @@ mod tests {
};
DestinationHandler::new(
local_entity_cfg,
2048,
Box::new(test_packet_sender),
Box::<NativeFilestore>::default(),
Box::new(basic_remote_cfg_table()),
Box::new(TestCheckTimerCreator::new(check_timer_expired)),
@ -1284,7 +1301,8 @@ mod tests {
#[test]
fn test_basic() {
let fault_handler = TestFaultHandler::default();
let dest_handler = default_dest_handler(fault_handler.clone(), Arc::default());
let test_sender = TestCfdpSender::default();
let dest_handler = default_dest_handler(fault_handler.clone(), test_sender, Arc::default());
assert!(dest_handler.transmission_mode().is_none());
assert!(fault_handler.all_queues_empty());
}

View File

@ -31,7 +31,23 @@ pub enum EntityType {
Receiving,
}
/// Generic abstraction for a check timer which has different functionality depending on whether
pub enum TimerContext {
CheckLimit {
local_id: UnsignedByteField,
remote_id: UnsignedByteField,
entity_type: EntityType,
},
NakActivity(f32),
PositiveAck(f32),
}
/// Generic abstraction for a check timer which is used by 3 mechanisms of the CFDP protocol.
///
/// ## 1. Check limit handling
///
/// The first mechanism is the check limit handling for unacknowledged transfers as specified
/// in 4.6.3.2 and 4.6.3.3 of the CFDP standard.
/// For this mechanism, the timer has different functionality depending on whether
/// the using entity is the sending entity or the receiving entity for the unacknowledged
/// transmission mode.
///
@ -42,6 +58,18 @@ pub enum EntityType {
/// For the receiving entity, this timer determines the expiry period for incrementing a check
/// counter after an EOF PDU is received for an incomplete file transfer. This allows out-of-order
/// reception of file data PDUs and EOF PDUs. Also see 4.6.3.3 of the CFDP standard.
///
/// ## 2. NAK activity limit
///
/// The timer will be used to perform the NAK activity check as specified in 4.6.4.7 of the CFDP
/// standard. The expiration period will be provided by the NAK timer expiration limit of the
/// remote entity configuration.
///
/// ## 3. Positive ACK procedures
///
/// The timer will be used to perform the Positive Acknowledgement Procedures as specified in
/// 4.7. 1of the CFDP standard. The expiration period will be provided by the Positive ACK timer
/// interval of the remote entity configuration.
pub trait CheckTimer: Debug {
fn has_expired(&self) -> bool;
fn reset(&mut self);
@ -50,19 +78,14 @@ pub trait CheckTimer: Debug {
/// A generic trait which allows CFDP entities to create check timers which are required to
/// implement special procedures in unacknowledged transmission mode, as specified in 4.6.3.2
/// and 4.6.3.3. The [CheckTimer] documentation provides more information about the purpose of the
/// check timer.
/// check timer in the context of CFDP.
///
/// This trait also allows the creation of different check timers depending on
/// the ID of the local entity, the ID of the remote entity for a given transaction, and the
/// type of entity.
/// This trait also allows the creation of different check timers depending on context and purpose
/// of the timer, the runtime environment (e.g. standard clock timer vs. timer using a RTC) or
/// other factors.
#[cfg(feature = "alloc")]
pub trait CheckTimerCreator {
fn get_check_timer_provider(
&self,
local_id: &UnsignedByteField,
remote_id: &UnsignedByteField,
entity_type: EntityType,
) -> Box<dyn CheckTimer>;
fn get_check_timer_provider(&self, timer_context: TimerContext) -> Box<dyn CheckTimer>;
}
/// Simple implementation of the [CheckTimerCreator] trait assuming a standard runtime.