MIO tcp client

This commit is contained in:
Robin Müller 2024-04-15 16:42:48 +02:00
parent 8313a0b26c
commit 192e701785
Signed by: muellerr
GPG Key ID: A649FB78196E3849
5 changed files with 47 additions and 16 deletions

8
Cargo.lock generated
View File

@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "satrs"
version = "0.2.0-rc.0"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669"
dependencies = [
"bus",
"cobs",
@ -615,7 +615,7 @@ dependencies = [
[[package]]
name = "satrs-mib"
version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669"
dependencies = [
"csv",
"satrs-mib-codegen",
@ -627,7 +627,7 @@ dependencies = [
[[package]]
name = "satrs-mib-codegen"
version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669"
dependencies = [
"proc-macro2",
"quote",
@ -637,7 +637,7 @@ dependencies = [
[[package]]
name = "satrs-shared"
version = "0.1.3"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669"
dependencies = [
"serde",
"spacepackets",

View File

@ -129,6 +129,8 @@ pub mod components {
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::UdpServer as u32);
pub const TCP_SERVER: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpServer as u32);
pub const TCP_SPP_CLIENT: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpSppClient as u32);
}
pub mod tasks {

View File

@ -90,7 +90,7 @@ impl TcpSppClient {
&mut self.read_buf[..read_bytes],
&[EXPERIMENT_PACKET_ID].as_slice(),
self.id,
&mut self.tc_source_tx,
&self.tc_source_tx,
&mut dummy,
)?;
Ok(())

View File

@ -7,7 +7,7 @@ use std::{
use log::info;
use ops_sat_rs::config::{
components::{CONTROLLER_ID, TCP_SERVER, UDP_SERVER},
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
EXPERIMENT_APID,
};
@ -17,7 +17,6 @@ use satrs::{
spacepackets::PacketId,
};
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
use crate::tmtc::tc_source::TcSourceTaskDynamic;
use crate::tmtc::tm_sink::TmFunnelDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service};
@ -26,6 +25,10 @@ use crate::{
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
logger::setup_logger,
};
use crate::{
interface::tcp_spp_client::TcpSppClient,
pus::{PusTcDistributor, PusTcMpscRouter},
};
use crate::{
pus::{action::create_action_service, stack::PusStack},
requests::GenericRequestRouter,
@ -162,6 +165,9 @@ fn main() {
stop_signal.clone(),
);
let mut tcp_spp_client = TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx)
.expect("creating TCP SPP client failed");
info!("Starting CTRL task");
let ctrl_stop_signal = stop_signal.clone();
let jh_ctrl_thread = thread::Builder::new()
@ -192,15 +198,33 @@ fn main() {
})
.unwrap();
let tcp_stop_signal = stop_signal.clone();
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("ops-sat tcp".to_string())
let tcp_server_stop_signal = stop_signal.clone();
info!("Starting TCP server task");
let jh_tcp_server = thread::Builder::new()
.name("ops-sat tcp-server".to_string())
.spawn(move || {
info!("Running TCP server on port {SERVER_PORT}");
loop {
tcp_server.periodic_operation();
if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
if tcp_server_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}
})
.unwrap();
// We could also move this to the existing TCP server thread, but we would have to adapt
// the server code for this so we do not block anymore and we pause manually if both the client
// and server are IDLE and have nothing to do..
let tcp_client_stop_signal = stop_signal.clone();
info!("Starting TCP SPP client task");
let jh_tcp_client = thread::Builder::new()
.name("ops-sat tcp-client".to_string())
.spawn(move || {
info!("Running TCP SPP client");
loop {
let _result = tcp_spp_client.periodic_operation();
if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}
@ -238,9 +262,12 @@ fn main() {
jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");
jh_tcp
jh_tcp_server
.join()
.expect("Joining TCP TMTC server thread failed");
jh_tcp_client
.join()
.expect("Joining TCP TMTC client thread failed");
jh_tm_funnel
.join()
.expect("Joining TM Funnel thread failed");

View File

@ -15,12 +15,13 @@ use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError,
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter,
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
};
use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{PusPacket, PusServiceId};
use satrs::tmtc::PacketAsVec;
use satrs::ComponentId;
use std::fmt::Debug;
use std::sync::mpsc::{self, Sender};
@ -102,14 +103,15 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
.acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp())
.expect("Acceptance success failure");
let service = PusServiceId::try_from(pus_tc.service());
let tc_in_memory = TcInMemory::Vec(PacketAsVec::new(sender_id, tc));
match service {
Ok(standard_service) => match standard_service {
PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken {
tc_in_memory: tc.into(),
tc_in_memory,
token: Some(accepted_token.into()),
})?,
PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken {
tc_in_memory: tc.into(),
tc_in_memory,
token: Some(accepted_token.into()),
})?,
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {