this already looks very promising
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
Robin Müller 2023-09-16 21:24:01 +02:00
parent 0e6d903942
commit e3043ce2d7
Signed by: muellerr
GPG Key ID: A649FB78196E3849

View File

@ -18,18 +18,45 @@ use super::tcp_server::TcpTmtcError;
/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the /// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
/// ///
/// TCP is stream oriented, so a client can read available telemetry using [std::io::Read] as well.
/// To allow flexibly specifying the telemetry sent back to clients, a generic TM abstraction
/// in form of the [TmPacketSource] trait is used. Telemetry will be encoded with the COBS
/// protocol using [cobs::encode] in addition to being wrapped with the sentinel value 0 as the
/// packet delimiter as well before being sent back to the client. Please note that the server
/// will send as much data as it can retrieve from the [TmPacketSource] in its current
/// implementation.
///
/// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data /// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data
/// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full /// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full
/// packets even from a data stream which is split up. /// packets even from a data stream which is split up. The server wil use the
/// /// [parse_buffer_for_cobs_encoded_packets] function to parse for packets and pass them to a
/// The server wil use the [parse_buffer_for_cobs_encoded_packets] function to parse for packets /// generic TC receiver.
/// and pass them to a generic TC receiver.
pub struct TcpTmtcInCobsServer<TcError, TmError> { pub struct TcpTmtcInCobsServer<TcError, TmError> {
base: TcpTmtcServerBase<TcError, TmError>, base: TcpTmtcServerBase<TcError, TmError>,
tm_encoding_buffer: Vec<u8>, tm_encoding_buffer: Vec<u8>,
} }
impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> { impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
/// Create a new TMTC server which exchanges TMTC packets encoded with
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
/// ## Parameter
///
/// * `addr` - Address of the TCP server.
/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and
/// encoding of that data. This buffer should at large enough to hold the maximum expected
/// TM size in addition to the COBS encoding overhead. You can use
/// [cobs::max_encoding_length] to calculate this size.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
/// the client. It is recommended to make this buffer larger to allow reading multiple
/// consecutive packets as well, for example by using 4096 or 8192 byte. The buffer should
/// at the very least be large enough to hold the maximum expected telecommand size in
/// addition to its COBS encoding overhead. You can use [cobs::max_encoding_length] to
/// calculate this size.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver.
pub fn new<A: ToSocketAddrs>( pub fn new<A: ToSocketAddrs>(
addr: A, addr: A,
tm_buffer_size: usize, tm_buffer_size: usize,
@ -49,6 +76,15 @@ impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
}) })
} }
/// This call is used to handle the next connection to a client. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 2. It reads all the telecommands from the client, which are expected to be COBS
/// encoded packets.
/// 3. After reading and parsing all telecommands, it sends back all telemetry it can retrieve
/// from the user specified [TmPacketSource] back to the client.
pub fn handle_next_connection( pub fn handle_next_connection(
&mut self, &mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> { ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
@ -71,8 +107,8 @@ impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
&mut next_write_idx, &mut next_write_idx,
) )
.map_err(|e| TcpTmtcError::TcError(e))?; .map_err(|e| TcpTmtcError::TcError(e))?;
current_write_idx = next_write_idx;
} }
current_write_idx = next_write_idx;
continue; continue;
} }
break; break;
@ -173,7 +209,7 @@ mod tests {
time::Duration, time::Duration,
}; };
use std::{ use std::{
io::Write, io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::Mutex, sync::Mutex,
thread, thread,
@ -186,43 +222,64 @@ mod tests {
use super::{parse_buffer_for_cobs_encoded_packets, TcpTmtcInCobsServer}; use super::{parse_buffer_for_cobs_encoded_packets, TcpTmtcInCobsServer};
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1];
#[derive(Default)] #[derive(Default, Clone)]
struct TestTcSender { struct SyncTcCacher {
received_tcs: Vec<Vec<u8>>, tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
} }
impl ReceivesTcCore for SyncTcCacher {
impl ReceivesTcCore for TestTcSender {
type Error = (); type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.received_tcs.push(tc_raw.to_vec()); let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default)]
struct TcCacher {
tc_queue: VecDeque<Vec<u8>>,
}
impl ReceivesTcCore for TcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_queue.push_back(tc_raw.to_vec());
Ok(()) Ok(())
} }
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
struct TmSource { struct SyncTmSource {
shared_tm_source: Arc<Mutex<VecDeque<Vec<u8>>>>, tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
} }
impl TmSource { impl SyncTmSource {
fn new() -> Self { pub(crate) fn add_tm(&mut self, tm: &[u8]) {
Self { let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
shared_tm_source: Default::default(), tm_queue.push_back(tm.to_vec());
}
}
fn add_tm(&mut self, tm: &[u8]) {
let mut shared_tm_source = self.shared_tm_source.lock().unwrap();
shared_tm_source.push_back(tm.to_vec());
} }
} }
impl TmPacketSource for TmSource { impl TmPacketSource for SyncTmSource {
type Error = (); type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> { fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
let tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
if !tm_queue.is_empty() {
let next_vec = tm_queue.front().unwrap();
if buffer.len() < next_vec.len() {
panic!(
"provided buffer too small, must be at least {} bytes",
next_vec.len()
);
}
buffer[0..next_vec.len()].copy_from_slice(next_vec);
return Ok(next_vec.len());
}
Ok(0) Ok(0)
} }
} }
@ -237,7 +294,7 @@ mod tests {
#[test] #[test]
fn test_parsing_simple_packet() { fn test_parsing_simple_packet() {
let mut test_sender = TestTcSender::default(); let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -249,23 +306,22 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(packets, 1); assert_eq!(packets, 1);
assert_eq!(test_sender.received_tcs.len(), 1); assert_eq!(test_sender.tc_queue.len(), 1);
let packet = &test_sender.received_tcs[0]; let packet = &test_sender.tc_queue[0];
assert_eq!(packet, &SIMPLE_PACKET); assert_eq!(packet, &SIMPLE_PACKET);
} }
#[test] #[test]
fn test_parsing_consecutive_packets() { fn test_parsing_consecutive_packets() {
let mut test_sender = TestTcSender::default(); let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
let inverted_packet: [u8; 5] = [5, 4, 3, 2, 1];
// Second packet // Second packet
encoded_buf[current_idx] = 0; encoded_buf[current_idx] = 0;
current_idx += 1; current_idx += 1;
current_idx += encode(&inverted_packet, &mut encoded_buf[current_idx..]); current_idx += encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]);
encoded_buf[current_idx] = 0; encoded_buf[current_idx] = 0;
current_idx += 1; current_idx += 1;
let mut next_read_idx = 0; let mut next_read_idx = 0;
@ -276,16 +332,16 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(packets, 2); assert_eq!(packets, 2);
assert_eq!(test_sender.received_tcs.len(), 2); assert_eq!(test_sender.tc_queue.len(), 2);
let packet0 = &test_sender.received_tcs[0]; let packet0 = &test_sender.tc_queue[0];
assert_eq!(packet0, &SIMPLE_PACKET); assert_eq!(packet0, &SIMPLE_PACKET);
let packet1 = &test_sender.received_tcs[1]; let packet1 = &test_sender.tc_queue[1];
assert_eq!(packet1, &inverted_packet); assert_eq!(packet1, &INVERTED_PACKET);
} }
#[test] #[test]
fn test_split_tail_packet_only() { fn test_split_tail_packet_only() {
let mut test_sender = TestTcSender::default(); let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -298,22 +354,21 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(packets, 0); assert_eq!(packets, 0);
assert_eq!(test_sender.received_tcs.len(), 0); assert_eq!(test_sender.tc_queue.len(), 0);
assert_eq!(next_read_idx, 0); assert_eq!(next_read_idx, 0);
} }
fn generic_test_split_packet(cut_off: usize) { fn generic_test_split_packet(cut_off: usize) {
let mut test_sender = TestTcSender::default(); let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let inverted_packet: [u8; 5] = [5, 4, 3, 2, 1]; assert!(cut_off < INVERTED_PACKET.len() + 1);
assert!(cut_off < inverted_packet.len() + 1);
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
// Second packet // Second packet
encoded_buf[current_idx] = 0; encoded_buf[current_idx] = 0;
let packet_start = current_idx; let packet_start = current_idx;
current_idx += 1; current_idx += 1;
let encoded_len = encode(&inverted_packet, &mut encoded_buf[current_idx..]); let encoded_len = encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]);
assert_eq!(encoded_len, 6); assert_eq!(encoded_len, 6);
current_idx += encoded_len; current_idx += encoded_len;
// We cut off the sentinel byte, so we expecte the write index to be the length of the // We cut off the sentinel byte, so we expecte the write index to be the length of the
@ -331,8 +386,8 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(packets, 1); assert_eq!(packets, 1);
assert_eq!(test_sender.received_tcs.len(), 1); assert_eq!(test_sender.tc_queue.len(), 1);
assert_eq!(&test_sender.received_tcs[0], &SIMPLE_PACKET); assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, next_expected_write_idx); assert_eq!(next_write_idx, next_expected_write_idx);
assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start); assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start);
} }
@ -354,7 +409,7 @@ mod tests {
#[test] #[test]
fn test_zero_at_end() { fn test_zero_at_end() {
let mut test_sender = TestTcSender::default(); let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut next_write_idx = 0; let mut next_write_idx = 0;
let mut current_idx = 0; let mut current_idx = 0;
@ -371,15 +426,15 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(packets, 1); assert_eq!(packets, 1);
assert_eq!(test_sender.received_tcs.len(), 1); assert_eq!(test_sender.tc_queue.len(), 1);
assert_eq!(&test_sender.received_tcs[0], &SIMPLE_PACKET); assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, 1); assert_eq!(next_write_idx, 1);
assert_eq!(encoded_buf[0], 0); assert_eq!(encoded_buf[0], 0);
} }
#[test] #[test]
fn test_all_zeroes() { fn test_all_zeroes() {
let mut test_sender = TestTcSender::default(); let mut test_sender = TcCacher::default();
let mut all_zeroes: [u8; 5] = [0; 5]; let mut all_zeroes: [u8; 5] = [0; 5];
let mut next_write_idx = 0; let mut next_write_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
@ -390,23 +445,32 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(packets, 0); assert_eq!(packets, 0);
assert!(test_sender.received_tcs.is_empty()); assert!(test_sender.tc_queue.is_empty());
assert_eq!(next_write_idx, 0); assert_eq!(next_write_idx, 0);
} }
#[test] fn generic_tmtc_server(
fn test_server_basic() { addr: &SocketAddr,
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); tc_receiver: SyncTcCacher,
let tc_receiver = TestTcSender::default(); tm_source: SyncTmSource,
let tm_source = TmSource::default(); ) -> TcpTmtcInCobsServer<(), ()> {
let mut tcp_server = TcpTmtcInCobsServer::new( TcpTmtcInCobsServer::new(
dest_addr, addr,
1024, 1024,
Box::new(tm_source), Box::new(tm_source),
1024, 1024,
Box::new(tc_receiver), Box::new(tc_receiver.clone()),
) )
.expect("TCP server generation failed"); .expect("TCP server generation failed")
}
#[test]
fn test_server_basic_no_tm() {
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default();
let mut tcp_server =
generic_tmtc_server(&dest_addr, tc_receiver.clone(), tm_source.clone());
let conn_handled: Arc<AtomicBool> = Default::default(); let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone(); let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block. // Call the connection handler in separate thread, does block.
@ -415,6 +479,9 @@ mod tests {
if result.is_err() { if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err()); panic!("handling connection failed: {:?}", result.unwrap_err());
} }
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 0);
set_if_done.store(true, Ordering::Relaxed); set_if_done.store(true, Ordering::Relaxed);
}); });
// Send TC to server now. // Send TC to server now.
@ -423,16 +490,88 @@ mod tests {
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream stream
.write_all(&encoded_buf) .write_all(&encoded_buf[..current_idx])
.expect("writing to TCP server failed"); .expect("writing to TCP server failed");
drop(stream);
// A certain amount of time is allowed for the transaction to complete. // A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 { for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) { if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(1)); thread::sleep(Duration::from_millis(5));
} }
} }
if !conn_handled.load(Ordering::Relaxed) { if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly"); panic!("connection was not handled properly");
} }
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
}
#[test]
fn test_server_basic_no_tm_multi_tc() {}
#[test]
fn test_server_basic_with_tm() {
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
tm_source.add_tm(&INVERTED_PACKET);
let mut tcp_server =
generic_tmtc_server(&dest_addr, tc_receiver.clone(), tm_source.clone());
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 1);
set_if_done.store(true, Ordering::Relaxed);
});
// Send TC to server now.
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.write_all(&encoded_buf[..current_idx])
.expect("writing to TCP server failed");
let mut read_buf: [u8; 16] = [0; 16];
let read_len = stream.read(&mut read_buf).expect("read failed");
// 1 byte encoding overhead, 2 sentinel bytes.
assert_eq!(read_len, 8);
assert_eq!(read_buf[0], 0);
assert_eq!(read_buf[read_len - 1], 0);
let decoded_len =
cobs::decode_in_place(&mut read_buf[1..read_len]).expect("COBS decoding failed");
assert_eq!(decoded_len, 5);
assert_eq!(&read_buf[..INVERTED_PACKET.len()], &INVERTED_PACKET);
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
} }
} }