- Add new shared subcrate satrs-shared to split off some shared components not expected to change very often. - Renmame `satrs-core` to `satrs`. It is expected that sat-rs will remain the primary crate, so the core information is superfluous, and core also implies stability, which will not be the case for some time.
This commit is contained in:
4
satrs/src/hal/mod.rs
Normal file
4
satrs/src/hal/mod.rs
Normal file
@ -0,0 +1,4 @@
|
||||
//! # Hardware Abstraction Layer module
|
||||
#[cfg(feature = "std")]
|
||||
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
|
||||
pub mod std;
|
6
satrs/src/hal/std/mod.rs
Normal file
6
satrs/src/hal/std/mod.rs
Normal file
@ -0,0 +1,6 @@
|
||||
//! Helper modules intended to be used on systems with a full [std] runtime.
|
||||
pub mod tcp_server;
|
||||
pub mod udp_server;
|
||||
|
||||
mod tcp_cobs_server;
|
||||
mod tcp_spacepackets_server;
|
379
satrs/src/hal/std/tcp_cobs_server.rs
Normal file
379
satrs/src/hal/std/tcp_cobs_server.rs
Normal file
@ -0,0 +1,379 @@
|
||||
use alloc::vec;
|
||||
use cobs::encode;
|
||||
use delegate::delegate;
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::TcpListener;
|
||||
use std::net::TcpStream;
|
||||
use std::vec::Vec;
|
||||
|
||||
use crate::encoding::parse_buffer_for_cobs_encoded_packets;
|
||||
use crate::tmtc::ReceivesTc;
|
||||
use crate::tmtc::TmPacketSource;
|
||||
|
||||
use crate::hal::std::tcp_server::{
|
||||
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
||||
};
|
||||
|
||||
/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer].
|
||||
#[derive(Default)]
|
||||
pub struct CobsTcParser {}
|
||||
|
||||
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
|
||||
fn handle_tc_parsing(
|
||||
&mut self,
|
||||
tc_buffer: &mut [u8],
|
||||
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||
conn_result: &mut ConnectionResult,
|
||||
current_write_idx: usize,
|
||||
next_write_idx: &mut usize,
|
||||
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
|
||||
&mut tc_buffer[..current_write_idx],
|
||||
tc_receiver.upcast_mut(),
|
||||
next_write_idx,
|
||||
)
|
||||
.map_err(|e| TcpTmtcError::TcError(e))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Concrete [TcpTmSender] implementation for the [TcpTmtcInCobsServer].
|
||||
pub struct CobsTmSender {
|
||||
tm_encoding_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl CobsTmSender {
|
||||
fn new(tm_buffer_size: usize) -> Self {
|
||||
Self {
|
||||
// The buffer should be large enough to hold the maximum expected TM size encoded with
|
||||
// COBS.
|
||||
tm_encoding_buffer: vec![0; cobs::max_encoding_length(tm_buffer_size)],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
||||
fn handle_tm_sending(
|
||||
&mut self,
|
||||
tm_buffer: &mut [u8],
|
||||
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||
conn_result: &mut ConnectionResult,
|
||||
stream: &mut TcpStream,
|
||||
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
||||
let mut tm_was_sent = false;
|
||||
loop {
|
||||
// Write TM until TM source is exhausted. For now, there is no limit for the amount
|
||||
// of TM written this way.
|
||||
let read_tm_len = tm_source
|
||||
.retrieve_packet(tm_buffer)
|
||||
.map_err(|e| TcpTmtcError::TmError(e))?;
|
||||
|
||||
if read_tm_len == 0 {
|
||||
return Ok(tm_was_sent);
|
||||
}
|
||||
tm_was_sent = true;
|
||||
conn_result.num_sent_tms += 1;
|
||||
|
||||
// Encode into COBS and sent to client.
|
||||
let mut current_idx = 0;
|
||||
self.tm_encoding_buffer[current_idx] = 0;
|
||||
current_idx += 1;
|
||||
current_idx += encode(
|
||||
&tm_buffer[..read_tm_len],
|
||||
&mut self.tm_encoding_buffer[current_idx..],
|
||||
);
|
||||
self.tm_encoding_buffer[current_idx] = 0;
|
||||
current_idx += 1;
|
||||
stream.write_all(&self.tm_encoding_buffer[..current_idx])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the
|
||||
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
|
||||
///
|
||||
/// Telemetry will be encoded with the COBS protocol using [cobs::encode] in addition to being
|
||||
/// wrapped with the sentinel value 0 as the packet delimiter as well before being sent back to
|
||||
/// the client. Please note that the server will send as much data as it can retrieve from the
|
||||
/// [TmPacketSource] in its current implementation.
|
||||
///
|
||||
/// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data
|
||||
/// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full
|
||||
/// packets even from a data stream which is split up. The server wil use the
|
||||
/// [parse_buffer_for_cobs_encoded_packets] function to parse for packets and pass them to a
|
||||
/// generic TC receiver. The user can use [crate::encoding::encode_packet_with_cobs] to encode
|
||||
/// telecommands sent to the server.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
||||
/// test also serves as the example application for this module.
|
||||
pub struct TcpTmtcInCobsServer<
|
||||
TmError,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> {
|
||||
generic_server:
|
||||
TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, CobsTmSender, CobsTcParser>,
|
||||
}
|
||||
|
||||
impl<
|
||||
TmError: 'static,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> TcpTmtcInCobsServer<TmError, TcError, TmSource, TcReceiver>
|
||||
{
|
||||
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
|
||||
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
|
||||
///
|
||||
/// ## Parameter
|
||||
///
|
||||
/// * `cfg` - Configuration of the server.
|
||||
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
|
||||
/// then sent back to the client.
|
||||
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
|
||||
/// forwarded to this TC receiver.
|
||||
pub fn new(
|
||||
cfg: ServerConfig,
|
||||
tm_source: TmSource,
|
||||
tc_receiver: TcReceiver,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
generic_server: TcpTmtcGenericServer::new(
|
||||
cfg,
|
||||
CobsTcParser::default(),
|
||||
CobsTmSender::new(cfg.tm_buffer_size),
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
delegate! {
|
||||
to self.generic_server {
|
||||
pub fn listener(&mut self) -> &mut TcpListener;
|
||||
|
||||
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
|
||||
/// useful if using the port number 0 for OS auto-assignment.
|
||||
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
|
||||
|
||||
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
||||
pub fn handle_next_connection(
|
||||
&mut self,
|
||||
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::{
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
||||
thread,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
||||
hal::std::tcp_server::{
|
||||
tests::{SyncTcCacher, SyncTmSource},
|
||||
ServerConfig,
|
||||
},
|
||||
};
|
||||
use alloc::sync::Arc;
|
||||
use cobs::encode;
|
||||
|
||||
use super::TcpTmtcInCobsServer;
|
||||
|
||||
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
|
||||
}
|
||||
|
||||
fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||
encode_packet(&INVERTED_PACKET, encoded_buf, current_idx)
|
||||
}
|
||||
|
||||
fn encode_packet(packet: &[u8], encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||
encoded_buf[*current_idx] = 0;
|
||||
*current_idx += 1;
|
||||
*current_idx += encode(packet, &mut encoded_buf[*current_idx..]);
|
||||
encoded_buf[*current_idx] = 0;
|
||||
*current_idx += 1;
|
||||
}
|
||||
|
||||
fn generic_tmtc_server(
|
||||
addr: &SocketAddr,
|
||||
tc_receiver: SyncTcCacher,
|
||||
tm_source: SyncTmSource,
|
||||
) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
|
||||
TcpTmtcInCobsServer::new(
|
||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
)
|
||||
.expect("TCP server generation failed")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_basic_no_tm() {
|
||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let tm_source = SyncTmSource::default();
|
||||
let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source);
|
||||
let dest_addr = tcp_server
|
||||
.local_addr()
|
||||
.expect("retrieving dest addr failed");
|
||||
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||
let set_if_done = conn_handled.clone();
|
||||
// Call the connection handler in separate thread, does block.
|
||||
thread::spawn(move || {
|
||||
let result = tcp_server.handle_next_connection();
|
||||
if result.is_err() {
|
||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||
}
|
||||
let conn_result = result.unwrap();
|
||||
assert_eq!(conn_result.num_received_tcs, 1);
|
||||
assert_eq!(conn_result.num_sent_tms, 0);
|
||||
set_if_done.store(true, Ordering::Relaxed);
|
||||
});
|
||||
// Send TC to server now.
|
||||
let mut encoded_buf: [u8; 16] = [0; 16];
|
||||
let mut current_idx = 0;
|
||||
encode_simple_packet(&mut encoded_buf, &mut current_idx);
|
||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||
stream
|
||||
.write_all(&encoded_buf[..current_idx])
|
||||
.expect("writing to TCP server failed");
|
||||
drop(stream);
|
||||
// A certain amount of time is allowed for the transaction to complete.
|
||||
for _ in 0..3 {
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
panic!("connection was not handled properly");
|
||||
}
|
||||
// Check that the packet was received and decoded successfully.
|
||||
let mut tc_queue = tc_receiver
|
||||
.tc_queue
|
||||
.lock()
|
||||
.expect("locking tc queue failed");
|
||||
assert_eq!(tc_queue.len(), 1);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
||||
drop(tc_queue);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_server_basic_multi_tm_multi_tc() {
|
||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let mut tm_source = SyncTmSource::default();
|
||||
tm_source.add_tm(&INVERTED_PACKET);
|
||||
tm_source.add_tm(&SIMPLE_PACKET);
|
||||
let mut tcp_server =
|
||||
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
|
||||
let dest_addr = tcp_server
|
||||
.local_addr()
|
||||
.expect("retrieving dest addr failed");
|
||||
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||
let set_if_done = conn_handled.clone();
|
||||
// Call the connection handler in separate thread, does block.
|
||||
thread::spawn(move || {
|
||||
let result = tcp_server.handle_next_connection();
|
||||
if result.is_err() {
|
||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||
}
|
||||
let conn_result = result.unwrap();
|
||||
assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received");
|
||||
assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs received");
|
||||
set_if_done.store(true, Ordering::Relaxed);
|
||||
});
|
||||
// Send TC to server now.
|
||||
let mut encoded_buf: [u8; 32] = [0; 32];
|
||||
let mut current_idx = 0;
|
||||
encode_simple_packet(&mut encoded_buf, &mut current_idx);
|
||||
encode_inverted_packet(&mut encoded_buf, &mut current_idx);
|
||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||
stream
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("setting reas timeout failed");
|
||||
stream
|
||||
.write_all(&encoded_buf[..current_idx])
|
||||
.expect("writing to TCP server failed");
|
||||
// Done with writing.
|
||||
stream
|
||||
.shutdown(std::net::Shutdown::Write)
|
||||
.expect("shutting down write failed");
|
||||
let mut read_buf: [u8; 16] = [0; 16];
|
||||
let mut read_len_total = 0;
|
||||
// Timeout ensures this does not block forever.
|
||||
while read_len_total < 16 {
|
||||
let read_len = stream.read(&mut read_buf).expect("read failed");
|
||||
read_len_total += read_len;
|
||||
// Read until full expected size is available.
|
||||
if read_len == 16 {
|
||||
// Read first TM packet.
|
||||
current_idx = 0;
|
||||
assert_eq!(read_len, 16);
|
||||
assert_eq!(read_buf[0], 0);
|
||||
current_idx += 1;
|
||||
let mut dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
|
||||
.expect("COBS decoding failed");
|
||||
assert_eq!(dec_report.dst_used, 5);
|
||||
// Skip first sentinel byte.
|
||||
assert_eq!(
|
||||
&read_buf[current_idx..current_idx + INVERTED_PACKET.len()],
|
||||
&INVERTED_PACKET
|
||||
);
|
||||
current_idx += dec_report.src_used;
|
||||
// End sentinel.
|
||||
assert_eq!(read_buf[current_idx], 0, "invalid sentinel end byte");
|
||||
current_idx += 1;
|
||||
|
||||
// Read second TM packet.
|
||||
assert_eq!(read_buf[current_idx], 0);
|
||||
current_idx += 1;
|
||||
dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
|
||||
.expect("COBS decoding failed");
|
||||
assert_eq!(dec_report.dst_used, 5);
|
||||
// Skip first sentinel byte.
|
||||
assert_eq!(
|
||||
&read_buf[current_idx..current_idx + SIMPLE_PACKET.len()],
|
||||
&SIMPLE_PACKET
|
||||
);
|
||||
current_idx += dec_report.src_used;
|
||||
// End sentinel.
|
||||
assert_eq!(read_buf[current_idx], 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
drop(stream);
|
||||
|
||||
// A certain amount of time is allowed for the transaction to complete.
|
||||
for _ in 0..3 {
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
panic!("connection was not handled properly");
|
||||
}
|
||||
// Check that the packet was received and decoded successfully.
|
||||
let mut tc_queue = tc_receiver
|
||||
.tc_queue
|
||||
.lock()
|
||||
.expect("locking tc queue failed");
|
||||
assert_eq!(tc_queue.len(), 2);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
|
||||
drop(tc_queue);
|
||||
}
|
||||
}
|
359
satrs/src/hal/std/tcp_server.rs
Normal file
359
satrs/src/hal/std/tcp_server.rs
Normal file
@ -0,0 +1,359 @@
|
||||
//! Generic TCP TMTC servers with different TMTC format flavours.
|
||||
use alloc::vec;
|
||||
use alloc::vec::Vec;
|
||||
use core::time::Duration;
|
||||
use socket2::{Domain, Socket, Type};
|
||||
use std::io::Read;
|
||||
use std::net::TcpListener;
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::thread;
|
||||
|
||||
use crate::tmtc::{ReceivesTc, TmPacketSource};
|
||||
use thiserror::Error;
|
||||
|
||||
// Re-export the TMTC in COBS server.
|
||||
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
|
||||
pub use crate::hal::std::tcp_spacepackets_server::{
|
||||
SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
|
||||
};
|
||||
|
||||
/// Configuration struct for the generic TCP TMTC server
|
||||
///
|
||||
/// ## Parameters
|
||||
///
|
||||
/// * `addr` - Address of the TCP server.
|
||||
/// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or
|
||||
/// no TM needs to be sent, the TCP server will delay for the specified amount of time
|
||||
/// to reduce CPU load.
|
||||
/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and
|
||||
/// encoding of that data. This buffer should at large enough to hold the maximum expected
|
||||
/// TM size read from the packet source.
|
||||
/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
|
||||
/// the client. It is recommended to make this buffer larger to allow reading multiple
|
||||
/// consecutive packets as well, for example by using common buffer sizes like 4096 or 8192
|
||||
/// byte. The buffer should at the very least be large enough to hold the maximum expected
|
||||
/// telecommand size.
|
||||
/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is
|
||||
/// especially useful if the address and port are static for the server. Set to false by
|
||||
/// default.
|
||||
/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is
|
||||
/// especially useful if the address and port are static for the server. Set to false by
|
||||
/// default.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct ServerConfig {
|
||||
pub addr: SocketAddr,
|
||||
pub inner_loop_delay: Duration,
|
||||
pub tm_buffer_size: usize,
|
||||
pub tc_buffer_size: usize,
|
||||
pub reuse_addr: bool,
|
||||
pub reuse_port: bool,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn new(
|
||||
addr: SocketAddr,
|
||||
inner_loop_delay: Duration,
|
||||
tm_buffer_size: usize,
|
||||
tc_buffer_size: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
addr,
|
||||
inner_loop_delay,
|
||||
tm_buffer_size,
|
||||
tc_buffer_size,
|
||||
reuse_addr: false,
|
||||
reuse_port: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum TcpTmtcError<TmError, TcError> {
|
||||
#[error("TM retrieval error: {0}")]
|
||||
TmError(TmError),
|
||||
#[error("TC retrieval error: {0}")]
|
||||
TcError(TcError),
|
||||
#[error("io error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
/// Result of one connection attempt. Contains the client address if a connection was established,
|
||||
/// in addition to the number of telecommands and telemetry packets exchanged.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ConnectionResult {
|
||||
pub addr: Option<SocketAddr>,
|
||||
pub num_received_tcs: u32,
|
||||
pub num_sent_tms: u32,
|
||||
}
|
||||
|
||||
/// Generic parser abstraction for an object which can parse for telecommands given a raw
|
||||
/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand
|
||||
/// receiver. This allows different encoding schemes for telecommands.
|
||||
pub trait TcpTcParser<TmError, TcError> {
|
||||
fn handle_tc_parsing(
|
||||
&mut self,
|
||||
tc_buffer: &mut [u8],
|
||||
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||
conn_result: &mut ConnectionResult,
|
||||
current_write_idx: usize,
|
||||
next_write_idx: &mut usize,
|
||||
) -> Result<(), TcpTmtcError<TmError, TcError>>;
|
||||
}
|
||||
|
||||
/// Generic sender abstraction for an object which can pull telemetry from a given TM source
|
||||
/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream].
|
||||
/// The concrete implementation can also perform any encoding steps which are necessary before
|
||||
/// sending back the data to a client.
|
||||
pub trait TcpTmSender<TmError, TcError> {
|
||||
fn handle_tm_sending(
|
||||
&mut self,
|
||||
tm_buffer: &mut [u8],
|
||||
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||
conn_result: &mut ConnectionResult,
|
||||
stream: &mut TcpStream,
|
||||
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
|
||||
}
|
||||
|
||||
/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which
|
||||
/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry.
|
||||
///
|
||||
/// This server implements a generic TMTC handling logic and allows modifying its behaviour
|
||||
/// through the following 4 core abstractions:
|
||||
///
|
||||
/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client.
|
||||
/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver.
|
||||
/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client.
|
||||
/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender].
|
||||
///
|
||||
/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without
|
||||
/// having to re-implement common logic.
|
||||
///
|
||||
/// Currently, this framework offers the following concrete implementations:
|
||||
///
|
||||
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
|
||||
pub struct TcpTmtcGenericServer<
|
||||
TmError,
|
||||
TcError,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
TmSender: TcpTmSender<TmError, TcError>,
|
||||
TcParser: TcpTcParser<TmError, TcError>,
|
||||
> {
|
||||
pub(crate) listener: TcpListener,
|
||||
pub(crate) inner_loop_delay: Duration,
|
||||
pub(crate) tm_source: TmSource,
|
||||
pub(crate) tm_buffer: Vec<u8>,
|
||||
pub(crate) tc_receiver: TcReceiver,
|
||||
pub(crate) tc_buffer: Vec<u8>,
|
||||
tc_handler: TcParser,
|
||||
tm_handler: TmSender,
|
||||
}
|
||||
|
||||
impl<
|
||||
TmError: 'static,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
TmSender: TcpTmSender<TmError, TcError>,
|
||||
TcParser: TcpTcParser<TmError, TcError>,
|
||||
> TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, TmSender, TcParser>
|
||||
{
|
||||
/// Create a new generic TMTC server instance.
|
||||
///
|
||||
/// ## Parameter
|
||||
///
|
||||
/// * `cfg` - Configuration of the server.
|
||||
/// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from
|
||||
/// the client.
|
||||
/// * `tm_sender` - Sends back telemetry to the client using the specified TM source.
|
||||
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
|
||||
/// then sent back to the client.
|
||||
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
|
||||
/// to this TC receiver.
|
||||
pub fn new(
|
||||
cfg: ServerConfig,
|
||||
tc_parser: TcParser,
|
||||
tm_sender: TmSender,
|
||||
tm_source: TmSource,
|
||||
tc_receiver: TcReceiver,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
// Create a TCP listener bound to two addresses.
|
||||
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
|
||||
socket.set_reuse_address(cfg.reuse_addr)?;
|
||||
#[cfg(unix)]
|
||||
socket.set_reuse_port(cfg.reuse_port)?;
|
||||
let addr = (cfg.addr).into();
|
||||
socket.bind(&addr)?;
|
||||
socket.listen(128)?;
|
||||
Ok(Self {
|
||||
tc_handler: tc_parser,
|
||||
tm_handler: tm_sender,
|
||||
listener: socket.into(),
|
||||
inner_loop_delay: cfg.inner_loop_delay,
|
||||
tm_source,
|
||||
tm_buffer: vec![0; cfg.tm_buffer_size],
|
||||
tc_receiver,
|
||||
tc_buffer: vec![0; cfg.tc_buffer_size],
|
||||
})
|
||||
}
|
||||
|
||||
/// Retrieve the internal [TcpListener] class.
|
||||
pub fn listener(&mut self) -> &mut TcpListener {
|
||||
&mut self.listener
|
||||
}
|
||||
|
||||
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
|
||||
/// useful if using the port number 0 for OS auto-assignment.
|
||||
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
|
||||
self.listener.local_addr()
|
||||
}
|
||||
|
||||
/// This call is used to handle the next connection to a client. Right now, it performs
|
||||
/// the following steps:
|
||||
///
|
||||
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
|
||||
/// until a client connects.
|
||||
/// 2. It reads all the telecommands from the client and parses all received data using the
|
||||
/// user specified [TcpTcParser].
|
||||
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
|
||||
/// user specified [TcpTmSender].
|
||||
///
|
||||
/// The server will delay for a user-specified period if the client connects to the server
|
||||
/// for prolonged periods and there is no traffic for the server. This is the case if the
|
||||
/// client does not send any telecommands and no telemetry needs to be sent back to the client.
|
||||
pub fn handle_next_connection(
|
||||
&mut self,
|
||||
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
|
||||
let mut connection_result = ConnectionResult::default();
|
||||
let mut current_write_idx;
|
||||
let mut next_write_idx = 0;
|
||||
let (mut stream, addr) = self.listener.accept()?;
|
||||
stream.set_nonblocking(true)?;
|
||||
connection_result.addr = Some(addr);
|
||||
current_write_idx = next_write_idx;
|
||||
loop {
|
||||
let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
|
||||
match read_result {
|
||||
Ok(0) => {
|
||||
// Connection closed by client. If any TC was read, parse for complete packets.
|
||||
// After that, break the outer loop.
|
||||
if current_write_idx > 0 {
|
||||
self.tc_handler.handle_tc_parsing(
|
||||
&mut self.tc_buffer,
|
||||
&mut self.tc_receiver,
|
||||
&mut connection_result,
|
||||
current_write_idx,
|
||||
&mut next_write_idx,
|
||||
)?;
|
||||
}
|
||||
break;
|
||||
}
|
||||
Ok(read_len) => {
|
||||
current_write_idx += read_len;
|
||||
// TC buffer is full, we must parse for complete packets now.
|
||||
if current_write_idx == self.tc_buffer.capacity() {
|
||||
self.tc_handler.handle_tc_parsing(
|
||||
&mut self.tc_buffer,
|
||||
&mut self.tc_receiver,
|
||||
&mut connection_result,
|
||||
current_write_idx,
|
||||
&mut next_write_idx,
|
||||
)?;
|
||||
current_write_idx = next_write_idx;
|
||||
}
|
||||
}
|
||||
Err(e) => match e.kind() {
|
||||
// As per [TcpStream::set_read_timeout] documentation, this should work for
|
||||
// both UNIX and Windows.
|
||||
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
|
||||
self.tc_handler.handle_tc_parsing(
|
||||
&mut self.tc_buffer,
|
||||
&mut self.tc_receiver,
|
||||
&mut connection_result,
|
||||
current_write_idx,
|
||||
&mut next_write_idx,
|
||||
)?;
|
||||
current_write_idx = next_write_idx;
|
||||
|
||||
if !self.tm_handler.handle_tm_sending(
|
||||
&mut self.tm_buffer,
|
||||
&mut self.tm_source,
|
||||
&mut connection_result,
|
||||
&mut stream,
|
||||
)? {
|
||||
// No TC read, no TM was sent, but the client has not disconnected.
|
||||
// Perform an inner delay to avoid burning CPU time.
|
||||
thread::sleep(self.inner_loop_delay);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(TcpTmtcError::Io(e));
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
self.tm_handler.handle_tm_sending(
|
||||
&mut self.tm_buffer,
|
||||
&mut self.tm_source,
|
||||
&mut connection_result,
|
||||
&mut stream,
|
||||
)?;
|
||||
Ok(connection_result)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::sync::Mutex;
|
||||
|
||||
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
|
||||
|
||||
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct SyncTcCacher {
|
||||
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||
}
|
||||
impl ReceivesTcCore for SyncTcCacher {
|
||||
type Error = ();
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
||||
tc_queue.push_back(tc_raw.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct SyncTmSource {
|
||||
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||
}
|
||||
|
||||
impl SyncTmSource {
|
||||
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
|
||||
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
|
||||
tm_queue.push_back(tm.to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
impl TmPacketSourceCore for SyncTmSource {
|
||||
type Error = ();
|
||||
|
||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
|
||||
if !tm_queue.is_empty() {
|
||||
let next_vec = tm_queue.front().unwrap();
|
||||
if buffer.len() < next_vec.len() {
|
||||
panic!(
|
||||
"provided buffer too small, must be at least {} bytes",
|
||||
next_vec.len()
|
||||
);
|
||||
}
|
||||
let next_vec = tm_queue.pop_front().unwrap();
|
||||
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
||||
return Ok(next_vec.len());
|
||||
}
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
}
|
362
satrs/src/hal/std/tcp_spacepackets_server.rs
Normal file
362
satrs/src/hal/std/tcp_spacepackets_server.rs
Normal file
@ -0,0 +1,362 @@
|
||||
use delegate::delegate;
|
||||
use std::{
|
||||
io::Write,
|
||||
net::{SocketAddr, TcpListener, TcpStream},
|
||||
};
|
||||
|
||||
use alloc::boxed::Box;
|
||||
|
||||
use crate::{
|
||||
encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets},
|
||||
tmtc::{ReceivesTc, TmPacketSource},
|
||||
};
|
||||
|
||||
use super::tcp_server::{
|
||||
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
||||
};
|
||||
|
||||
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
|
||||
pub struct SpacepacketsTcParser {
|
||||
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
|
||||
}
|
||||
|
||||
impl SpacepacketsTcParser {
|
||||
pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
|
||||
Self { packet_id_lookup }
|
||||
}
|
||||
}
|
||||
|
||||
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for SpacepacketsTcParser {
|
||||
fn handle_tc_parsing(
|
||||
&mut self,
|
||||
tc_buffer: &mut [u8],
|
||||
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||
conn_result: &mut ConnectionResult,
|
||||
current_write_idx: usize,
|
||||
next_write_idx: &mut usize,
|
||||
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||
// Reader vec full, need to parse for packets.
|
||||
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
|
||||
&mut tc_buffer[..current_write_idx],
|
||||
self.packet_id_lookup.as_ref(),
|
||||
tc_receiver.upcast_mut(),
|
||||
next_write_idx,
|
||||
)
|
||||
.map_err(|e| TcpTmtcError::TcError(e))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer].
|
||||
#[derive(Default)]
|
||||
pub struct SpacepacketsTmSender {}
|
||||
|
||||
impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
||||
fn handle_tm_sending(
|
||||
&mut self,
|
||||
tm_buffer: &mut [u8],
|
||||
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||
conn_result: &mut ConnectionResult,
|
||||
stream: &mut TcpStream,
|
||||
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
||||
let mut tm_was_sent = false;
|
||||
loop {
|
||||
// Write TM until TM source is exhausted. For now, there is no limit for the amount
|
||||
// of TM written this way.
|
||||
let read_tm_len = tm_source
|
||||
.retrieve_packet(tm_buffer)
|
||||
.map_err(|e| TcpTmtcError::TmError(e))?;
|
||||
|
||||
if read_tm_len == 0 {
|
||||
return Ok(tm_was_sent);
|
||||
}
|
||||
tm_was_sent = true;
|
||||
conn_result.num_sent_tms += 1;
|
||||
|
||||
stream.write_all(&tm_buffer[..read_tm_len])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// TCP TMTC server implementation for exchange of tightly stuffed
|
||||
/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf).
|
||||
///
|
||||
/// This serves only works if
|
||||
/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only
|
||||
/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter
|
||||
/// and start marker when parsing for packets. The user specifies a set of expected
|
||||
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
|
||||
///
|
||||
/// ## Example
|
||||
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
||||
/// also serves as the example application for this module.
|
||||
pub struct TcpSpacepacketsServer<
|
||||
TmError,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> {
|
||||
generic_server: TcpTmtcGenericServer<
|
||||
TmError,
|
||||
TcError,
|
||||
TmSource,
|
||||
TcReceiver,
|
||||
SpacepacketsTmSender,
|
||||
SpacepacketsTcParser,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<
|
||||
TmError: 'static,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> TcpSpacepacketsServer<TmError, TcError, TmSource, TcReceiver>
|
||||
{
|
||||
///
|
||||
/// ## Parameter
|
||||
///
|
||||
/// * `cfg` - Configuration of the server.
|
||||
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
|
||||
/// then sent back to the client.
|
||||
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
|
||||
/// forwarded to this TC receiver.
|
||||
/// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet
|
||||
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
|
||||
pub fn new(
|
||||
cfg: ServerConfig,
|
||||
tm_source: TmSource,
|
||||
tc_receiver: TcReceiver,
|
||||
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
generic_server: TcpTmtcGenericServer::new(
|
||||
cfg,
|
||||
SpacepacketsTcParser::new(packet_id_lookup),
|
||||
SpacepacketsTmSender::default(),
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
|
||||
delegate! {
|
||||
to self.generic_server {
|
||||
pub fn listener(&mut self) -> &mut TcpListener;
|
||||
|
||||
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
|
||||
/// useful if using the port number 0 for OS auto-assignment.
|
||||
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
|
||||
|
||||
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
||||
pub fn handle_next_connection(
|
||||
&mut self,
|
||||
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::{
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
#[allow(unused_imports)]
|
||||
use std::println;
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
||||
thread,
|
||||
};
|
||||
|
||||
use alloc::{boxed::Box, sync::Arc};
|
||||
use hashbrown::HashSet;
|
||||
use spacepackets::{
|
||||
ecss::{tc::PusTcCreator, WritablePusPacket},
|
||||
PacketId, SpHeader,
|
||||
};
|
||||
|
||||
use crate::hal::std::tcp_server::{
|
||||
tests::{SyncTcCacher, SyncTmSource},
|
||||
ServerConfig,
|
||||
};
|
||||
|
||||
use super::TcpSpacepacketsServer;
|
||||
|
||||
const TEST_APID_0: u16 = 0x02;
|
||||
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
|
||||
const TEST_APID_1: u16 = 0x10;
|
||||
const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1);
|
||||
|
||||
fn generic_tmtc_server(
|
||||
addr: &SocketAddr,
|
||||
tc_receiver: SyncTcCacher,
|
||||
tm_source: SyncTmSource,
|
||||
packet_id_lookup: HashSet<PacketId>,
|
||||
) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> {
|
||||
TcpSpacepacketsServer::new(
|
||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
Box::new(packet_id_lookup),
|
||||
)
|
||||
.expect("TCP server generation failed")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_tc_only() {
|
||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let tm_source = SyncTmSource::default();
|
||||
let mut packet_id_lookup = HashSet::new();
|
||||
packet_id_lookup.insert(TEST_PACKET_ID_0);
|
||||
let mut tcp_server = generic_tmtc_server(
|
||||
&auto_port_addr,
|
||||
tc_receiver.clone(),
|
||||
tm_source,
|
||||
packet_id_lookup,
|
||||
);
|
||||
let dest_addr = tcp_server
|
||||
.local_addr()
|
||||
.expect("retrieving dest addr failed");
|
||||
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||
let set_if_done = conn_handled.clone();
|
||||
// Call the connection handler in separate thread, does block.
|
||||
thread::spawn(move || {
|
||||
let result = tcp_server.handle_next_connection();
|
||||
if result.is_err() {
|
||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||
}
|
||||
let conn_result = result.unwrap();
|
||||
assert_eq!(conn_result.num_received_tcs, 1);
|
||||
assert_eq!(conn_result.num_sent_tms, 0);
|
||||
set_if_done.store(true, Ordering::Relaxed);
|
||||
});
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||
let tc_0 = ping_tc.to_vec().expect("packet generation failed");
|
||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||
stream
|
||||
.write_all(&tc_0)
|
||||
.expect("writing to TCP server failed");
|
||||
drop(stream);
|
||||
|
||||
// A certain amount of time is allowed for the transaction to complete.
|
||||
for _ in 0..3 {
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
panic!("connection was not handled properly");
|
||||
}
|
||||
// Check that TC has arrived.
|
||||
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||
assert_eq!(tc_queue.len(), 1);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multi_tc_multi_tm() {
|
||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let mut tm_source = SyncTmSource::default();
|
||||
|
||||
// Add telemetry
|
||||
let mut total_tm_len = 0;
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
|
||||
let tm_0 = verif_tm.to_vec().expect("writing packet failed");
|
||||
total_tm_len += tm_0.len();
|
||||
tm_source.add_tm(&tm_0);
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
|
||||
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true);
|
||||
let tm_1 = verif_tm.to_vec().expect("writing packet failed");
|
||||
total_tm_len += tm_1.len();
|
||||
tm_source.add_tm(&tm_1);
|
||||
|
||||
// Set up server
|
||||
let mut packet_id_lookup = HashSet::new();
|
||||
packet_id_lookup.insert(TEST_PACKET_ID_0);
|
||||
packet_id_lookup.insert(TEST_PACKET_ID_1);
|
||||
let mut tcp_server = generic_tmtc_server(
|
||||
&auto_port_addr,
|
||||
tc_receiver.clone(),
|
||||
tm_source,
|
||||
packet_id_lookup,
|
||||
);
|
||||
let dest_addr = tcp_server
|
||||
.local_addr()
|
||||
.expect("retrieving dest addr failed");
|
||||
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||
let set_if_done = conn_handled.clone();
|
||||
|
||||
// Call the connection handler in separate thread, does block.
|
||||
thread::spawn(move || {
|
||||
let result = tcp_server.handle_next_connection();
|
||||
if result.is_err() {
|
||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||
}
|
||||
let conn_result = result.unwrap();
|
||||
assert_eq!(
|
||||
conn_result.num_received_tcs, 2,
|
||||
"wrong number of received TCs"
|
||||
);
|
||||
assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs");
|
||||
set_if_done.store(true, Ordering::Relaxed);
|
||||
});
|
||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||
stream
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("setting reas timeout failed");
|
||||
|
||||
// Send telecommands
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||
let tc_0 = ping_tc.to_vec().expect("ping tc creation failed");
|
||||
stream
|
||||
.write_all(&tc_0)
|
||||
.expect("writing to TCP server failed");
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
|
||||
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
|
||||
let tc_1 = action_tc.to_vec().expect("action tc creation failed");
|
||||
stream
|
||||
.write_all(&tc_1)
|
||||
.expect("writing to TCP server failed");
|
||||
|
||||
// Done with writing.
|
||||
stream
|
||||
.shutdown(std::net::Shutdown::Write)
|
||||
.expect("shutting down write failed");
|
||||
let mut read_buf: [u8; 32] = [0; 32];
|
||||
let mut current_idx = 0;
|
||||
let mut read_len_total = 0;
|
||||
// Timeout ensures this does not block forever.
|
||||
while read_len_total < total_tm_len {
|
||||
let read_len = stream
|
||||
.read(&mut read_buf[current_idx..])
|
||||
.expect("read failed");
|
||||
current_idx += read_len;
|
||||
read_len_total += read_len;
|
||||
}
|
||||
drop(stream);
|
||||
assert_eq!(read_buf[..tm_0.len()], tm_0);
|
||||
assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1);
|
||||
|
||||
// A certain amount of time is allowed for the transaction to complete.
|
||||
for _ in 0..3 {
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
if !conn_handled.load(Ordering::Relaxed) {
|
||||
panic!("connection was not handled properly");
|
||||
}
|
||||
// Check that TC has arrived.
|
||||
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||
assert_eq!(tc_queue.len(), 2);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), tc_1);
|
||||
}
|
||||
}
|
215
satrs/src/hal/std/udp_server.rs
Normal file
215
satrs/src/hal/std/udp_server.rs
Normal file
@ -0,0 +1,215 @@
|
||||
//! Generic UDP TC server.
|
||||
use crate::tmtc::{ReceivesTc, ReceivesTcCore};
|
||||
use std::boxed::Box;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
|
||||
use std::vec;
|
||||
use std::vec::Vec;
|
||||
|
||||
/// This UDP server can be used to receive CCSDS space packet telecommands or any other telecommand
|
||||
/// format.
|
||||
///
|
||||
/// It caches all received telecomands into a vector. The maximum expected telecommand size should
|
||||
/// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC
|
||||
/// receiver in form of a special trait object which implements [ReceivesTc]. Please note that the
|
||||
/// receiver should copy out the received data if it the data is required past the
|
||||
/// [ReceivesTcCore::pass_tc] call.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||
/// use spacepackets::ecss::WritablePusPacket;
|
||||
/// use satrs::hal::std::udp_server::UdpTcServer;
|
||||
/// use satrs::tmtc::{ReceivesTc, ReceivesTcCore};
|
||||
/// use spacepackets::SpHeader;
|
||||
/// use spacepackets::ecss::tc::PusTcCreator;
|
||||
///
|
||||
/// #[derive (Default)]
|
||||
/// struct PingReceiver {}
|
||||
/// impl ReceivesTcCore for PingReceiver {
|
||||
/// type Error = ();
|
||||
/// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
/// assert_eq!(tc_raw.len(), 13);
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// let mut buf = [0; 32];
|
||||
/// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
|
||||
/// let ping_receiver = PingReceiver::default();
|
||||
/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver))
|
||||
/// .expect("Creating UDP TMTC server failed");
|
||||
/// let mut sph = SpHeader::tc_unseg(0x02, 0, 0).unwrap();
|
||||
/// let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||
/// let len = pus_tc
|
||||
/// .write_to_bytes(&mut buf)
|
||||
/// .expect("Error writing PUS TC packet");
|
||||
/// assert_eq!(len, 13);
|
||||
/// let client = UdpSocket::bind("127.0.0.1:7778").expect("Connecting to UDP server failed");
|
||||
/// client
|
||||
/// .send_to(&buf[0..len], dest_addr)
|
||||
/// .expect("Error sending PUS TC via UDP");
|
||||
/// ```
|
||||
///
|
||||
/// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example)
|
||||
/// server code also includes
|
||||
/// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67)
|
||||
/// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port
|
||||
/// and then forwards them to a generic CCSDS packet receiver.
|
||||
pub struct UdpTcServer<E> {
|
||||
pub socket: UdpSocket,
|
||||
recv_buf: Vec<u8>,
|
||||
sender_addr: Option<SocketAddr>,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = E>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ReceiveResult<E> {
|
||||
NothingReceived,
|
||||
IoError(Error),
|
||||
ReceiverError(E),
|
||||
}
|
||||
|
||||
impl<E> From<Error> for ReceiveResult<E> {
|
||||
fn from(e: Error) -> Self {
|
||||
ReceiveResult::IoError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: PartialEq> PartialEq for ReceiveResult<E> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
use ReceiveResult::*;
|
||||
match (self, other) {
|
||||
(IoError(ref e), IoError(ref other_e)) => e.kind() == other_e.kind(),
|
||||
(NothingReceived, NothingReceived) => true,
|
||||
(ReceiverError(e), ReceiverError(other_e)) => e == other_e,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Eq + PartialEq> Eq for ReceiveResult<E> {}
|
||||
|
||||
impl<E: 'static> ReceivesTcCore for UdpTcServer<E> {
|
||||
type Error = E;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.tc_receiver.pass_tc(tc_raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: 'static> UdpTcServer<E> {
|
||||
pub fn new<A: ToSocketAddrs>(
|
||||
addr: A,
|
||||
max_recv_size: usize,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = E>>,
|
||||
) -> Result<Self, Error> {
|
||||
let server = Self {
|
||||
socket: UdpSocket::bind(addr)?,
|
||||
recv_buf: vec![0; max_recv_size],
|
||||
sender_addr: None,
|
||||
tc_receiver,
|
||||
};
|
||||
server.socket.set_nonblocking(true)?;
|
||||
Ok(server)
|
||||
}
|
||||
|
||||
pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult<E>> {
|
||||
let res = match self.socket.recv_from(&mut self.recv_buf) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut {
|
||||
Err(ReceiveResult::NothingReceived)
|
||||
} else {
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
};
|
||||
let (num_bytes, from) = res;
|
||||
self.sender_addr = Some(from);
|
||||
self.tc_receiver
|
||||
.pass_tc(&self.recv_buf[0..num_bytes])
|
||||
.map_err(|e| ReceiveResult::ReceiverError(e))?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn last_sender(&self) -> Option<SocketAddr> {
|
||||
self.sender_addr
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer};
|
||||
use crate::tmtc::ReceivesTcCore;
|
||||
use spacepackets::ecss::tc::PusTcCreator;
|
||||
use spacepackets::ecss::WritablePusPacket;
|
||||
use spacepackets::SpHeader;
|
||||
use std::boxed::Box;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||
use std::vec::Vec;
|
||||
|
||||
fn is_send<T: Send>(_: &T) {}
|
||||
|
||||
#[derive(Default)]
|
||||
struct PingReceiver {
|
||||
pub sent_cmds: VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl ReceivesTcCore for PingReceiver {
|
||||
type Error = ();
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let mut sent_data = Vec::new();
|
||||
sent_data.extend_from_slice(tc_raw);
|
||||
self.sent_cmds.push_back(sent_data);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_test() {
|
||||
let mut buf = [0; 32];
|
||||
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
|
||||
let ping_receiver = PingReceiver::default();
|
||||
is_send(&ping_receiver);
|
||||
let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver))
|
||||
.expect("Creating UDP TMTC server failed");
|
||||
is_send(&udp_tc_server);
|
||||
let mut sph = SpHeader::tc_unseg(0x02, 0, 0).unwrap();
|
||||
let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||
let len = pus_tc
|
||||
.write_to_bytes(&mut buf)
|
||||
.expect("Error writing PUS TC packet");
|
||||
let client = UdpSocket::bind("127.0.0.1:7778").expect("Connecting to UDP server failed");
|
||||
client
|
||||
.send_to(&buf[0..len], dest_addr)
|
||||
.expect("Error sending PUS TC via UDP");
|
||||
let local_addr = client.local_addr().unwrap();
|
||||
udp_tc_server
|
||||
.try_recv_tc()
|
||||
.expect("Error receiving sent telecommand");
|
||||
assert_eq!(
|
||||
udp_tc_server.last_sender().expect("No sender set"),
|
||||
local_addr
|
||||
);
|
||||
let ping_receiver: &mut PingReceiver = udp_tc_server.tc_receiver.downcast_mut().unwrap();
|
||||
assert_eq!(ping_receiver.sent_cmds.len(), 1);
|
||||
let sent_cmd = ping_receiver.sent_cmds.pop_front().unwrap();
|
||||
assert_eq!(sent_cmd, buf[0..len]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nothing_received() {
|
||||
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7779);
|
||||
let ping_receiver = PingReceiver::default();
|
||||
let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver))
|
||||
.expect("Creating UDP TMTC server failed");
|
||||
let res = udp_tc_server.try_recv_tc();
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert_eq!(err, ReceiveResult::NothingReceived);
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user