simplified networking code

This commit is contained in:
Robin Müller 2024-04-18 16:44:46 +02:00
parent 948c42c7c2
commit 7a86078223
Signed by: muellerr
GPG Key ID: A649FB78196E3849
8 changed files with 95 additions and 105 deletions

1
Cargo.lock generated
View File

@ -485,7 +485,6 @@ dependencies = [
"humantime", "humantime",
"lazy_static", "lazy_static",
"log", "log",
"mio",
"num_enum", "num_enum",
"satrs", "satrs",
"satrs-mib", "satrs-mib",

View File

@ -16,7 +16,9 @@ strum = { version = "0.26", features = ["derive"] }
thiserror = "1" thiserror = "1"
derive-new = "0.6" derive-new = "0.6"
num_enum = "0.7" num_enum = "0.7"
mio = "0.8"
# socket2 = "0.5"
# mio = "0.8"
[dependencies.satrs] [dependencies.satrs]
version = "0.2.0-rc.3" version = "0.2.0-rc.3"

40
pytmtc/server-test.py Executable file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python3
import socket
from spacepackets.ecss.tc import PusTelecommand
EXP_ID = 278
APID = 1024 + EXP_ID
SEND_PING_ONCE = True
def main():
global SEND_PING_ONCE
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_addr = ("localhost", 4096)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(server_addr)
while True:
server_socket.listen()
(conn_socket, conn_addr) = server_socket.accept()
print("TCP client {} connected", conn_addr)
while True:
bytes_recvd = conn_socket.recv(4096)
if len(bytes_recvd) > 0:
print(f"Received bytes from TCP client: {bytes_recvd.decode()}")
elif len(bytes_recvd) == 0:
break
else:
print("error receiving data from TCP client")
if SEND_PING_ONCE:
print("sending back ping")
ping_tc = PusTelecommand(service=17, subservice=1, seq_count=0, apid=APID)
conn_socket.sendall(ping_tc.pack())
SEND_PING_ONCE = False
conn_socket.close()
continue
# server_socket.close()
if __name__ == "__main__":
main()

View File

@ -215,11 +215,14 @@ pub mod components {
} }
pub mod tasks { pub mod tasks {
use std::time::Duration;
pub const FREQ_MS_UDP_TMTC: u64 = 200; pub const FREQ_MS_UDP_TMTC: u64 = 200;
pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const FREQ_MS_CTRL: u64 = 400; pub const FREQ_MS_CTRL: u64 = 400;
pub const STOP_CHECK_FREQUENCY: u64 = 400; pub const STOP_CHECK_FREQUENCY_MS: u64 = 400;
pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS);
} }

View File

