extend mio client to allow reconnection

This commit is contained in:
Robin Müller 2024-04-16 13:08:10 +02:00
parent b5e048a13b
commit 8ce305491b
6 changed files with 100 additions and 25 deletions

8
Cargo.lock generated
View File

@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]] [[package]]
name = "satrs" name = "satrs"
version = "0.2.0-rc.0" version = "0.2.0-rc.0"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9"
dependencies = [ dependencies = [
"bus", "bus",
"cobs", "cobs",
@ -615,7 +615,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-mib" name = "satrs-mib"
version = "0.1.1" version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9"
dependencies = [ dependencies = [
"csv", "csv",
"satrs-mib-codegen", "satrs-mib-codegen",
@ -627,7 +627,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-mib-codegen" name = "satrs-mib-codegen"
version = "0.1.1" version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -637,7 +637,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-shared" name = "satrs-shared"
version = "0.1.3" version = "0.1.3"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9"
dependencies = [ dependencies = [
"serde", "serde",
"spacepackets", "spacepackets",

View File

@ -19,13 +19,13 @@ mio = "0.8"
[dependencies.satrs] [dependencies.satrs]
version = "0.2.0-rc.0" version = "0.2.0-rc.0"
git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
branch = "rework-tmtc-modules" branch = "main"
features = ["test_util"] features = ["test_util"]
[dependencies.satrs-mib] [dependencies.satrs-mib]
version = "0.1.1" version = "0.1.1"
git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
branch = "rework-tmtc-modules" branch = "main"
[dev-dependencies] [dev-dependencies]
env_logger = "0.11" env_logger = "0.11"

View File

@ -31,6 +31,6 @@ impl SpacePacketValidator for SimpleSpValidator {
if self.valid_ids.contains(&sp_header.packet_id) { if self.valid_ids.contains(&sp_header.packet_id) {
return SpValidity::Valid; return SpValidity::Valid;
} }
SpValidity::Ignore SpValidity::Skip
} }
} }

View File

@ -1,4 +1,4 @@
use std::io::{self, Read}; use std::io::{self, Read, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc; use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
@ -28,9 +28,11 @@ pub struct TcpSppClient {
id: ComponentId, id: ComponentId,
poll: Poll, poll: Poll,
events: Events, events: Events,
client: TcpStream, // Optional to allow periodic reconnection attempts on the TCP server.
client: Option<TcpStream>,
read_buf: [u8; 4096], read_buf: [u8; 4096],
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>, tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
server_addr: SocketAddr,
tc_source_tx: mpsc::Sender<PacketAsVec>, tc_source_tx: mpsc::Sender<PacketAsVec>,
validator: SimpleSpValidator, validator: SimpleSpValidator,
} }
@ -42,28 +44,75 @@ impl TcpSppClient {
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>, tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId], valid_ids: &'static [PacketId],
) -> io::Result<Self> { ) -> io::Result<Self> {
let poll = Poll::new()?; let mut poll = Poll::new()?;
let events = Events::with_capacity(128); let events = Events::with_capacity(128);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); let server_addr =
let mut client = TcpStream::connect(addr)?; SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT);
poll.registry().register( let client = Self::attempt_connection(&mut poll, &server_addr);
&mut client, if client.is_err() {
Token(0), log::warn!(
Interest::READABLE | Interest::WRITABLE, "connection to TCP server {} failed: {}",
)?; server_addr,
client.unwrap_err()
);
return Ok(Self {
id,
poll,
events,
client: None,
read_buf: [0; 4096],
server_addr,
tm_tcp_client_rx,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
});
}
Ok(Self { Ok(Self {
id, id,
poll, poll,
events, events,
client, client: Some(client.unwrap()),
read_buf: [0; 4096], read_buf: [0; 4096],
server_addr,
tm_tcp_client_rx, tm_tcp_client_rx,
tc_source_tx, tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
}) })
} }
pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result<TcpStream> {
let mut client = TcpStream::connect(*server_addr)?;
poll.registry().register(
&mut client,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
Ok(client)
}
pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> {
if self.client.is_some() {
return self.perform_regular_operation();
} else {
let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr);
match client_result {
Ok(client) => {
self.client = Some(client);
self.perform_regular_operation()?;
}
Err(ref e) => {
log::warn!(
"connection to TCP server {} failed: {}",
self.server_addr,
e
);
}
}
}
Ok(())
}
pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> {
self.poll.poll( self.poll.poll(
&mut self.events, &mut self.events,
Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), Some(Duration::from_millis(STOP_CHECK_FREQUENCY)),
@ -75,8 +124,7 @@ impl TcpSppClient {
self.read_from_server()?; self.read_from_server()?;
} }
if event.is_writable() { if event.is_writable() {
// Read packets from a queue and send them here.. self.write_to_server()?;
// self.client.write_all(b"hello")?;
} }
} }
} }
@ -84,7 +132,11 @@ impl TcpSppClient {
} }
pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> {
match self.client.read(&mut self.read_buf) { let client = self
.client
.as_mut()
.expect("TCP stream invalid when it should not be");
match client.read(&mut self.read_buf) {
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()),
Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?,
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
@ -92,6 +144,28 @@ impl TcpSppClient {
Ok(()) Ok(())
} }
pub fn write_to_server(&mut self) -> io::Result<()> {
let client = self
.client
.as_mut()
.expect("TCP stream invalid when it should not be");
loop {
match self.tm_tcp_client_rx.try_recv() {
Ok(tm) => {
client.write_all(&tm.packet)?;
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::error!("TM sender to TCP client has disconnected");
break;
}
},
}
}
Ok(())
}
pub fn handle_read_bytstream( pub fn handle_read_bytstream(
&mut self, &mut self,
read_bytes: usize, read_bytes: usize,

View File

@ -46,7 +46,7 @@ fn main() {
let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (tm_server_tx, tm_server_rx) = mpsc::channel(); let (tm_tcp_server_tx, tm_tcp_server_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel();
@ -129,7 +129,7 @@ fn main() {
let mut udp_tmtc_server = UdpTmtcServer { let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server, udp_tc_server,
tm_handler: DynamicUdpTmHandler { tm_handler: DynamicUdpTmHandler {
tm_rx: tm_server_rx, tm_rx: tm_tcp_server_rx,
}, },
}; };
@ -153,7 +153,8 @@ fn main() {
let mut tm_funnel = TmFunnelDynamic::new( let mut tm_funnel = TmFunnelDynamic::new(
sync_tm_tcp_source, sync_tm_tcp_source,
tm_funnel_rx, tm_funnel_rx,
tm_server_tx, tm_tcp_server_tx,
tm_tcp_client_tx,
stop_signal.clone(), stop_signal.clone(),
); );

View File

@ -113,7 +113,7 @@ impl TmFunnelDynamic {
.expect("Creating TM zero copy writer failed"); .expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer); self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet); self.common.sync_tm_tcp_source.add_tm(&tm.packet);
let result = self.tm_udp_server_tx.send(tm); let result = self.tm_udp_server_tx.send(tm.clone());
if result.is_err() { if result.is_err() {
log::error!("TM UDP server has disconnected"); log::error!("TM UDP server has disconnected");
} }