MIO TCP client #6

Merged
muellerr merged 7 commits from mio-tcp-client into main 2024-04-16 14:50:37 +02:00
3 changed files with 25 additions and 9 deletions
Showing only changes of commit b5e048a13b - Show all commits

View File

@ -30,6 +30,7 @@ pub struct TcpSppClient {
events: Events, events: Events,
client: TcpStream, client: TcpStream,
read_buf: [u8; 4096], read_buf: [u8; 4096],
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
tc_source_tx: mpsc::Sender<PacketAsVec>, tc_source_tx: mpsc::Sender<PacketAsVec>,
validator: SimpleSpValidator, validator: SimpleSpValidator,
} }
@ -38,6 +39,7 @@ impl TcpSppClient {
pub fn new( pub fn new(
id: ComponentId, id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>, tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId], valid_ids: &'static [PacketId],
) -> io::Result<Self> { ) -> io::Result<Self> {
let poll = Poll::new()?; let poll = Poll::new()?;
@ -55,6 +57,7 @@ impl TcpSppClient {
events, events,
client, client,
read_buf: [0; 4096], read_buf: [0; 4096],
tm_tcp_client_rx,
tc_source_tx, tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
}) })

View File

@ -47,6 +47,7 @@ fn main() {
let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (tm_server_tx, tm_server_rx) = mpsc::channel(); let (tm_server_tx, tm_server_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel();
// let (pus_event_tx, pus_event_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel();
@ -162,9 +163,13 @@ fn main() {
stop_signal.clone(), stop_signal.clone(),
); );
let mut tcp_spp_client = let mut tcp_spp_client = TcpSppClient::new(
TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx, VALID_PACKET_ID_LIST) TCP_SPP_CLIENT.id(),
.expect("creating TCP SPP client failed"); tc_source_tx,
tm_tcp_client_rx,
VALID_PACKET_ID_LIST,
)
.expect("creating TCP SPP client failed");
info!("Starting CTRL task"); info!("Starting CTRL task");
let ctrl_stop_signal = stop_signal.clone(); let ctrl_stop_signal = stop_signal.clone();

View File

@ -77,7 +77,8 @@ impl TmFunnelCommon {
pub struct TmFunnelDynamic { pub struct TmFunnelDynamic {
common: TmFunnelCommon, common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>, tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::Sender<PacketAsVec>, tm_udp_server_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_tx: mpsc::Sender<PacketAsVec>,
stop_signal: Arc<AtomicBool>, stop_signal: Arc<AtomicBool>,
} }
@ -85,13 +86,15 @@ impl TmFunnelDynamic {
pub fn new( pub fn new(
sync_tm_tcp_source: SyncTcpTmSource, sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>, tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::Sender<PacketAsVec>, tm_udp_server_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_tx: mpsc::Sender<PacketAsVec>,
stop_signal: Arc<AtomicBool>, stop_signal: Arc<AtomicBool>,
) -> Self { ) -> Self {
Self { Self {
common: TmFunnelCommon::new(sync_tm_tcp_source), common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx, tm_funnel_rx,
tm_server_tx, tm_udp_server_tx,
tm_tcp_client_tx,
stop_signal, stop_signal,
} }
} }
@ -110,9 +113,14 @@ impl TmFunnelDynamic {
.expect("Creating TM zero copy writer failed"); .expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer); self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet); self.common.sync_tm_tcp_source.add_tm(&tm.packet);
self.tm_server_tx let result = self.tm_udp_server_tx.send(tm);
.send(tm) if result.is_err() {
.expect("Sending TM to server failed"); log::error!("TM UDP server has disconnected");
}
let result = self.tm_tcp_client_tx.send(tm);
if result.is_err() {
log::error!("TM TCP client has disconnected");
}
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break; break;
} }