it's finally working properly

This commit is contained in:
Robin Müller 2024-04-19 17:19:33 +02:00
parent 3461ea6e1e
commit 5cc56bc8f4
3 changed files with 80 additions and 49 deletions

View File

@ -4,7 +4,8 @@ import abc
import time import time
import select import select
import logging import logging
from threading import Thread from typing import Any
from threading import Event, Thread
from collections import deque from collections import deque
from multiprocessing import Queue from multiprocessing import Queue
from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId
@ -21,6 +22,7 @@ LOG_LEVEL = logging.DEBUG
TC_QUEUE = Queue() TC_QUEUE = Queue()
TM_QUEUE = Queue() TM_QUEUE = Queue()
KILL_SIGNAL = Event()
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -33,10 +35,17 @@ def main():
datefmt="%Y-%m-%d %H:%M:%S", datefmt="%Y-%m-%d %H:%M:%S",
) )
print("Starting OPS-SAT ground TMTC server") print("Starting OPS-SAT ground TMTC server")
KILL_SIGNAL.clear()
ops_sat_thread = OpsSatServer() ops_sat_thread = OpsSatServer()
ops_sat_thread.start() ops_sat_thread.start()
tmtc_thread = TmtcServer() tmtc_thread = TmtcServer()
tmtc_thread.start() tmtc_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
_LOGGER.info("Shutting down server gracefully")
KILL_SIGNAL.set()
ops_sat_thread.join() ops_sat_thread.join()
tmtc_thread.join() tmtc_thread.join()
@ -47,6 +56,8 @@ class BaseServer(Thread):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_addr = ("0.0.0.0", port) server_addr = ("0.0.0.0", port)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.setblocking(False)
self.server_socket.settimeout(0.4)
self.server_socket.bind(server_addr) self.server_socket.bind(server_addr)
super().__init__() super().__init__()
@ -54,29 +65,35 @@ class BaseServer(Thread):
self.run_sync_version() self.run_sync_version()
def run_sync_version(self) -> None: def run_sync_version(self) -> None:
while True: self.server_socket.listen()
self.server_socket.listen() while True and not KILL_SIGNAL.is_set():
(conn_socket, conn_addr) = self.server_socket.accept() try:
conn_socket.setblocking(False) (conn_socket, conn_addr) = self.server_socket.accept()
print(f"{self.log_prefix} TCP client {conn_addr} connected") self.handle_connection(conn_socket, conn_addr)
analysis_deque = deque() except TimeoutError:
while True: continue
conn_socket.settimeout(0.2)
try: def handle_connection(self, conn_socket: socket.socket, conn_addr: Any):
bytes_recvd = conn_socket.recv(4096) conn_socket.setblocking(False)
if len(bytes_recvd) > 0: print(f"{self.log_prefix} TCP client {conn_addr} connected")
_LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}") analysis_deque = deque()
analysis_deque.append(bytes_recvd) while True and not KILL_SIGNAL.is_set():
elif len(bytes_recvd) == 0: conn_socket.settimeout(0.2)
self.handle_read_bytestream(analysis_deque) try:
break bytes_recvd = conn_socket.recv(4096)
else: if len(bytes_recvd) > 0:
print("error receiving data from TCP client") _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}")
except BlockingIOError: analysis_deque.append(bytes_recvd)
self.handle_timeout(conn_socket, analysis_deque) elif len(bytes_recvd) == 0:
time.sleep(0.2) self.handle_read_bytestream(analysis_deque)
except TimeoutError: break
self.handle_timeout(conn_socket, analysis_deque) else:
print("error receiving data from TCP client")
except BlockingIOError:
self.handle_timeout(conn_socket, analysis_deque)
time.sleep(0.2)
except TimeoutError:
self.handle_timeout(conn_socket, analysis_deque)
def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque): def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque):
if len(analysis_deque) > 0: if len(analysis_deque) > 0:

View File

@ -1,4 +1,3 @@
use socket2::{Domain, Socket, Type};
use std::io::{self, Read}; use std::io::{self, Read};
use std::net::TcpStream as StdTcpStream; use std::net::TcpStream as StdTcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -212,7 +211,7 @@ impl TcpSppClientMio {
pub struct TcpSppClientStd { pub struct TcpSppClientStd {
common: TcpSppClientCommon, common: TcpSppClientCommon,
// Optional to allow periodic reconnection attempts on the TCP server. // Optional to allow periodic reconnection attempts on the TCP server.
client: Option<StdTcpStream>, stream: Option<StdTcpStream>,
} }
impl TcpSppClientStd { impl TcpSppClientStd {
@ -224,23 +223,16 @@ impl TcpSppClientStd {
port: u16, port: u16,
) -> io::Result<Self> { ) -> io::Result<Self> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
// Create a TCP listener bound to two addresses. let client = match StdTcpStream::connect(server_addr) {
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; Ok(stream) => {
socket.set_reuse_address(true)?; stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?;
socket.set_nonblocking(true)?; Some(stream)
socket.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?;
let client = match socket.connect(&server_addr.into()) {
Ok(_) => {
let client: StdTcpStream = socket.into();
Some(client)
} }
Err(e) => { Err(e) => {
log::error!("error connecting to server: {}", e); log::warn!("error connecting to server: {}", e);
None None
} }
}; };
// let tcp_stream = StdTcpStream::connect_timeout(addr, timeout)
Ok(Self { Ok(Self {
common: TcpSppClientCommon { common: TcpSppClientCommon {
id, id,
@ -250,28 +242,53 @@ impl TcpSppClientStd {
tc_source_tx, tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
}, },
client, stream: client,
})
}
pub fn attempt_connect(&mut self, log_error: bool) -> io::Result<bool> {
Ok(match StdTcpStream::connect(self.common.server_addr) {
Ok(stream) => {
stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?;
self.stream = Some(stream);
true
}
Err(e) => {
if log_error {
log::warn!("error connecting to server: {}", e);
}
false
}
}) })
} }
pub fn operation(&mut self) -> Result<(), ClientError> { pub fn operation(&mut self) -> Result<(), ClientError> {
if let Some(client) = &mut self.client { if let Some(client) = &mut self.stream {
log::info!("client is valid");
match client.read(&mut self.common.read_buf) { match client.read(&mut self.common.read_buf) {
Ok(0) => println!("read 0 bytes"), // Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
self.stream = None;
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => { Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{ {
log::info!("timeout shit");
self.common.write_to_server(client)?; self.common.write_to_server(client)?;
return Ok(()); return Ok(());
} }
log::warn!("server error: {e:?}");
if e.kind() == io::ErrorKind::ConnectionReset {
self.stream = None;
}
return Err(e.into()); return Err(e.into());
} }
} }
} else { } else {
// TODO: Reconnect attempt occasionally. if self.attempt_connect(false)? {
log::info!("reconnected to server succesfully");
return self.operation();
}
std::thread::sleep(STOP_CHECK_FREQUENCY); std::thread::sleep(STOP_CHECK_FREQUENCY);
} }

View File

@ -9,12 +9,13 @@ use log::info;
use ops_sat_rs::config::{ use ops_sat_rs::config::{
cfg_file::create_app_config, cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
VALID_PACKET_ID_LIST, VALID_PACKET_ID_LIST,
}; };
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::tmtc::tm_sink::TmFunnelDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{controller::ExperimentController, pus::test::create_test_service};
use crate::{ use crate::{
@ -22,10 +23,6 @@ use crate::{
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
logger::setup_logger, logger::setup_logger,
}; };
use crate::{
interface::tcp_spp_client::TcpSppClientMio,
pus::{PusTcDistributor, PusTcMpscRouter},
};
use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic}; use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic};
use crate::{ use crate::{
pus::{action::create_action_service, stack::PusStack}, pus::{action::create_action_service, stack::PusStack},