use alloc::vec;
use cobs::encode;
use delegate::delegate;
use std::io::Write;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::TcpStream;
use std::vec::Vec;

use crate::encoding::parse_buffer_for_cobs_encoded_packets;
use crate::tmtc::ReceivesTc;
use crate::tmtc::TmPacketSource;

use crate::hal::std::tcp_server::{
    ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
};

/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer].
#[derive(Default)]
pub struct CobsTcParser {}

impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
    fn handle_tc_parsing(
        &mut self,
        tc_buffer: &mut [u8],
        tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
        conn_result: &mut ConnectionResult,
        current_write_idx: usize,
        next_write_idx: &mut usize,
    ) -> Result<(), TcpTmtcError<TmError, TcError>> {
        conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
            &mut tc_buffer[..current_write_idx],
            tc_receiver.upcast_mut(),
            next_write_idx,
        )
        .map_err(|e| TcpTmtcError::TcError(e))?;
        Ok(())
    }
}

/// Concrete [TcpTmSender] implementation for the [TcpTmtcInCobsServer].
pub struct CobsTmSender {
    tm_encoding_buffer: Vec<u8>,
}

impl CobsTmSender {
    fn new(tm_buffer_size: usize) -> Self {
        Self {
            // The buffer should be large enough to hold the maximum expected TM size encoded with
            // COBS.
            tm_encoding_buffer: vec![0; cobs::max_encoding_length(tm_buffer_size)],
        }
    }
}

impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
    fn handle_tm_sending(
        &mut self,
        tm_buffer: &mut [u8],
        tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
        conn_result: &mut ConnectionResult,
        stream: &mut TcpStream,
    ) -> Result<bool, TcpTmtcError<TmError, TcError>> {
        let mut tm_was_sent = false;
        loop {
            // Write TM until TM source is exhausted. For now, there is no limit for the amount
            // of TM written this way.
            let read_tm_len = tm_source
                .retrieve_packet(tm_buffer)
                .map_err(|e| TcpTmtcError::TmError(e))?;

            if read_tm_len == 0 {
                return Ok(tm_was_sent);
            }
            tm_was_sent = true;
            conn_result.num_sent_tms += 1;

            // Encode into COBS and send the result to the client.
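            // The resulting wire frame is: [0x00 sentinel] [COBS-encoded TM packet] [0x00 sentinel].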
            let mut current_idx = 0;
            self.tm_encoding_buffer[current_idx] = 0;
            current_idx += 1;
            current_idx += encode(
                &tm_buffer[..read_tm_len],
                &mut self.tm_encoding_buffer[current_idx..],
            );
            self.tm_encoding_buffer[current_idx] = 0;
            current_idx += 1;
            stream.write_all(&self.tm_encoding_buffer[..current_idx])?;
        }
    }
}

/// TCP TMTC server implementation for exchanging generic TMTC packets which are framed with the
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
/// Telemetry will be encoded with the COBS protocol using [cobs::encode] and additionally
/// wrapped with the sentinel value 0 as the packet delimiter before being sent back to
/// the client. Please note that in its current implementation, the server will send as much data
/// as it can retrieve from the [TmPacketSource].
///
/// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data
/// exchanged while also allowing packets of flexible size and a reliable way to reconstruct full
/// packets even from a data stream which is split up. The server will use the
/// [parse_buffer_for_cobs_encoded_packets] function to parse for packets and pass them to a
/// generic TC receiver. The user can use [crate::encoding::encode_packet_with_cobs] to encode
/// telecommands sent to the server.
///
/// ## Example
///
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// also serve as the example application for this module.
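///
/// A minimal usage sketch is shown below. It is not compiled as a doc-test, and `MyTmSource` and
/// `MyTcReceiver` are assumed placeholder implementations of [TmPacketSource] and [ReceivesTc].
/// The configuration values mirror the ones used by the unit tests below.
///
/// ```ignore
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
/// use std::time::Duration;
///
/// // Bind to an arbitrary example port on localhost.
/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7301);
/// let cfg = ServerConfig::new(addr, Duration::from_millis(2), 1024, 1024);
/// let mut server = TcpTmtcInCobsServer::new(cfg, MyTmSource::default(), MyTcReceiver::default())
///     .expect("creating TCP COBS server failed");
/// loop {
///     // Blocks until a client connects, then exchanges TM and TC with that client.
///     let conn_result = server
///         .handle_next_connection()
///         .expect("handling TCP connection failed");
///     println!(
///         "received {} TCs, sent {} TMs",
///         conn_result.num_received_tcs, conn_result.num_sent_tms
///     );
/// }
/// ```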
pub struct TcpTmtcInCobsServer<
    TmError,
    TcError: 'static,
    TmSource: TmPacketSource<Error = TmError>,
    TcReceiver: ReceivesTc<Error = TcError>,
> {
    generic_server:
        TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, CobsTmSender, CobsTcParser>,
}

impl<
        TmError: 'static,
        TcError: 'static,
        TmSource: TmPacketSource<Error = TmError>,
        TcReceiver: ReceivesTc<Error = TcError>,
    > TcpTmtcInCobsServer<TmError, TcError, TmSource, TcReceiver>
{
    /// Create a new TCP TMTC server which exchanges TMTC packets encoded with the
    /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
    ///
    /// ## Parameters
    ///
    /// * `cfg` - Configuration of the server.
    /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
    ///     then sent back to the client.
    /// * `tc_receiver` - Any received telecommands which were decoded successfully will be
    ///     forwarded to this TC receiver.
    pub fn new(
        cfg: ServerConfig,
        tm_source: TmSource,
        tc_receiver: TcReceiver,
    ) -> Result<Self, std::io::Error> {
        Ok(Self {
            generic_server: TcpTmtcGenericServer::new(
                cfg,
                CobsTcParser::default(),
                CobsTmSender::new(cfg.tm_buffer_size),
                tm_source,
                tc_receiver,
            )?,
        })
    }

    delegate! {
        to self.generic_server {
            pub fn listener(&mut self) -> &mut TcpListener;

            /// Can be used to retrieve the locally assigned address of the TCP server. This is
            /// especially useful if using the port number 0 for OS auto-assignment.
            pub fn local_addr(&self) -> std::io::Result<SocketAddr>;

            /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
            pub fn handle_next_connection(
                &mut self,
            ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
        }
    }
}

#[cfg(test)]
mod tests {
    use core::{
        sync::atomic::{AtomicBool, Ordering},
        time::Duration,
    };
    use std::{
        io::{Read, Write},
        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
        thread,
    };

    use crate::{
        encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
        hal::std::tcp_server::{
            tests::{SyncTcCacher, SyncTmSource},
            ServerConfig,
        },
    };
    use alloc::sync::Arc;
    use cobs::encode;

    use super::TcpTmtcInCobsServer;

    fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
        encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
    }

    fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
        encode_packet(&INVERTED_PACKET, encoded_buf, current_idx)
    }

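    // Frames a packet the same way the server frames TM: a zero sentinel byte, the COBS-encoded
    // packet, and a trailing zero sentinel byte.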
    fn encode_packet(packet: &[u8], encoded_buf: &mut [u8], current_idx: &mut usize) {
        encoded_buf[*current_idx] = 0;
        *current_idx += 1;
        *current_idx += encode(packet, &mut encoded_buf[*current_idx..]);
        encoded_buf[*current_idx] = 0;
        *current_idx += 1;
    }

    fn generic_tmtc_server(
        addr: &SocketAddr,
        tc_receiver: SyncTcCacher,
        tm_source: SyncTmSource,
    ) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
        TcpTmtcInCobsServer::new(
            ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
            tm_source,
            tc_receiver,
        )
        .expect("TCP server generation failed")
    }

    #[test]
    fn test_server_basic_no_tm() {
        let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
        let tc_receiver = SyncTcCacher::default();
        let tm_source = SyncTmSource::default();
        let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source);
        let dest_addr = tcp_server
            .local_addr()
            .expect("retrieving dest addr failed");
        let conn_handled: Arc<AtomicBool> = Default::default();
        let set_if_done = conn_handled.clone();
        // Call the connection handler in a separate thread; it blocks.
        thread::spawn(move || {
            let result = tcp_server.handle_next_connection();
            if result.is_err() {
                panic!("handling connection failed: {:?}", result.unwrap_err());
            }
            let conn_result = result.unwrap();
            assert_eq!(conn_result.num_received_tcs, 1);
            assert_eq!(conn_result.num_sent_tms, 0);
            set_if_done.store(true, Ordering::Relaxed);
        });
        // Send TC to server now.
        let mut encoded_buf: [u8; 16] = [0; 16];
        let mut current_idx = 0;
        encode_simple_packet(&mut encoded_buf, &mut current_idx);
        let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
        stream
            .write_all(&encoded_buf[..current_idx])
            .expect("writing to TCP server failed");
        drop(stream);
        // A certain amount of time is allowed for the transaction to complete.
        for _ in 0..3 {
            if !conn_handled.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(5));
            }
        }
        if !conn_handled.load(Ordering::Relaxed) {
            panic!("connection was not handled properly");
        }
        // Check that the packet was received and decoded successfully.
        let mut tc_queue = tc_receiver
            .tc_queue
            .lock()
            .expect("locking tc queue failed");
        assert_eq!(tc_queue.len(), 1);
        assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
        drop(tc_queue);
    }

    #[test]
    fn test_server_basic_multi_tm_multi_tc() {
        let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
        let tc_receiver = SyncTcCacher::default();
        let mut tm_source = SyncTmSource::default();
        tm_source.add_tm(&INVERTED_PACKET);
        tm_source.add_tm(&SIMPLE_PACKET);
        let mut tcp_server =
            generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
        let dest_addr = tcp_server
            .local_addr()
            .expect("retrieving dest addr failed");
        let conn_handled: Arc<AtomicBool> = Default::default();
        let set_if_done = conn_handled.clone();
        // Call the connection handler in a separate thread; it blocks.
        thread::spawn(move || {
            let result = tcp_server.handle_next_connection();
            if result.is_err() {
                panic!("handling connection failed: {:?}", result.unwrap_err());
            }
            let conn_result = result.unwrap();
            assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received");
            assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs sent");
            set_if_done.store(true, Ordering::Relaxed);
        });
        // Send TC to server now.
        let mut encoded_buf: [u8; 32] = [0; 32];
        let mut current_idx = 0;
        encode_simple_packet(&mut encoded_buf, &mut current_idx);
        encode_inverted_packet(&mut encoded_buf, &mut current_idx);
        let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
        stream
            .set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
        stream
            .write_all(&encoded_buf[..current_idx])
            .expect("writing to TCP server failed");
        // Done with writing.
        stream
            .shutdown(std::net::Shutdown::Write)
            .expect("shutting down write failed");
        let mut read_buf: [u8; 16] = [0; 16];
        let mut read_len_total = 0;
        // Timeout ensures this does not block forever.
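        // Two framed TM packets are expected back, 16 bytes in total: each frame consists of a
        // zero sentinel byte, the COBS-encoded packet and a trailing zero sentinel byte.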
        while read_len_total < 16 {
            let read_len = stream.read(&mut read_buf).expect("read failed");
            read_len_total += read_len;
            // Read until full expected size is available.
            if read_len == 16 {
                // Read first TM packet.
                current_idx = 0;
                assert_eq!(read_len, 16);
                assert_eq!(read_buf[0], 0);
                current_idx += 1;
                let mut dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
                    .expect("COBS decoding failed");
                assert_eq!(dec_report.dst_used, 5);
                // Skip first sentinel byte.
                assert_eq!(
                    &read_buf[current_idx..current_idx + INVERTED_PACKET.len()],
                    &INVERTED_PACKET
                );
                current_idx += dec_report.src_used;
                // End sentinel.
                assert_eq!(read_buf[current_idx], 0, "invalid sentinel end byte");
                current_idx += 1;

                // Read second TM packet.
                assert_eq!(read_buf[current_idx], 0);
                current_idx += 1;
                dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
                    .expect("COBS decoding failed");
                assert_eq!(dec_report.dst_used, 5);
                // Skip first sentinel byte.
                assert_eq!(
                    &read_buf[current_idx..current_idx + SIMPLE_PACKET.len()],
                    &SIMPLE_PACKET
                );
                current_idx += dec_report.src_used;
                // End sentinel.
                assert_eq!(read_buf[current_idx], 0);
                break;
            }
        }
        drop(stream);

        // A certain amount of time is allowed for the transaction to complete.
        for _ in 0..3 {
            if !conn_handled.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(5));
            }
        }
        if !conn_handled.load(Ordering::Relaxed) {
            panic!("connection was not handled properly");
        }
        // Check that the packets were received and decoded successfully.
        let mut tc_queue = tc_receiver
            .tc_queue
            .lock()
            .expect("locking tc queue failed");
        assert_eq!(tc_queue.len(), 2);
        assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
        assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
        drop(tc_queue);
    }
}