diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index 6e17a4e..2e8bb01 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -12,6 +12,9 @@ use crate::tmtc::{ReceivesTc, TmPacketSource}; use thiserror::Error; // Re-export the TMTC in COBS server. +pub use crate::hal::std::tcp_spacepackets_server::{ + CcsdsTcParser, CcsdsTmSender, TcpSpacepacketsServer, +}; pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; /// Configuration struct for the generic TCP TMTC server @@ -317,3 +320,59 @@ impl TcpTmtcServerBase { self.listener.local_addr() } } + +#[cfg(test)] +pub(crate) mod tests { + use std::sync::Mutex; + + use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; + + use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore}; + + #[derive(Default, Clone)] + pub(crate) struct SyncTcCacher { + pub(crate) tc_queue: Arc>>>, + } + impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Clone)] + pub(crate) struct SyncTmSource { + tm_queue: Arc>>>, + } + + impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } + } + + impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } + } +} diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 1018364..2226f35 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -1,4 +1,5 @@ -use std::{io::Write, net::TcpStream}; +use delegate::delegate; +use std::{io::Write, net::{TcpStream, TcpListener, SocketAddr}}; use alloc::boxed::Box; @@ -7,11 +8,19 @@ use crate::{ tmtc::{ReceivesTc, TmPacketSource}, }; -use super::tcp_server::{ConnectionResult, TcpTcParser, TcpTmSender, TcpTmtcError}; +use super::tcp_server::{ + ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, +}; /// Concrete [TcpTcParser] implementation for the []. pub struct CcsdsTcParser { - packet_id_lookup: Box, + packet_id_lookup: Box, +} + +impl CcsdsTcParser { + pub fn new(packet_id_lookup: Box) -> Self { + Self { packet_id_lookup } + } } impl TcpTcParser for CcsdsTcParser { @@ -66,5 +75,73 @@ impl TcpTmSender for CcsdsTmSender { } } +/// TCP TMTC server implementation for exchange of tightly stuffed CCSDS space packets. +/// +/// This serves only works if CCSDS space packets are the only packet type being exchanged. +/// It uses the CCSDS [spacepackets::PacketId] as the packet delimiter and start marker when +/// parsing for packets. The user specifies a set of expected [spacepackets::PacketId]s as part +/// of the server configuration for that purpose. +/// +/// ## Example +/// +/// The [TCP integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// also serves as the example application for this module. +pub struct TcpSpacepacketsServer { + generic_server: TcpTmtcGenericServer, +} + +impl TcpSpacepacketsServer { + /// Create a new TCP TMTC server which exchanges TMTC packets encoded with + /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommands which were decoded successfully will be + /// forwarded to this TC receiver. + pub fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + packet_id_lookup: Box + ) -> Result> { + Ok(Self { + generic_server: TcpTmtcGenericServer::new( + cfg, + CcsdsTcParser::new(packet_id_lookup), + CcsdsTmSender::default(), + tm_source, + tc_receiver, + )?, + }) + } + + delegate! { + to self.generic_server { + pub fn listener(&mut self) -> &mut TcpListener; + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result; + + /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call. + pub fn handle_next_connection( + &mut self, + ) -> Result>; + } + } +} + #[cfg(test)] -mod tests {} +mod tests { + use super::TcpSpacepacketsServer; + + #[test] + fn test_basic() { + let + let server = TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup) + + } +} diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs index 10d72d0..f019eaf 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -173,61 +173,13 @@ mod tests { use crate::{ encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, - hal::std::tcp_server::ServerConfig, - tmtc::{ReceivesTcCore, TmPacketSourceCore}, + hal::std::tcp_server::{ServerConfig, tests::{SyncTcCacher, SyncTmSource}}, }; - use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + use alloc::{boxed::Box, sync::Arc}; use cobs::encode; use super::TcpTmtcInCobsServer; - #[derive(Default, Clone)] - struct SyncTcCacher { - tc_queue: Arc>>>, - } - impl ReceivesTcCore for SyncTcCacher { - type Error = (); - - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); - tc_queue.push_back(tc_raw.to_vec()); - Ok(()) - } - } - - #[derive(Default, Clone)] - struct SyncTmSource { - tm_queue: Arc>>>, - } - - impl SyncTmSource { - pub(crate) fn add_tm(&mut self, tm: &[u8]) { - let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); - tm_queue.push_back(tm.to_vec()); - } - } - - impl TmPacketSourceCore for SyncTmSource { - type Error = (); - - fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { - let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); - if !tm_queue.is_empty() { - let next_vec = tm_queue.front().unwrap(); - if buffer.len() < next_vec.len() { - panic!( - "provided buffer too small, must be at least {} bytes", - next_vec.len() - ); - } - let next_vec = tm_queue.pop_front().unwrap(); - buffer[0..next_vec.len()].copy_from_slice(&next_vec); - return Ok(next_vec.len()); - } - Ok(0) - } - } - fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx) }