continue with rust and python
Some checks failed
Rust/cfdp/pipeline/head There was a failure building this commit

This commit is contained in:
Robin Müller 2024-08-27 16:01:17 +02:00
parent ee6b715987
commit 259bf3294c
Signed by: muellerr
GPG Key ID: A649FB78196E3849
4 changed files with 349 additions and 19 deletions

View File

@ -55,8 +55,8 @@ from tmtccmd.config.cfdp import CfdpParams, generic_cfdp_params_to_put_request
from tmtccmd.config.args import add_cfdp_procedure_arguments, cfdp_args_to_cfdp_params from tmtccmd.config.args import add_cfdp_procedure_arguments, cfdp_args_to_cfdp_params
LOCAL_ENTITY_ID = ByteFieldU16(1) PYTHON_ENTITY_ID = ByteFieldU16(1)
REMOTE_ENTITY_ID = ByteFieldU16(2) RUST_ENTITY_ID = ByteFieldU16(2)
# Enable all indications for both local and remote entity. # Enable all indications for both local and remote entity.
INDICATION_CFG = IndicationCfg() INDICATION_CFG = IndicationCfg()
@ -79,8 +79,8 @@ DEST_ENTITY_QUEUE = Queue()
# be sent by the UDP server. # be sent by the UDP server.
TM_QUEUE = Queue() TM_QUEUE = Queue()
REMOTE_CFG_OF_LOCAL_ENTITY = RemoteEntityCfg( REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg(
entity_id=LOCAL_ENTITY_ID, entity_id=PYTHON_ENTITY_ID,
max_packet_len=MAX_PACKET_LEN, max_packet_len=MAX_PACKET_LEN,
max_file_segment_len=FILE_SEGMENT_SIZE, max_file_segment_len=FILE_SEGMENT_SIZE,
closure_requested=True, closure_requested=True,
@ -89,8 +89,8 @@ REMOTE_CFG_OF_LOCAL_ENTITY = RemoteEntityCfg(
crc_type=ChecksumType.CRC_32, crc_type=ChecksumType.CRC_32,
) )
REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_LOCAL_ENTITY) REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_PY_ENTITY)
REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = REMOTE_ENTITY_ID REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = RUST_ENTITY_ID
LOCAL_PORT = 5111 LOCAL_PORT = 5111
REMOTE_PORT = 5222 REMOTE_PORT = 5222
@ -196,10 +196,10 @@ class SourceEntityHandler(Thread):
try: try:
put_req: PutRequest = self.put_req_queue.get(False) put_req: PutRequest = self.put_req_queue.get(False)
_LOGGER.info(f"{self.base_str}: Handling Put Request: {put_req}") _LOGGER.info(f"{self.base_str}: Handling Put Request: {put_req}")
if put_req.destination_id not in [LOCAL_ENTITY_ID, REMOTE_ENTITY_ID]: if put_req.destination_id not in [PYTHON_ENTITY_ID, RUST_ENTITY_ID]:
_LOGGER.warning( _LOGGER.warning(
f"can only handle put requests target towards {REMOTE_ENTITY_ID} or " f"can only handle put requests target towards {RUST_ENTITY_ID} or "
f"{LOCAL_ENTITY_ID}" f"{PYTHON_ENTITY_ID}"
) )
else: else:
try: try:
@ -557,7 +557,7 @@ def main():
cfdp_params = CfdpParams() cfdp_params = CfdpParams()
cfdp_args_to_cfdp_params(args, cfdp_params) cfdp_args_to_cfdp_params(args, cfdp_params)
put_req = generic_cfdp_params_to_put_request( put_req = generic_cfdp_params_to_put_request(
cfdp_params, LOCAL_ENTITY_ID, REMOTE_ENTITY_ID, LOCAL_ENTITY_ID cfdp_params, PYTHON_ENTITY_ID, RUST_ENTITY_ID, PYTHON_ENTITY_ID
) )
PUT_REQ_QUEUE.put(put_req) PUT_REQ_QUEUE.put(put_req)
@ -572,7 +572,7 @@ def main():
src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE) src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE)
check_timer_provider = CustomCheckTimerProvider() check_timer_provider = CustomCheckTimerProvider()
source_handler = SourceHandler( source_handler = SourceHandler(
cfg=LocalEntityCfg(LOCAL_ENTITY_ID, INDICATION_CFG, src_fault_handler), cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler),
seq_num_provider=src_seq_count_provider, seq_num_provider=src_seq_count_provider,
remote_cfg_table=remote_cfg_table, remote_cfg_table=remote_cfg_table,
user=src_user, user=src_user,
@ -592,7 +592,7 @@ def main():
dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST) dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST)
dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE) dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE)
dest_handler = DestHandler( dest_handler = DestHandler(
cfg=LocalEntityCfg(LOCAL_ENTITY_ID, INDICATION_CFG, dest_fault_handler), cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler),
user=dest_user, user=dest_user,
remote_cfg_table=remote_cfg_table, remote_cfg_table=remote_cfg_table,
check_timer_provider=check_timer_provider, check_timer_provider=check_timer_provider,

View File

@ -1 +1,333 @@
fn main() {} use std::{
fs::OpenOptions,
io::Write,
sync::{atomic::AtomicBool, mpsc, Arc},
thread,
time::Duration,
};
use cfdp::{
dest::DestinationHandler,
filestore::NativeFilestore,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig,
StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
};
use spacepackets::{
cfdp::{ChecksumType, ConditionCode, TransmissionMode},
seq_count::SeqCountProviderSyncU16,
util::UnsignedByteFieldU16,
};
const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
const FILE_DATA: &str = "Hello World!";
#[derive(Default)]
pub struct ExampleFaultHandler {}
impl UserFaultHookProvider for ExampleFaultHandler {
fn notice_of_suspension_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected suspension of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn notice_of_cancellation_cb(
&mut self,
transaction_id: TransactionId,
cond: ConditionCode,
progress: u64,
) {
panic!(
"unexpected cancellation of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"unexpected abandonment of transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
panic!(
"ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}",
transaction_id, cond, progress
);
}
}
pub struct ExampleCfdpUser {
entity_type: EntityType,
completion_signal: Arc<AtomicBool>,
}
impl ExampleCfdpUser {
pub fn new(entity_type: EntityType, completion_signal: Arc<AtomicBool>) -> Self {
Self {
entity_type,
completion_signal,
}
}
}
impl CfdpUser for ExampleCfdpUser {
fn transaction_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: Transaction indication for {:?}",
self.entity_type, id
);
}
fn eof_sent_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF sent for transaction {:?}",
self.entity_type, id
);
}
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) {
println!(
"{:?} entity: Transaction finished: {:?}",
self.entity_type, finished_params
);
self.completion_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
}
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
println!(
"{:?} entity: Metadata received: {:?}",
self.entity_type, md_recvd_params
);
}
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) {
println!(
"{:?} entity: File segment {:?} received",
self.entity_type, segment_recvd_params
);
}
fn report_indication(&mut self, _id: &crate::TransactionId) {}
fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: EOF received for transaction {:?}",
self.entity_type, id
);
}
}
fn main() {
// Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone();
let stop_signal_ctrl = stop_signal_source.clone();
let completion_signal_source = Arc::new(AtomicBool::new(false));
let completion_signal_source_main = completion_signal_source.clone();
let completion_signal_dest = Arc::new(AtomicBool::new(false));
let completion_signal_dest_main = completion_signal_dest.clone();
let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let mut file = OpenOptions::new()
.write(true)
.open(&srcfile)
.expect("opening file failed");
file.write_all(FILE_DATA.as_bytes())
.expect("writing file content failed");
let destdir = tempfile::tempdir().expect("creating temp directory failed");
let destfile = destdir.path().join("test.txt");
let local_cfg_source = LocalEntityConfig::new(
RUST_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let (source_tx, source_rx) = mpsc::channel::<PduWithInfo>();
let (dest_tx, dest_rx) = mpsc::channel::<PduWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg_python = RemoteEntityConfig::new_with_default_values(
PYTHON_ID.into(),
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let mut source_handler = SourceHandler::new(
local_cfg_source,
source_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg_python,
seq_count_provider,
);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source);
let local_cfg_dest = LocalEntityConfig::new(
RUST_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tx,
NativeFilestore::default(),
remote_cfg_python,
StdCheckTimerCreator::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);
let put_request = PutRequestOwned::new_regular_request(
PYTHON_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged),
Some(true),
)
.expect("put request creation failed");
let start = std::time::Instant::now();
let jh_source = thread::spawn(move || {
source_handler
.put_request(&put_request)
.expect("put request failed");
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match dest_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Source handler state machine possible in permanent loop");
}
}
});
let jh_dest = thread::spawn(move || {
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match source_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
panic!("Destination handler state machine possible in permanent loop");
}
}
});
loop {
if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed)
&& completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed)
{
let file = std::fs::read_to_string(destfile).expect("reading file failed");
assert_eq!(file, FILE_DATA);
// Stop the threads gracefully.
stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
if std::time::Instant::now() - start > Duration::from_secs(2) {
panic!("file transfer not finished in 2 seconds");
}
std::thread::sleep(Duration::from_millis(50));
}
jh_source.join().unwrap();
jh_dest.join().unwrap();
}

View File

@ -1535,8 +1535,7 @@ mod tests {
TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
); );
tb.set_check_timer_expired(); tb.set_check_timer_expired();
tb tb.handler
.handler
.state_machine_no_packet(&mut user) .state_machine_no_packet(&mut user)
.expect("fsm error"); .expect("fsm error");
tb.state_check( tb.state_check(
@ -1544,8 +1543,7 @@ mod tests {
TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
); );
tb.set_check_timer_expired(); tb.set_check_timer_expired();
tb tb.handler
.handler
.state_machine_no_packet(&mut user) .state_machine_no_packet(&mut user)
.expect("fsm error"); .expect("fsm error");
tb.state_check(State::Idle, TransactionStep::Idle); tb.state_check(State::Idle, TransactionStep::Idle);

View File

@ -13,8 +13,8 @@ use cfdp::{
request::{PutRequestOwned, StaticPutRequestCacher}, request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler, source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
DummyPduProvider, EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig,
RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider, StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
}; };
use spacepackets::{ use spacepackets::{
cfdp::{ChecksumType, ConditionCode, TransmissionMode}, cfdp::{ChecksumType, ConditionCode, TransmissionMode},