@ -1,7 +1,6 @@
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
sync::{atomic::AtomicBool, mpsc, Arc, Mutex}, sync::{atomic::AtomicBool, mpsc, Arc, Mutex},
time::Duration,
}; };
use log::{info, warn}; use log::{info, warn};
@ -113,9 +112,7 @@ impl TcpTask {
} }
pub fn periodic_operation(&mut self) { pub fn periodic_operation(&mut self) {
let result = self let result = self.0.handle_all_connections(Some(STOP_CHECK_FREQUENCY));
.0
.handle_all_connections(Some(Duration::from_millis(STOP_CHECK_FREQUENCY)));
match result { match result {
Ok(_conn_result) => (), Ok(_conn_result) => (),
Err(e) => { Err(e) => {

View File

@ -3,21 +3,25 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc; use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use ops_sat_rs::config::SPP_CLIENT_WIRETAPPING_RX; use ops_sat_rs::config::SPP_CLIENT_WIRETAPPING_RX;
use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets;
use satrs::queue::GenericSendError; use satrs::queue::GenericSendError;
use satrs::spacepackets::PacketId; use satrs::spacepackets::PacketId;
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
use satrs::ComponentId; use satrs::ComponentId;
use std::net::TcpStream;
use thiserror::Error; use thiserror::Error;
use super::{SimpleSpValidator, TcpComponent}; use super::{SimpleSpValidator, TcpComponent};
#[derive(Debug)]
pub enum ConnectionResult {
Connected(TcpStream),
Timeout,
}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum PacketForwardingError { pub enum ClientError {
#[error("send error: {0}")] #[error("send error: {0}")]
Send(#[from] GenericSendError), Send(#[from] GenericSendError),
#[error("io error: {0}")] #[error("io error: {0}")]
@ -26,10 +30,6 @@ pub enum PacketForwardingError {
pub struct TcpSppClient { pub struct TcpSppClient {
id: ComponentId, id: ComponentId,
poll: Poll,
events: Events,
// Optional to allow periodic reconnection attempts on the TCP server.
client: Option<TcpStream>,
read_buf: [u8; 4096], read_buf: [u8; 4096],
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>, tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
server_addr: SocketAddr, server_addr: SocketAddr,
@ -45,33 +45,9 @@ impl TcpSppClient {
valid_ids: &'static [PacketId], valid_ids: &'static [PacketId],
port: u16, port: u16,
) -> io::Result<Self> { ) -> io::Result<Self> {
let mut poll = Poll::new()?;
let events = Events::with_capacity(128);
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let client = Self::attempt_connection(&mut poll, &server_addr);
if client.is_err() {
log::warn!(
"connection to TCP server {} failed: {}",
server_addr,
client.unwrap_err()
);
return Ok(Self {
id,
poll,
events,
client: None,
read_buf: [0; 4096],
server_addr,
tm_tcp_client_rx,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
});
}
Ok(Self { Ok(Self {
id, id,
poll,
events,
client: Some(client.unwrap()),
read_buf: [0; 4096], read_buf: [0; 4096],
server_addr, server_addr,
tm_tcp_client_rx, tm_tcp_client_rx,
@ -80,62 +56,45 @@ impl TcpSppClient {
}) })
} }
pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result<TcpStream> { pub fn attempt_connection(
let mut client = TcpStream::connect(*server_addr)?; &mut self,
poll.registry().register( timeout: Duration,
&mut client, ) -> Result<ConnectionResult, ClientError> {
Token(0), match TcpStream::connect_timeout(&self.server_addr, timeout) {
Interest::READABLE | Interest::WRITABLE, Ok(client) => Ok(ConnectionResult::Connected(client)),
)?; Err(e) => {
Ok(client) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut {
return Ok(ConnectionResult::Timeout);
}
Err(e.into())
}
}
} }
pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { pub fn operation(&mut self, timeout: Duration) {
if self.client.is_some() { match self.attempt_connection(timeout) {
return self.perform_regular_operation(); Ok(result) => {
} else { if let ConnectionResult::Connected(mut client) = result {
let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr); if let Err(e) = self.handle_client_operation(&mut client) {
match client_result { log::error!("error handling TCP client operation: {}", e)
Ok(client) => {
self.client = Some(client);
self.perform_regular_operation()?;
} }
Err(ref e) => { drop(client);
log::warn!( std::thread::sleep(timeout);
"connection to TCP server {} failed: {}", }
self.server_addr, }
e Err(e) => {
); log::error!("TCP client error: {}", e);
} }
} }
} }
pub fn handle_client_operation(&mut self, client: &mut TcpStream) -> Result<(), ClientError> {
self.read_from_server(client)?;
self.write_to_server(client)?;
Ok(()) Ok(())
} }
pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> { pub fn read_from_server(&mut self, client: &mut TcpStream) -> Result<(), ClientError> {
self.poll.poll(
&mut self.events,
Some(Duration::from_millis(STOP_CHECK_FREQUENCY)),
)?;
let events: Vec<mio::event::Event> = self.events.iter().cloned().collect();
for event in events {
if event.token() == Token(0) {
if event.is_readable() {
self.read_from_server()?;
}
if event.is_writable() {
self.write_to_server()?;
}
}
}
Ok(())
}
pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> {
let client = self
.client
.as_mut()
.expect("TCP stream invalid when it should not be");
match client.read(&mut self.read_buf) { match client.read(&mut self.read_buf) {
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()),
Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?,
@ -144,11 +103,7 @@ impl TcpSppClient {
Ok(()) Ok(())
} }
pub fn write_to_server(&mut self) -> io::Result<()> { pub fn write_to_server(&mut self, client: &mut TcpStream) -> io::Result<()> {
let client = self
.client
.as_mut()
.expect("TCP stream invalid when it should not be");
loop { loop {
match self.tm_tcp_client_rx.try_recv() { match self.tm_tcp_client_rx.try_recv() {
Ok(tm) => { Ok(tm) => {
@ -166,10 +121,7 @@ impl TcpSppClient {
Ok(()) Ok(())
} }
pub fn handle_read_bytstream( pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> {
&mut self,
read_bytes: usize,
) -> Result<(), PacketForwardingError> {
let mut dummy = 0; let mut dummy = 0;
if SPP_CLIENT_WIRETAPPING_RX { if SPP_CLIENT_WIRETAPPING_RX {
log::debug!( log::debug!(

View File

@ -9,7 +9,7 @@ use log::info;
use ops_sat_rs::config::{ use ops_sat_rs::config::{
cfg_file::create_app_config, cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
VALID_PACKET_ID_LIST, VALID_PACKET_ID_LIST,
}; };
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
@ -232,7 +232,7 @@ fn main() {
.spawn(move || { .spawn(move || {
info!("Running TCP SPP client"); info!("Running TCP SPP client");
loop { loop {
let _result = tcp_spp_client.periodic_operation(); tcp_spp_client.operation(STOP_CHECK_FREQUENCY);
if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break; break;
} }

View File

@ -1,6 +1,6 @@
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use std::{collections::HashMap, sync::mpsc, time::Duration}; use std::{collections::HashMap, sync::mpsc};
use log::info; use log::info;
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
@ -101,10 +101,7 @@ impl TmFunnelDynamic {
pub fn operation(&mut self) { pub fn operation(&mut self) {
loop { loop {
match self match self.tm_funnel_rx.recv_timeout(STOP_CHECK_FREQUENCY) {
.tm_funnel_rx
.recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY))
{
Ok(mut tm) => { Ok(mut tm) => {
// Read the TM, set sequence counter and message counter, and finally update // Read the TM, set sequence counter and message counter, and finally update
// the CRC. // the CRC.