continue CFDP handlers
Some checks failed
Rust/sat-rs/pipeline/head There was a failure building this commit

This commit is contained in:
Robin Müller 2023-11-11 19:05:46 +01:00
parent 2afb3de227
commit f7f1017fed
Signed by: muellerr
GPG Key ID: A649FB78196E3849
3 changed files with 235 additions and 76 deletions

View File

@ -73,11 +73,11 @@ features = ["all"]
optional = true
[dependencies.spacepackets]
version = "0.7.0-beta.2"
# version = "0.7.0-beta.2"
default-features = false
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
# rev = "79d26e1a6"
# branch = ""
branch = "writable_pdu_packet"
[dependencies.cobs]
git = "https://github.com/robamu/cobs.rs.git"

View File

@ -9,8 +9,10 @@ use crate::cfdp::user::TransactionFinishedParams;
use super::{
user::{CfdpUser, MetadataReceivedParams},
PacketInfo, PacketTarget, State, TransactionId, TransactionStep, CRC_32,
CheckTimerCreator, PacketInfo, PacketTarget, RemoteEntityConfigProvider, State, TransactionId,
TransactionStep, CRC_32,
};
use alloc::boxed::Box;
use smallvec::SmallVec;
use spacepackets::{
cfdp::{
@ -19,10 +21,10 @@ use spacepackets::{
file_data::FileDataPdu,
finished::{DeliveryCode, FileStatus, FinishedPdu},
metadata::{MetadataGenericParams, MetadataPdu},
CommonPduConfig, FileDirectiveType, PduError, PduHeader,
CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
},
tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, TlvType},
ConditionCode, PduType, TransmissionMode,
ConditionCode, PduType,
},
util::UnsignedByteField,
};
@ -34,6 +36,8 @@ pub struct DestinationHandler {
state: State,
tparams: TransactionParams,
packets_to_send_ctx: PacketsToSendContext,
remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
check_timer_creator: Box<dyn CheckTimerCreator>,
}
#[derive(Debug, Default)]
@ -152,25 +156,38 @@ pub enum DestError {
}
impl DestinationHandler {
pub fn new(id: impl Into<UnsignedByteField>) -> Self {
pub fn new(
entity_id: impl Into<UnsignedByteField>,
remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
check_timer_creator: Box<dyn CheckTimerCreator>,
) -> Self {
Self {
id: id.into(),
id: entity_id.into(),
step: TransactionStep::Idle,
state: State::Idle,
tparams: Default::default(),
packets_to_send_ctx: Default::default(),
remote_cfg_table,
check_timer_creator,
}
}
pub fn state_machine(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
pub fn state_machine(
&mut self,
cfdp_user: &mut impl CfdpUser,
packet_to_insert: Option<&PacketInfo>,
) -> Result<(), DestError> {
if let Some(packet) = packet_to_insert {
self.insert_packet(packet)?;
}
match self.state {
State::Idle => todo!(),
State::BusyClass1Nacked => self.fsm_nacked(cfdp_user),
State::BusyClass2Acked => todo!("acknowledged mode not implemented yet"),
State::Busy => self.fsm_busy(cfdp_user),
State::Suspended => todo!(),
}
}
pub fn insert_packet(&mut self, packet_info: &PacketInfo) -> Result<(), DestError> {
fn insert_packet(&mut self, packet_info: &PacketInfo) -> Result<(), DestError> {
if packet_info.target() != PacketTarget::DestEntity {
// Unwrap is okay here, a PacketInfo for a file data PDU should always have the
// destination as the target.
@ -297,11 +314,7 @@ impl DestinationHandler {
}
}
}
if self.tparams.pdu_conf.trans_mode == TransmissionMode::Unacknowledged {
self.state = State::BusyClass1Nacked;
} else {
self.state = State::BusyClass2Acked;
}
self.state = State::Busy;
self.step = TransactionStep::TransactionStart;
Ok(())
}
@ -338,7 +351,7 @@ impl DestinationHandler {
// TODO: Check progress, and implement transfer completion timer as specified in the
// standard. This timer protects against out of order arrival of packets.
if self.tparams.tstate.progress != self.tparams.file_size() {}
if self.state == State::BusyClass1Nacked {
if self.state == State::Busy {
self.step = TransactionStep::TransferCompletion;
} else {
self.step = TransactionStep::SendingAckPdu;
@ -367,7 +380,7 @@ impl DestinationHandler {
Ok(false)
}
fn fsm_nacked(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
if self.step == TransactionStep::TransactionStart {
self.transaction_start(cfdp_user)?;
}
@ -496,7 +509,10 @@ impl DestinationHandler {
#[cfg(test)]
mod tests {
use core::sync::atomic::{AtomicU8, Ordering};
use core::{
cell::Cell,
sync::atomic::{AtomicU8, Ordering},
};
#[allow(unused_imports)]
use std::println;
use std::{env::temp_dir, fs};
@ -504,10 +520,14 @@ mod tests {
use alloc::{format, string::String};
use rand::Rng;
use spacepackets::{
cfdp::{lv::Lv, ChecksumType},
cfdp::{lv::Lv, pdu::WritablePduPacket, ChecksumType, TransmissionMode},
util::{UbfU16, UnsignedByteFieldU16},
};
use crate::cfdp::{
CheckTimer, CheckTimerCreator, RemoteEntityConfig, StdRemoteEntityConfigProvider,
};
use super::*;
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
@ -608,6 +628,63 @@ mod tests {
}
}
struct TestCheckTimer {
counter: Cell<u32>,
expiry_count: u32,
}
impl CheckTimer for TestCheckTimer {
fn has_expired(&self) -> bool {
let current_counter = self.counter.get();
if self.expiry_count == current_counter {
return true;
}
self.counter.set(current_counter + 1);
false
}
}
impl TestCheckTimer {
pub fn new(expiry_after_x_calls: u32) -> Self {
Self {
counter: Cell::new(0),
expiry_count: expiry_after_x_calls,
}
}
}
struct TestCheckTimerCreator {
expiry_counter_for_source_entity: u32,
expiry_counter_for_dest_entity: u32,
}
impl TestCheckTimerCreator {
pub fn new(
expiry_counter_for_source_entity: u32,
expiry_counter_for_dest_entity: u32,
) -> Self {
Self {
expiry_counter_for_source_entity,
expiry_counter_for_dest_entity,
}
}
}
impl CheckTimerCreator for TestCheckTimerCreator {
fn get_check_timer_provider(
&self,
_local_id: &UnsignedByteField,
_remote_id: &UnsignedByteField,
entity_type: crate::cfdp::EntityType,
) -> Box<dyn CheckTimer> {
if entity_type == crate::cfdp::EntityType::Sending {
Box::new(TestCheckTimer::new(self.expiry_counter_for_source_entity))
} else {
Box::new(TestCheckTimer::new(self.expiry_counter_for_dest_entity))
}
}
}
fn init_check(handler: &DestinationHandler) {
assert_eq!(handler.state(), State::Idle);
assert_eq!(handler.step(), TransactionStep::Idle);
@ -626,9 +703,31 @@ mod tests {
(src_path, file_path)
}
fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider {
let mut table = StdRemoteEntityConfigProvider::default();
let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
UnsignedByteFieldU16::new(1).into(),
1024,
1024,
true,
true,
TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
table.add_config(&remote_entity_cfg);
table
}
fn default_dest_handler() -> DestinationHandler {
DestinationHandler::new(
REMOTE_ID,
Box::new(basic_remote_cfg_table()),
Box::new(TestCheckTimerCreator::new(2, 2)),
)
}
#[test]
fn test_basic() {
let dest_handler = DestinationHandler::new(REMOTE_ID);
let dest_handler = default_dest_handler();
init_check(&dest_handler);
}
@ -655,37 +754,20 @@ mod tests {
)
}
fn insert_metadata_pdu(
metadata_pdu: &MetadataPdu,
buf: &mut [u8],
dest_handler: &mut DestinationHandler,
) {
let written_len = metadata_pdu
fn create_packet_info<'a>(
pdu: &'a impl WritablePduPacket,
buf: &'a mut [u8],
) -> PacketInfo<'a> {
let written_len = pdu
.write_to_bytes(buf)
.expect("writing metadata PDU failed");
let packet_info =
PacketInfo::new(&buf[..written_len]).expect("generating packet info failed");
let insert_result = dest_handler.insert_packet(&packet_info);
if let Err(e) = insert_result {
panic!("insert result error: {e}");
}
PacketInfo::new(&buf[..written_len]).expect("generating packet info failed")
}
fn insert_eof_pdu(
file_data: &[u8],
pdu_header: &PduHeader,
buf: &mut [u8],
dest_handler: &mut DestinationHandler,
) {
fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader, buf: &mut [u8]) -> EofPdu {
let mut digest = CRC_32.digest();
digest.update(file_data);
let crc32 = digest.finalize();
let eof_pdu = EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64);
let result = eof_pdu.write_to_bytes(buf);
assert!(result.is_ok());
let packet_info = PacketInfo::new(&buf).expect("generating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
assert!(result.is_ok());
EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64)
}
#[test]
@ -700,23 +782,24 @@ mod tests {
expected_file_size: 0,
};
// We treat the destination handler like it is a remote entity.
let mut dest_handler = DestinationHandler::new(REMOTE_ID);
let mut dest_handler = default_dest_handler();
init_check(&dest_handler);
let seq_num = UbfU16::new(0);
let pdu_header = create_pdu_header(seq_num);
let metadata_pdu =
create_metadata_pdu(&pdu_header, src_name.as_path(), dest_name.as_path(), 0);
insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
let packet_info = create_packet_info(&metadata_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
if let Err(e) = result {
panic!("dest handler fsm error: {e}");
}
assert_ne!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus);
insert_eof_pdu(&[], &pdu_header, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
let eof_pdu = create_no_error_eof(&[], &pdu_header, &mut buf);
let packet_info = create_packet_info(&eof_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);
@ -740,7 +823,7 @@ mod tests {
expected_file_size: file_data.len(),
};
// We treat the destination handler like it is a remote entity.
let mut dest_handler = DestinationHandler::new(REMOTE_ID);
let mut dest_handler = default_dest_handler();
init_check(&dest_handler);
let seq_num = UbfU16::new(0);
@ -751,8 +834,8 @@ mod tests {
dest_name.as_path(),
file_data.len() as u64,
);
insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
let packet_info = create_packet_info(&metadata_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
if let Err(e) = result {
panic!("dest handler fsm error: {e}");
}
@ -769,11 +852,12 @@ mod tests {
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
insert_eof_pdu(file_data, &pdu_header, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
let eof_pdu = create_no_error_eof(&file_data, &pdu_header, &mut buf);
let packet_info = create_packet_info(&eof_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);
@ -800,7 +884,7 @@ mod tests {
};
// We treat the destination handler like it is a remote entity.
let mut dest_handler = DestinationHandler::new(REMOTE_ID);
let mut dest_handler = default_dest_handler();
init_check(&dest_handler);
let seq_num = UbfU16::new(0);
@ -811,8 +895,8 @@ mod tests {
dest_name.as_path(),
random_data.len() as u64,
);
insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
let packet_info = create_packet_info(&metadata_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
if let Err(e) = result {
panic!("dest handler fsm error: {e}");
}
@ -835,7 +919,7 @@ mod tests {
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
// Second file data PDU
@ -853,11 +937,12 @@ mod tests {
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
insert_eof_pdu(&random_data, &pdu_header, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
let eof_pdu = create_no_error_eof(&random_data, &pdu_header, &mut buf);
let packet_info = create_packet_info(&eof_pdu, &mut buf);
let result = dest_handler.state_machine(&mut test_user, Some(&packet_info));
assert!(result.is_ok());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);

View File

@ -1,4 +1,7 @@
use core::hash::Hash;
use crc::{Crc, CRC_32_CKSUM};
use hashbrown::HashMap;
use spacepackets::{
cfdp::{
pdu::{FileDirectiveType, PduError, PduHeader},
@ -35,7 +38,7 @@ pub enum EntityType {
/// For the receiving entity, this timer determines the expiry period for incrementing a check
/// counter after an EOF PDU is received for an incomplete file transfer. This allows out-of-order
/// reception of file data PDUs and EOF PDUs. Also see 4.6.3.3 of the CFDP standard.
pub trait CheckTimerProvider {
pub trait CheckTimer {
fn has_expired(&self) -> bool;
}
@ -50,10 +53,11 @@ pub trait CheckTimerProvider {
#[cfg(feature = "alloc")]
pub trait CheckTimerCreator {
fn get_check_timer_provider(
&self,
local_id: &UnsignedByteField,
remote_id: &UnsignedByteField,
entity_type: EntityType,
) -> Box<dyn CheckTimerProvider>;
) -> Box<dyn CheckTimer>;
}
/// Simple implementation of the [CheckTimerProvider] trait assuming a standard runtime.
@ -75,7 +79,7 @@ impl StdCheckTimer {
}
#[cfg(feature = "std")]
impl CheckTimerProvider for StdCheckTimer {
impl CheckTimer for StdCheckTimer {
fn has_expired(&self) -> bool {
let elapsed_time = self.start_time.elapsed();
if elapsed_time.as_secs() > self.expiry_time_seconds {
@ -85,22 +89,78 @@ impl CheckTimerProvider for StdCheckTimer {
}
}
#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct RemoteEntityConfig {
pub entity_id: UnsignedByteField,
pub max_file_segment_len: usize,
pub closure_requeted_by_default: bool,
pub max_packet_len: usize,
pub closure_requested_by_default: bool,
pub crc_on_transmission_by_default: bool,
pub default_transmission_mode: TransmissionMode,
pub default_crc_type: ChecksumType,
pub check_limit: u32,
}
pub trait RemoteEntityConfigProvider {
fn get_remote_config(&self, remote_id: &UnsignedByteField) -> Option<&RemoteEntityConfig>;
impl RemoteEntityConfig {
pub fn new_with_default_values(
entity_id: UnsignedByteField,
max_file_segment_len: usize,
max_packet_len: usize,
closure_requested_by_default: bool,
crc_on_transmission_by_default: bool,
default_transmission_mode: TransmissionMode,
default_crc_type: ChecksumType,
) -> Self {
Self {
entity_id,
max_file_segment_len,
max_packet_len,
closure_requested_by_default,
crc_on_transmission_by_default,
default_transmission_mode,
default_crc_type,
check_limit: 2,
}
}
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub trait RemoteEntityConfigProvider {
/// Retrieve the remote entity configuration for the given remote ID.
fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig>;
fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig>;
/// Add a new remote configuration. Return [True] if the configuration was
/// inserted successfully, and [False] if a configuration already exists.
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool;
/// Remote a configuration. Returns [True] if the configuration was removed successfully,
/// and [False] if no configuration exists for the given remote ID.
fn remove_config(&mut self, remote_id: u64) -> bool;
}
#[cfg(feature = "std")]
#[derive(Default)]
pub struct StdRemoteEntityConfigProvider {
remote_cfg_table: HashMap<u64, RemoteEntityConfig>,
}
#[cfg(feature = "std")]
impl RemoteEntityConfigProvider for StdRemoteEntityConfigProvider {
fn get_remote_config(&self, remote_id: u64) -> Option<&RemoteEntityConfig> {
self.remote_cfg_table.get(&remote_id)
}
fn get_remote_config_mut(&mut self, remote_id: u64) -> Option<&mut RemoteEntityConfig> {
self.remote_cfg_table.get_mut(&remote_id)
}
fn add_config(&mut self, cfg: &RemoteEntityConfig) -> bool {
self.remote_cfg_table
.insert(cfg.entity_id.value(), *cfg)
.is_some()
}
fn remove_config(&mut self, remote_id: u64) -> bool {
self.remote_cfg_table.remove(&remote_id).is_some()
}
}
#[derive(Debug, Eq, Copy, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TransactionId {
source_id: UnsignedByteField,
@ -121,6 +181,20 @@ impl TransactionId {
}
}
impl Hash for TransactionId {
fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
self.source_id.value().hash(state);
self.seq_num.value().hash(state);
}
}
impl PartialEq for TransactionId {
fn eq(&self, other: &Self) -> bool {
self.source_id.value() == other.source_id.value()
&& self.seq_num.value() == other.seq_num.value()
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TransactionStep {
@ -136,8 +210,8 @@ pub enum TransactionStep {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum State {
Idle = 0,
BusyClass1Nacked = 2,
BusyClass2Acked = 3,
Busy = 1,
Suspended = 2,
}
pub const CRC_32: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM);
@ -249,7 +323,7 @@ mod tests {
eof::EofPdu,
file_data::FileDataPdu,
metadata::{MetadataGenericParams, MetadataPdu},
CommonPduConfig, FileDirectiveType, PduHeader,
CommonPduConfig, FileDirectiveType, PduHeader, WritablePduPacket,
},
PduType,
};