diff --git a/Cargo.lock b/Cargo.lock index 570d8ac..4e5f4ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,7 +627,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "proc-macro2", "quote", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "serde", "spacepackets", diff --git a/Cargo.toml b/Cargo.toml index fdfac9a..a082321 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,13 +19,13 @@ mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.0" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -branch = "rework-tmtc-modules" +branch = "main" features = ["test_util"] [dependencies.satrs-mib] version = "0.1.1" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -branch = "rework-tmtc-modules" +branch = "main" [dev-dependencies] env_logger = "0.11" diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 5da763d..6b6038c 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -31,6 +31,6 @@ impl SpacePacketValidator for SimpleSpValidator { if self.valid_ids.contains(&sp_header.packet_id) { return SpValidity::Valid; } - SpValidity::Ignore + SpValidity::Skip } } diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 894c693..847f000 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -1,4 +1,4 @@ -use std::io::{self, Read}; +use std::io::{self, Read, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; use std::time::Duration; @@ -28,9 +28,11 @@ pub struct TcpSppClient { id: ComponentId, poll: Poll, events: Events, - client: TcpStream, + // Optional to allow periodic reconnection attempts on the TCP server. + client: Option, read_buf: [u8; 4096], tm_tcp_client_rx: mpsc::Receiver, + server_addr: SocketAddr, tc_source_tx: mpsc::Sender, validator: SimpleSpValidator, } @@ -42,28 +44,75 @@ impl TcpSppClient { tm_tcp_client_rx: mpsc::Receiver, valid_ids: &'static [PacketId], ) -> io::Result { - let poll = Poll::new()?; + let mut poll = Poll::new()?; let events = Events::with_capacity(128); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); - let mut client = TcpStream::connect(addr)?; - poll.registry().register( - &mut client, - Token(0), - Interest::READABLE | Interest::WRITABLE, - )?; + let server_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); + let client = Self::attempt_connection(&mut poll, &server_addr); + if client.is_err() { + log::warn!( + "connection to TCP server {} failed: {}", + server_addr, + client.unwrap_err() + ); + return Ok(Self { + id, + poll, + events, + client: None, + read_buf: [0; 4096], + server_addr, + tm_tcp_client_rx, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }); + } Ok(Self { id, poll, events, - client, + client: Some(client.unwrap()), read_buf: [0; 4096], + server_addr, tm_tcp_client_rx, tc_source_tx, validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }) } + pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result { + let mut client = TcpStream::connect(*server_addr)?; + poll.registry().register( + &mut client, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; + Ok(client) + } + pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { + if self.client.is_some() { + return self.perform_regular_operation(); + } else { + let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr); + match client_result { + Ok(client) => { + self.client = Some(client); + self.perform_regular_operation()?; + } + Err(ref e) => { + log::warn!( + "connection to TCP server {} failed: {}", + self.server_addr, + e + ); + } + } + } + Ok(()) + } + + pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> { self.poll.poll( &mut self.events, Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), @@ -75,8 +124,7 @@ impl TcpSppClient { self.read_from_server()?; } if event.is_writable() { - // Read packets from a queue and send them here.. - // self.client.write_all(b"hello")?; + self.write_to_server()?; } } } @@ -84,7 +132,11 @@ impl TcpSppClient { } pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { - match self.client.read(&mut self.read_buf) { + let client = self + .client + .as_mut() + .expect("TCP stream invalid when it should not be"); + match client.read(&mut self.read_buf) { Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, Err(e) => return Err(e.into()), @@ -92,6 +144,28 @@ impl TcpSppClient { Ok(()) } + pub fn write_to_server(&mut self) -> io::Result<()> { + let client = self + .client + .as_mut() + .expect("TCP stream invalid when it should not be"); + loop { + match self.tm_tcp_client_rx.try_recv() { + Ok(tm) => { + client.write_all(&tm.packet)?; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::error!("TM sender to TCP client has disconnected"); + break; + } + }, + } + } + Ok(()) + } + pub fn handle_read_bytstream( &mut self, read_bytes: usize, diff --git a/src/main.rs b/src/main.rs index 038cc4e..afc0843 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,7 +46,7 @@ fn main() { let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); - let (tm_server_tx, tm_server_rx) = mpsc::channel(); + let (tm_tcp_server_tx, tm_tcp_server_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel(); @@ -129,7 +129,7 @@ fn main() { let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_handler: DynamicUdpTmHandler { - tm_rx: tm_server_rx, + tm_rx: tm_tcp_server_rx, }, }; @@ -153,7 +153,8 @@ fn main() { let mut tm_funnel = TmFunnelDynamic::new( sync_tm_tcp_source, tm_funnel_rx, - tm_server_tx, + tm_tcp_server_tx, + tm_tcp_client_tx, stop_signal.clone(), ); diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index 642b106..fdb8043 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -113,7 +113,7 @@ impl TmFunnelDynamic { .expect("Creating TM zero copy writer failed"); self.common.apply_packet_processing(zero_copy_writer); self.common.sync_tm_tcp_source.add_tm(&tm.packet); - let result = self.tm_udp_server_tx.send(tm); + let result = self.tm_udp_server_tx.send(tm.clone()); if result.is_err() { log::error!("TM UDP server has disconnected"); }