From 192e701785abcdffcf140b6adaffe9f7988b31f4 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 15 Apr 2024 16:42:48 +0200 Subject: [PATCH] MIO tcp client --- Cargo.lock | 8 +++--- src/config.rs | 2 ++ src/interface/tcp_spp_client.rs | 2 +- src/main.rs | 43 +++++++++++++++++++++++++++------ src/pus/mod.rs | 8 +++--- 5 files changed, 47 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bec391..c83fe31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,7 +627,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "proc-macro2", "quote", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "serde", "spacepackets", diff --git a/src/config.rs b/src/config.rs index 12c334f..4295e75 100644 --- a/src/config.rs +++ b/src/config.rs @@ -129,6 +129,8 @@ pub mod components { UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::UdpServer as u32); pub const TCP_SERVER: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpServer as u32); + pub const TCP_SPP_CLIENT: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpSppClient as u32); } pub mod tasks { diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index d8d6936..20c15ee 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -90,7 +90,7 @@ impl TcpSppClient { &mut self.read_buf[..read_bytes], &[EXPERIMENT_PACKET_ID].as_slice(), self.id, - &mut self.tc_source_tx, + &self.tc_source_tx, &mut dummy, )?; Ok(()) diff --git a/src/main.rs b/src/main.rs index 67543c2..eb669d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use std::{ use log::info; use ops_sat_rs::config::{ - components::{CONTROLLER_ID, TCP_SERVER, UDP_SERVER}, + components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, EXPERIMENT_APID, }; @@ -17,7 +17,6 @@ use satrs::{ spacepackets::PacketId, }; -use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::tmtc::tc_source::TcSourceTaskDynamic; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; @@ -26,6 +25,10 @@ use crate::{ interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, }; +use crate::{ + interface::tcp_spp_client::TcpSppClient, + pus::{PusTcDistributor, PusTcMpscRouter}, +}; use crate::{ pus::{action::create_action_service, stack::PusStack}, requests::GenericRequestRouter, @@ -162,6 +165,9 @@ fn main() { stop_signal.clone(), ); + let mut tcp_spp_client = TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx) + .expect("creating TCP SPP client failed"); + info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); let jh_ctrl_thread = thread::Builder::new() @@ -192,15 +198,33 @@ fn main() { }) .unwrap(); - let tcp_stop_signal = stop_signal.clone(); - info!("Starting TCP task"); - let jh_tcp = thread::Builder::new() - .name("ops-sat tcp".to_string()) + let tcp_server_stop_signal = stop_signal.clone(); + info!("Starting TCP server task"); + let jh_tcp_server = thread::Builder::new() + .name("ops-sat tcp-server".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); - if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + if tcp_server_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + }) + .unwrap(); + + // We could also move this to the existing TCP server thread, but we would have to adapt + // the server code for this so we do not block anymore and we pause manually if both the client + // and server are IDLE and have nothing to do.. + let tcp_client_stop_signal = stop_signal.clone(); + info!("Starting TCP SPP client task"); + let jh_tcp_client = thread::Builder::new() + .name("ops-sat tcp-client".to_string()) + .spawn(move || { + info!("Running TCP SPP client"); + loop { + let _result = tcp_spp_client.periodic_operation(); + if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } @@ -238,9 +262,12 @@ fn main() { jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); - jh_tcp + jh_tcp_server .join() .expect("Joining TCP TMTC server thread failed"); + jh_tcp_client + .join() + .expect("Joining TCP TMTC client thread failed"); jh_tm_funnel .join() .expect("Joining TM Funnel thread failed"); diff --git a/src/pus/mod.rs b/src/pus/mod.rs index bc323b0..2f88a26 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -15,12 +15,13 @@ use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, - PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, + PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{PusPacket, PusServiceId}; +use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use std::fmt::Debug; use std::sync::mpsc::{self, Sender}; @@ -102,14 +103,15 @@ impl PusTcDistributor { .acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp()) .expect("Acceptance success failure"); let service = PusServiceId::try_from(pus_tc.service()); + let tc_in_memory = TcInMemory::Vec(PacketAsVec::new(sender_id, tc)); match service { Ok(standard_service) => match standard_service { PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken { - tc_in_memory: tc.into(), + tc_in_memory, token: Some(accepted_token.into()), })?, PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken { - tc_in_memory: tc.into(), + tc_in_memory, token: Some(accepted_token.into()), })?, // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {