From ddac5ceab3ee34f09764a3d4cb77183d957182d9 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 19 Apr 2024 17:40:38 +0200 Subject: [PATCH] Networking update --- .gitignore | 1 + Cargo.lock | 56 ++++- Cargo.toml | 2 + README.md | 16 +- pytmtc/archive/tcp_server.py | 176 ++++++++++++++ pytmtc/{main.py => pyclient.py} | 1 + pytmtc/pyserver.py | 185 +++++++++++++++ scripts/bld-deploy-remote.py | 12 +- src/config.rs | 83 ++++++- src/handlers/camera.rs | 18 +- src/interface/tcp_server.rs | 5 +- src/interface/tcp_spp_client.rs | 394 ++++++++++++++++++++------------ src/main.rs | 19 +- src/pus/test.rs | 1 - src/tmtc/tm_sink.rs | 7 +- templates/exp278.toml | 5 + 16 files changed, 790 insertions(+), 191 deletions(-) create mode 100644 pytmtc/archive/tcp_server.py rename pytmtc/{main.py => pyclient.py} (99%) create mode 100755 pytmtc/pyserver.py create mode 100644 templates/exp278.toml diff --git a/.gitignore b/.gitignore index f9e18df..6624bed 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ # Ignore logs folder generared by application. /logs +/exp278.toml diff --git a/Cargo.lock b/Cargo.lock index 8bfd1c1..18d474e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,8 +491,10 @@ dependencies = [ "satrs-mib", "serde", "serde_json", + "socket2", "strum", "thiserror", + "toml", ] [[package]] @@ -520,7 +522,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_edit", + "toml_edit 0.21.1", ] [[package]] @@ -593,9 +595,9 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" -version = "0.2.0-rc.1" +version = "0.2.0-rc.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d6f3286d35464fdc75dc846b663aaad4a81437a50e623053b1b4d481d782cd0" +checksum = "6aa9241e4d6cb0cc395927cfe653d8bc4a9cb6b2c27f28fec713d5e6ceb0ba23" dependencies = [ "bus", "cobs", @@ -693,6 +695,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +dependencies = [ + "serde", +] + [[package]] name = "smallvec" version = "0.6.14" @@ -798,11 +809,26 @@ dependencies = [ "syn 2.0.59", ] +[[package]] +name = "toml" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit 0.22.12", +] + [[package]] name = "toml_datetime" version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -812,7 +838,20 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap", "toml_datetime", - "winnow", + "winnow 0.5.40", +] + +[[package]] +name = "toml_edit" +version = "0.22.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow 0.6.6", ] [[package]] @@ -1050,6 +1089,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" +dependencies = [ + "memchr", +] + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/Cargo.toml b/Cargo.toml index 46ab7bf..3bd0973 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] fern = "0.6" +toml = "0.8" chrono = "0.4" log = "0.4" lazy_static = "1" @@ -18,6 +19,7 @@ num_enum = "0.7" serde = "1" serde_json = "1" mio = "0.8" +socket2 = "0.5" [dependencies.satrs] version = "0.2.0-rc.3" diff --git a/README.md b/README.md index 069dd95..34a8e69 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ ESA OPS-SAT Rust experiment ======== This is the primary repository for the ESA OPS-SAT experiment. +The primary repository to generate packages for ESOC can be found [here](https://egit.irs.uni-stuttgart.de/rust/ops-sat-experiment). +You can also find some more general documentation about OPS-SAT there. ## Pre-Requisites @@ -26,17 +28,3 @@ cross build ```sh cross build --release ``` - -## Documentation - -The [wiki](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki) -appears to be a useful source for documentation. - -- [OBSW documents](https://opssat1.esoc.esa.int/projects/experimenter-information/dmsf?folder_id=7) -- [Software Integration Process](https://opssat1.esoc.esa.int/dmsf/files/34/view) -- [SPP/TCP bridge](https://opssat1.esoc.esa.int/dmsf/files/65/view) -- [Cross-compiling SEPP](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Cross-compiling_SEPP_application) -- [TMTC infrastructure](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Live_TM_TC_data) -- [Submitting an Experiment](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Building_and_submitting_your_application_to_ESOC) -- [Building with Yocto and Docker](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Building_an_application_locally_using_Yocto_Toolchain_in_a_Docker) -- [SPP over CAN](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/SPP_over_CAN_communication) diff --git a/pytmtc/archive/tcp_server.py b/pytmtc/archive/tcp_server.py new file mode 100644 index 0000000..a65cfb4 --- /dev/null +++ b/pytmtc/archive/tcp_server.py @@ -0,0 +1,176 @@ +from typing import Any, Optional +import select +import time +import socket +import logging +from threading import Thread, Event, Lock +from collections import deque + +from tmtccmd.com import ComInterface +from tmtccmd.tmtc import TelemetryListT + + +_LOGGER = logging.getLogger(__name__) + + +class TcpServer(ComInterface): + def __init__(self, port: int): + self.port = port + self._max_num_packets_in_tc_queue = 500 + self._max_num_packets_in_tm_queue = 500 + self._default_timeout_secs = 0.5 + self._server_addr = ("localhost", self.port) + self._tc_packet_queue = deque() + self._tm_packet_queue = deque() + self._tc_lock = Lock() + self._tm_lock = Lock() + self._kill_signal = Event() + self._server_socket: Optional[socket.socket] = None + self._server_thread = Thread(target=self._server_task, daemon=True) + self._connected = False + # self._conn_start = None + # self._writing_done = False + # self._reading_done = False + + @property + def connected(self) -> bool: + return self._connected + + @property + def id(self) -> str: + return "tcp_server" + + def initialize(self, args: Any = 0) -> Any: + """Perform initializations step which can not be done in constructor or which require + returnvalues. + """ + pass + + def open(self, args: Any = 0): + """Opens the communication interface to allow communication. + + :return: + """ + if self.connected: + return + self._connected = True + self._server_thread.start() + + def _server_task(self): + self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # We need to check the kill signal periodically to allow closing the server. + self._server_socket.settimeout(self._default_timeout_secs) + self._server_socket.bind(self._server_addr) + self._server_socket.listen() + while True and not self._kill_signal.is_set(): + try: + (conn_socket, conn_addr) = self._server_socket.accept() + self._handle_connection(conn_socket, conn_addr) + # conn_socket.close() + """ + if ( + self._reading_done and self._writing_done + ) or time.time() - self.conn_start > 0.5: + print("reading and writing done") + break + + """ + except TimeoutError: + continue + + def _handle_connection(self, conn_socket: socket.socket, conn_addr: Any): + _LOGGER.info(f"TCP client {conn_addr} connected") + queue_len = 0 + + while True: + with self._tc_lock: + queue_len = len(self._tc_packet_queue) + outputs = [] + if queue_len > 0: + outputs.append(conn_socket) + (readable, writable, _) = select.select( + [conn_socket], + outputs, + [], + 0.2, + ) + + if writable and writable[0]: + print("writeable") + while queue_len > 0: + next_packet = bytes() + with self._tc_lock: + next_packet = self._tc_packet_queue.popleft() + if len(next_packet) > 0: + conn_socket.sendall(next_packet) + queue_len -= 1 + if readable and readable[0]: + print("readable") + while True: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + print(f"Received bytes from TCP client: {bytes_recvd.decode()}") + with self._tm_lock: + self._tm_packet_queue.append(bytes_recvd) + elif len(bytes_recvd) == 0: + break + else: + print("error receiving data from TCP client") + + def is_open(self) -> bool: + """Can be used to check whether the communication interface is open. This is useful if + opening a COM interface takes a longer time and is non-blocking + """ + return self.connected + + def close(self, args: Any = 0): + """Closes the ComIF and releases any held resources (for example a Communication Port). + + :return: + """ + self._kill_signal.set() + self._server_thread.join() + self._connected = False + + def send(self, data: bytes): + """Send raw data. + + :raises SendError: Sending failed for some reason. + """ + with self._tc_lock: + if len(self._tc_packet_queue) >= self._max_num_packets_in_tc_queue: + # Remove oldest packet + self._tc_packet_queue.popleft() + self._tc_packet_queue.append(data) + + def receive(self, parameters: Any = 0) -> TelemetryListT: + """Returns a list of received packets. The child class can use a separate thread to poll for + the packets or use some other mechanism and container like a deque to store packets + to be returned here. + + :param parameters: + :raises ReceptionDecodeError: If the underlying COM interface uses encoding and + decoding and the decoding fails, this exception will be returned. + :return: + """ + + with self._tm_lock: + packet_list = [] + while self._tm_packet_queue: + packet_list.append(self._tm_packet_queue.popleft()) + return packet_list + + def data_available(self, timeout: float, parameters: Any = 0) -> int: + """Check whether TM packets are available. + + :param timeout: Can be used to block on available data if supported by the specific + communication interface. + :param parameters: Can be an arbitrary parameter. + :raises ReceptionDecodeError: If the underlying COM interface uses encoding and + decoding when determining the number of available packets, this exception can be + thrown on decoding errors. + :return: 0 if no data is available, number of packets otherwise. + """ + with self._tm_lock: + return len(self._tm_packet_queue) diff --git a/pytmtc/main.py b/pytmtc/pyclient.py similarity index 99% rename from pytmtc/main.py rename to pytmtc/pyclient.py index a3a69cf..a02d648 100755 --- a/pytmtc/main.py +++ b/pytmtc/pyclient.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Example client for the sat-rs example application""" +from __future__ import annotations import logging import sys import time diff --git a/pytmtc/pyserver.py b/pytmtc/pyserver.py new file mode 100755 index 0000000..19caf93 --- /dev/null +++ b/pytmtc/pyserver.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +import socket +import abc +import time +import select +import logging +from typing import Any +from threading import Event, Thread +from collections import deque +from multiprocessing import Queue +from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId +from spacepackets.ecss.tc import PacketType + +EXP_ID = 278 +EXP_APID = 1024 + EXP_ID +EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID) +EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID) +OPSSAT_SERVER_PORT = 4096 +TMTC_SERVER_PORT = 4097 +LOG_LEVEL = logging.DEBUG + + +TC_QUEUE = Queue() +TM_QUEUE = Queue() +KILL_SIGNAL = Event() + + +_LOGGER = logging.getLogger(__name__) + + +def main(): + logging.basicConfig( + format="[%(asctime)s] [%(levelname)-5s] %(message)s", + level=LOG_LEVEL, + datefmt="%Y-%m-%d %H:%M:%S", + ) + print("Starting OPS-SAT ground TMTC server") + KILL_SIGNAL.clear() + ops_sat_thread = OpsSatServer() + ops_sat_thread.start() + tmtc_thread = TmtcServer() + tmtc_thread.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + _LOGGER.info("Shutting down server gracefully") + KILL_SIGNAL.set() + ops_sat_thread.join() + tmtc_thread.join() + + +class BaseServer(Thread): + def __init__(self, log_prefix: str, port: int): + self.log_prefix = log_prefix + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_addr = ("0.0.0.0", port) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.setblocking(False) + self.server_socket.settimeout(0.4) + self.server_socket.bind(server_addr) + super().__init__() + + def run(self) -> None: + self.run_sync_version() + + def run_sync_version(self) -> None: + self.server_socket.listen() + while True and not KILL_SIGNAL.is_set(): + try: + (conn_socket, conn_addr) = self.server_socket.accept() + self.handle_connection(conn_socket, conn_addr) + except TimeoutError: + continue + + def handle_connection(self, conn_socket: socket.socket, conn_addr: Any): + conn_socket.setblocking(False) + print(f"{self.log_prefix} TCP client {conn_addr} connected") + analysis_deque = deque() + while True and not KILL_SIGNAL.is_set(): + conn_socket.settimeout(0.2) + try: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}") + analysis_deque.append(bytes_recvd) + elif len(bytes_recvd) == 0: + self.handle_read_bytestream(analysis_deque) + break + else: + print("error receiving data from TCP client") + except BlockingIOError: + self.handle_timeout(conn_socket, analysis_deque) + time.sleep(0.2) + except TimeoutError: + self.handle_timeout(conn_socket, analysis_deque) + + def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque): + if len(analysis_deque) > 0: + self.handle_read_bytestream(analysis_deque) + self.send_data_to_client(conn_socket) + + def run_select_version(self) -> None: + while True: + self.server_socket.listen() + (conn_socket, conn_addr) = self.server_socket.accept() + print(f"{self.log_prefix} TCP client {conn_addr} connected") + analysis_deque = deque() + while True: + outputs = [] + if self.send_data_available(): + outputs.append(conn_socket) + (readable, writable, _) = select.select([conn_socket], outputs, [], 0.2) + if readable and readable[0]: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + _LOGGER.debug("received data from TCP client: {}", bytes_recvd) + analysis_deque.append(bytes_recvd) + elif len(bytes_recvd) == 0: + self.handle_read_bytestream(analysis_deque) + break + else: + print("error receiving data from TCP client") + if writable and writable[0]: + self.send_data_to_client(conn_socket) + if not writable and not readable: + if len(analysis_deque) > 0: + self.handle_read_bytestream(analysis_deque) + self.send_data_to_client(conn_socket) + + @abc.abstractmethod + def handle_read_bytestream(self, analysis_deque: deque): + pass + + @abc.abstractmethod + def send_data_to_client(self, conn_socket: socket.socket): + pass + + @abc.abstractmethod + def send_data_available(self) -> bool: + pass + + +class OpsSatServer(BaseServer): + def __init__(self): + super().__init__("[OPS-SAT]", OPSSAT_SERVER_PORT) + + def handle_read_bytestream(self, analysis_deque: deque): + parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM]) + for packet in parsed_packets: + _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]") + TM_QUEUE.put(packet) + + def send_data_to_client(self, conn_socket: socket.socket): + while not TC_QUEUE.empty(): + next_packet = TC_QUEUE.get() + _LOGGER.info(f"{self.log_prefix} TX TC [{next_packet.hex(sep=',')}]") + conn_socket.sendall(next_packet) + + def send_data_available(self) -> bool: + return not TC_QUEUE.empty() + + +class TmtcServer(BaseServer): + def __init__(self): + super().__init__("[TMTC]", TMTC_SERVER_PORT) + + def handle_read_bytestream(self, analysis_deque: deque): + parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC]) + for packet in parsed_packets: + _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]") + TC_QUEUE.put(packet) + + def send_data_to_client(self, conn_socket: socket.socket): + while not TM_QUEUE.empty(): + next_packet = TM_QUEUE.get() + _LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]") + conn_socket.sendall(next_packet) + + def send_data_available(self) -> bool: + return not TM_QUEUE.empty() + + +if __name__ == "__main__": + main() diff --git a/scripts/bld-deploy-remote.py b/scripts/bld-deploy-remote.py index 9bce107..e208083 100755 --- a/scripts/bld-deploy-remote.py +++ b/scripts/bld-deploy-remote.py @@ -4,7 +4,6 @@ on a remote machine, e.g. a Raspberry Pi""" import argparse import os import sys -import platform import time from typing import Final @@ -16,10 +15,10 @@ BUILDER = "cross" # remote configurations by tweaking / hardcoding these parameter, which generally are constant # for a given board DEFAULT_USER_NAME: Final = "root" -DEFAULT_ADDRESS: Final = "192.254.108.30" +DEFAULT_ADDRESS: Final = "small_flatsat" DEFAULT_TOOLCHAIN: Final = "armv7-unknown-linux-gnueabihf" DEFAULT_APP_NAME: Final = "ops-sat-rs" -DEFAULT_TARGET_FOLDER: Final = "/tmp" +DEFAULT_TARGET_FOLDER: Final = "/home/exp278/" DEFAULT_DEBUG_PORT: Final = "1234" DEFAULT_GDB_APP = "gdb-multiarch" @@ -140,10 +139,11 @@ def bld_deploy_run(args): sshpass_args = f"-f {args.sshfile}" elif args.sshenv: sshpass_args = "-e" - ssh_target_ident = f"{args.user}@{args.address}" + # ssh_target_ident = f"{args.user}@{args.address}" + ssh_target_ident = "small_flatsat" sshpass_cmd = "" - if platform.system() != "Windows": - sshpass_cmd = f"sshpass {sshpass_args}" + # if platform.system() != "Windows": + # sshpass_cmd = f"sshpass {sshpass_args}" dest_path = f"{args.dest}/{args.app}" if not args.source: source_path = f"{os.getcwd()}/target/{args.tc}/{build_folder}/{args.app}" diff --git a/src/config.rs b/src/config.rs index 4ee0a0e..e48b764 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,6 +8,7 @@ use std::net::Ipv4Addr; use std::path::{Path, PathBuf}; pub const STOP_FILE_NAME: &str = "stop-experiment"; +pub const CONFIG_FILE_NAME: &str = "exp278.toml"; pub const HOME_FOLER_EXPERIMENT: &str = "/home/exp278"; pub const LOG_FOLDER: &str = "logs"; @@ -21,7 +22,8 @@ pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPER // TODO: Would be nice if this can be commanded as well.. /// Can be enabled to print all SPP packets received from the SPP server on port 4096. -pub const SPP_CLIENT_WIRETAPPING_RX: bool = false; +pub const SPP_CLIENT_WIRETAPPING_RX: bool = true; +pub const SPP_CLIENT_WIRETAPPING_TX: bool = true; #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] @@ -51,6 +53,80 @@ lazy_static! { }; } +pub mod cfg_file { + use std::{ + fs::File, + io::Read, + path::{Path, PathBuf}, + }; + + use super::{CONFIG_FILE_NAME, HOME_PATH, TCP_SPP_SERVER_PORT}; + + pub const SPP_CLIENT_PORT_CFG_KEY: &str = "tcp_spp_server_port"; + + #[derive(Debug)] + pub struct AppCfg { + pub tcp_spp_server_port: u16, + } + + impl Default for AppCfg { + fn default() -> Self { + Self { + tcp_spp_server_port: TCP_SPP_SERVER_PORT, + } + } + } + + pub fn create_app_config() -> AppCfg { + let mut cfg_path = HOME_PATH.clone(); + cfg_path.push(CONFIG_FILE_NAME); + let cfg_path_home = cfg_path.as_path(); + let relevant_path = if Path::new(CONFIG_FILE_NAME).exists() { + Some(PathBuf::from(Path::new(CONFIG_FILE_NAME))) + } else if cfg_path_home.exists() { + Some(PathBuf::from(cfg_path_home)) + } else { + None + }; + + let mut app_cfg = AppCfg::default(); + if relevant_path.is_none() { + log::warn!("No config file found, using default values"); + return app_cfg; + } + let relevant_path = relevant_path.unwrap(); + match File::open(relevant_path.as_path()) { + Ok(mut file) => { + let mut toml_str = String::new(); + match file.read_to_string(&mut toml_str) { + Ok(_size) => match toml_str.parse::() { + Ok(table) => { + handle_config_file_table(table, &mut app_cfg); + } + Err(e) => log::error!("error parsing TOML config file: {e}"), + }, + Err(e) => log::error!("error reading TOML config file: {e}"), + } + } + Err(e) => log::error!("error opening TOML config file: {e}"), + } + app_cfg + } + + #[allow(clippy::collapsible_match)] + pub fn handle_config_file_table(table: toml::Table, app_cfg: &mut AppCfg) { + if let Some(value) = table.get(SPP_CLIENT_PORT_CFG_KEY) { + if let toml::Value::Integer(port) = value { + if *port < 0 { + log::warn!("invalid port value, is negative"); + } else { + app_cfg.tcp_spp_server_port = *port as u16 + } + } + } + } +} + pub mod tmtc_err { use super::*; use satrs::res_code::ResultU16; @@ -140,11 +216,14 @@ pub mod components { } pub mod tasks { + use std::time::Duration; + pub const FREQ_MS_UDP_TMTC: u64 = 200; pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_CTRL: u64 = 400; - pub const STOP_CHECK_FREQUENCY: u64 = 400; + pub const STOP_CHECK_FREQUENCY_MS: u64 = 400; + pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS); } diff --git a/src/handlers/camera.rs b/src/handlers/camera.rs index d0eae69..68d0c61 100644 --- a/src/handlers/camera.rs +++ b/src/handlers/camera.rs @@ -24,7 +24,6 @@ /// v Y /// /// see also https://opssat1.esoc.esa.int/dmsf/files/6/view - use crate::requests::CompositeRequest; use derive_new::new; use log::debug; @@ -154,10 +153,20 @@ impl IMS100BatchHandler { self.handle_hk_request(&msg.requestor_info, hk_request); } CompositeRequest::Action(action_request) => { - self.handle_action_request(&msg.requestor_info, action_request); + if let Err(e) = + self.handle_action_request(&msg.requestor_info, action_request) + { + log::warn!("camera action request IO error: {e}"); + } + } + }, + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("composite request receiver disconnected"); + break; } }, - Err(_) => {} } } } @@ -236,6 +245,7 @@ impl IMS100BatchHandler { Ok(()) } + #[allow(clippy::too_many_arguments)] pub fn take_picture_from_str( &mut self, R: &str, @@ -281,4 +291,4 @@ mod tests { fn test_crc() { // TODO } -} \ No newline at end of file +} diff --git a/src/interface/tcp_server.rs b/src/interface/tcp_server.rs index c3275c6..23c773d 100644 --- a/src/interface/tcp_server.rs +++ b/src/interface/tcp_server.rs @@ -1,7 +1,6 @@ use std::{ collections::VecDeque, sync::{atomic::AtomicBool, mpsc, Arc, Mutex}, - time::Duration, }; use log::{info, warn}; @@ -113,9 +112,7 @@ impl TcpTask { } pub fn periodic_operation(&mut self) { - let result = self - .0 - .handle_all_connections(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + let result = self.0.handle_all_connections(Some(STOP_CHECK_FREQUENCY)); match result { Ok(_conn_result) => (), Err(e) => { diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 847f000..8e41c43 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -1,12 +1,12 @@ -use std::io::{self, Read, Write}; +use std::io::{self, Read}; +use std::net::TcpStream as StdTcpStream; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; -use std::time::Duration; -use mio::net::TcpStream; +use mio::net::TcpStream as MioTcpStream; use mio::{Events, Interest, Poll, Token}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; -use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, TCP_SPP_SERVER_PORT}; +use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, SPP_CLIENT_WIRETAPPING_TX}; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; use satrs::queue::GenericSendError; use satrs::spacepackets::PacketId; @@ -17,19 +17,16 @@ use thiserror::Error; use super::{SimpleSpValidator, TcpComponent}; #[derive(Debug, Error)] -pub enum PacketForwardingError { +pub enum ClientError { #[error("send error: {0}")] Send(#[from] GenericSendError), #[error("io error: {0}")] Io(#[from] io::Error), } -pub struct TcpSppClient { +#[allow(dead_code)] +pub struct TcpSppClientCommon { id: ComponentId, - poll: Poll, - events: Events, - // Optional to allow periodic reconnection attempts on the TCP server. - client: Option, read_buf: [u8; 4096], tm_tcp_client_rx: mpsc::Receiver, server_addr: SocketAddr, @@ -37,143 +34,13 @@ pub struct TcpSppClient { validator: SimpleSpValidator, } -impl TcpSppClient { - pub fn new( - id: ComponentId, - tc_source_tx: mpsc::Sender, - tm_tcp_client_rx: mpsc::Receiver, - valid_ids: &'static [PacketId], - ) -> io::Result { - let mut poll = Poll::new()?; - let events = Events::with_capacity(128); - let server_addr = - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); - let client = Self::attempt_connection(&mut poll, &server_addr); - if client.is_err() { - log::warn!( - "connection to TCP server {} failed: {}", - server_addr, - client.unwrap_err() - ); - return Ok(Self { - id, - poll, - events, - client: None, - read_buf: [0; 4096], - server_addr, - tm_tcp_client_rx, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), - }); - } - Ok(Self { - id, - poll, - events, - client: Some(client.unwrap()), - read_buf: [0; 4096], - server_addr, - tm_tcp_client_rx, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), - }) - } - - pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result { - let mut client = TcpStream::connect(*server_addr)?; - poll.registry().register( - &mut client, - Token(0), - Interest::READABLE | Interest::WRITABLE, - )?; - Ok(client) - } - - pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { - if self.client.is_some() { - return self.perform_regular_operation(); - } else { - let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr); - match client_result { - Ok(client) => { - self.client = Some(client); - self.perform_regular_operation()?; - } - Err(ref e) => { - log::warn!( - "connection to TCP server {} failed: {}", - self.server_addr, - e - ); - } - } - } - Ok(()) - } - - pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> { - self.poll.poll( - &mut self.events, - Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), - )?; - let events: Vec = self.events.iter().cloned().collect(); - for event in events { - if event.token() == Token(0) { - if event.is_readable() { - self.read_from_server()?; - } - if event.is_writable() { - self.write_to_server()?; - } - } - } - Ok(()) - } - - pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { - let client = self - .client - .as_mut() - .expect("TCP stream invalid when it should not be"); - match client.read(&mut self.read_buf) { - Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), - Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, - Err(e) => return Err(e.into()), - } - Ok(()) - } - - pub fn write_to_server(&mut self) -> io::Result<()> { - let client = self - .client - .as_mut() - .expect("TCP stream invalid when it should not be"); - loop { - match self.tm_tcp_client_rx.try_recv() { - Ok(tm) => { - client.write_all(&tm.packet)?; - } - Err(e) => match e { - mpsc::TryRecvError::Empty => break, - mpsc::TryRecvError::Disconnected => { - log::error!("TM sender to TCP client has disconnected"); - break; - } - }, - } - } - Ok(()) - } - - pub fn handle_read_bytstream( - &mut self, - read_bytes: usize, - ) -> Result<(), PacketForwardingError> { +#[allow(dead_code)] +impl TcpSppClientCommon { + pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { let mut dummy = 0; if SPP_CLIENT_WIRETAPPING_RX { log::debug!( - "received {} bytes on TCP client: {:x?}", + "SPP TCP RX {} bytes: {:x?}", read_bytes, &self.read_buf[..read_bytes] ); @@ -188,4 +55,243 @@ impl TcpSppClient { )?; Ok(()) } + + pub fn write_to_server(&mut self, client: &mut impl io::Write) -> io::Result<()> { + loop { + match self.tm_tcp_client_rx.try_recv() { + Ok(tm) => { + if SPP_CLIENT_WIRETAPPING_TX { + log::debug!( + "SPP TCP TX {}: {:x?}", + tm.packet.len(), + tm.packet.as_slice() + ); + } + client.write_all(&tm.packet)?; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::error!("TM sender to TCP client has disconnected"); + break; + } + }, + } + } + Ok(()) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum ConnectionStatus { + Unknown, + Connected, + LostConnection, + TryingReconnect, +} + +#[allow(dead_code)] +pub struct TcpSppClientMio { + common: TcpSppClientCommon, + poll: Poll, + events: Events, + // Optional to allow periodic reconnection attempts on the TCP server. + client: Option, + connection: ConnectionStatus, +} + +#[allow(dead_code)] +impl TcpSppClientMio { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, + valid_ids: &'static [PacketId], + port: u16, + ) -> io::Result { + let poll = Poll::new()?; + let events = Events::with_capacity(128); + let mut client = Self { + common: TcpSppClientCommon { + id, + read_buf: [0; 4096], + tm_tcp_client_rx, + server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }, + poll, + events, + client: None, + connection: ConnectionStatus::Unknown, + }; + client.connect()?; + Ok(client) + } + + pub fn connect(&mut self) -> io::Result<()> { + let mut client = MioTcpStream::connect(self.common.server_addr)?; + + self.poll.registry().register( + &mut client, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; + self.client = Some(client); + self.connection = ConnectionStatus::TryingReconnect; + Ok(()) + } + + pub fn operation(&mut self) -> Result<(), ClientError> { + match self.connection { + ConnectionStatus::TryingReconnect | ConnectionStatus::Unknown => { + self.check_conn_status()? + } + ConnectionStatus::Connected => { + self.check_conn_status()?; + self.poll + .poll(&mut self.events, Some(STOP_CHECK_FREQUENCY))?; + let events: Vec = self.events.iter().cloned().collect(); + for event in events { + if event.token() == Token(0) { + if event.is_readable() { + log::warn!("TCP client is readable"); + self.read_from_server()?; + } + if event.is_writable() { + log::warn!("TCP client is writable"); + self.common.write_to_server(self.client.as_mut().unwrap())?; + } + } + } + return Ok(()); + } + ConnectionStatus::LostConnection => self.connect()?, + }; + std::thread::sleep(STOP_CHECK_FREQUENCY); + Ok(()) + } + + pub fn read_from_server(&mut self) -> Result<(), ClientError> { + match self + .client + .as_mut() + .unwrap() + .read(&mut self.common.read_buf) + { + Ok(0) => (), + Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, + Err(e) => return Err(e.into()), + } + Ok(()) + } + + pub fn check_conn_status(&mut self) -> io::Result<()> { + match self.client.as_mut().unwrap().peer_addr() { + Ok(_) => { + if self.connection == ConnectionStatus::Unknown + || self.connection == ConnectionStatus::TryingReconnect + { + self.connection = ConnectionStatus::Connected; + } + Ok(()) + } + Err(e) => { + if e.kind() == io::ErrorKind::NotConnected { + log::warn!("lost connection, or do not have one"); + self.connection = ConnectionStatus::LostConnection; + return Ok(()); + } + Err(e) + } + } + } +} + +pub struct TcpSppClientStd { + common: TcpSppClientCommon, + // Optional to allow periodic reconnection attempts on the TCP server. + stream: Option, +} + +impl TcpSppClientStd { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, + valid_ids: &'static [PacketId], + port: u16, + ) -> io::Result { + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + let client = match StdTcpStream::connect(server_addr) { + Ok(stream) => { + stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; + Some(stream) + } + Err(e) => { + log::warn!("error connecting to server: {}", e); + None + } + }; + Ok(Self { + common: TcpSppClientCommon { + id, + read_buf: [0; 4096], + tm_tcp_client_rx, + server_addr, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }, + stream: client, + }) + } + + pub fn attempt_connect(&mut self, log_error: bool) -> io::Result { + Ok(match StdTcpStream::connect(self.common.server_addr) { + Ok(stream) => { + stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; + self.stream = Some(stream); + true + } + Err(e) => { + if log_error { + log::warn!("error connecting to server: {}", e); + } + false + } + }) + } + + pub fn operation(&mut self) -> Result<(), ClientError> { + if let Some(client) = &mut self.stream { + match client.read(&mut self.common.read_buf) { + // Not sure if this can happen or this is actually an error condition.. + Ok(0) => { + log::info!("server closed connection"); + self.stream = None; + } + Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut + { + self.common.write_to_server(client)?; + return Ok(()); + } + log::warn!("server error: {e:?}"); + if e.kind() == io::ErrorKind::ConnectionReset { + self.stream = None; + } + return Err(e.into()); + } + } + } else { + if self.attempt_connect(false)? { + log::info!("reconnected to server succesfully"); + return self.operation(); + } + std::thread::sleep(STOP_CHECK_FREQUENCY); + } + + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index 4a629be..36e4fc2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use std::{ use log::info; use ops_sat_rs::config::{ + cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, VALID_PACKET_ID_LIST, @@ -14,7 +15,7 @@ use ops_sat_rs::config::{ use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; -use crate::tmtc::tc_source::TcSourceTaskDynamic; +use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ @@ -22,10 +23,7 @@ use crate::{ interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, }; -use crate::{ - interface::tcp_spp_client::TcpSppClient, - pus::{PusTcDistributor, PusTcMpscRouter}, -}; +use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic}; use crate::{ pus::{action::create_action_service, stack::PusStack}, requests::GenericRequestRouter, @@ -43,6 +41,9 @@ fn main() { setup_logger().expect("setting up logging with fern failed"); println!("OPS-SAT Rust Experiment OBSW"); + let app_cfg = create_app_config(); + println!("App Configuration: {:?}", app_cfg); + let stop_signal = Arc::new(AtomicBool::new(false)); let (tc_source_tx, tc_source_rx) = mpsc::channel(); @@ -165,11 +166,12 @@ fn main() { stop_signal.clone(), ); - let mut tcp_spp_client = TcpSppClient::new( + let mut tcp_spp_client = TcpSppClientStd::new( TCP_SPP_CLIENT.id(), tc_source_tx, tm_tcp_client_rx, VALID_PACKET_ID_LIST, + app_cfg.tcp_spp_server_port, ) .expect("creating TCP SPP client failed"); @@ -228,7 +230,10 @@ fn main() { .spawn(move || { info!("Running TCP SPP client"); loop { - let _result = tcp_spp_client.periodic_operation(); + let result = tcp_spp_client.operation(); + if let Err(e) = result { + log::error!("TCP SPP client error: {}", e); + } if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } diff --git a/src/pus/test.rs b/src/pus/test.rs index 7e7c6fc..3a14a2a 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -42,7 +42,6 @@ pub struct TestCustomServiceWrapper { EcssTcInVecConverter, VerificationReporter, >, - // pub test_srv_event_sender: mpsc::Sender, } impl TestCustomServiceWrapper { diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index fdb8043..e2e0127 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -1,6 +1,6 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::{collections::HashMap, sync::mpsc, time::Duration}; +use std::{collections::HashMap, sync::mpsc}; use log::info; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; @@ -101,10 +101,7 @@ impl TmFunnelDynamic { pub fn operation(&mut self) { loop { - match self - .tm_funnel_rx - .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) - { + match self.tm_funnel_rx.recv_timeout(STOP_CHECK_FREQUENCY) { Ok(mut tm) => { // Read the TM, set sequence counter and message counter, and finally update // the CRC. diff --git a/templates/exp278.toml b/templates/exp278.toml new file mode 100644 index 0000000..d506070 --- /dev/null +++ b/templates/exp278.toml @@ -0,0 +1,5 @@ +# This configuration file should either be inside the (experiment) home folder or in the current +# folder the application is run from. + +# On the small flatsat, change this to 9999. +tcp_spp_server_port = 4096 -- 2.43.0