Merge pull request 'CCSDS TCP Server' (#79) from tcp-ccsds-server into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #79
This commit is contained in:
commit
9f37c84dfc
@ -64,7 +64,7 @@ impl PacketIdLookup for [PacketId] {
|
|||||||
pub fn parse_buffer_for_ccsds_space_packets<E>(
|
pub fn parse_buffer_for_ccsds_space_packets<E>(
|
||||||
buf: &mut [u8],
|
buf: &mut [u8],
|
||||||
packet_id_lookup: &(impl PacketIdLookup + ?Sized),
|
packet_id_lookup: &(impl PacketIdLookup + ?Sized),
|
||||||
tc_receiver: &mut impl ReceivesTcCore<Error = E>,
|
tc_receiver: &mut (impl ReceivesTcCore<Error = E> + ?Sized),
|
||||||
next_write_idx: &mut usize,
|
next_write_idx: &mut usize,
|
||||||
) -> Result<u32, E> {
|
) -> Result<u32, E> {
|
||||||
*next_write_idx = 0;
|
*next_write_idx = 0;
|
||||||
@ -80,7 +80,7 @@ pub fn parse_buffer_for_ccsds_space_packets<E>(
|
|||||||
let length_field =
|
let length_field =
|
||||||
u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap());
|
u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap());
|
||||||
let packet_size = length_field + 7;
|
let packet_size = length_field + 7;
|
||||||
if (current_idx + packet_size as usize) < buf_len {
|
if (current_idx + packet_size as usize) <= buf_len {
|
||||||
tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?;
|
tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?;
|
||||||
packets_found += 1;
|
packets_found += 1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2,5 +2,5 @@
|
|||||||
pub mod tcp_server;
|
pub mod tcp_server;
|
||||||
pub mod udp_server;
|
pub mod udp_server;
|
||||||
|
|
||||||
|
mod tcp_cobs_server;
|
||||||
mod tcp_spacepackets_server;
|
mod tcp_spacepackets_server;
|
||||||
mod tcp_with_cobs_server;
|
|
||||||
|
@ -24,7 +24,7 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
|
|||||||
fn handle_tc_parsing(
|
fn handle_tc_parsing(
|
||||||
&mut self,
|
&mut self,
|
||||||
tc_buffer: &mut [u8],
|
tc_buffer: &mut [u8],
|
||||||
tc_receiver: &mut dyn ReceivesTc<Error = TcError>,
|
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut ConnectionResult,
|
||||||
current_write_idx: usize,
|
current_write_idx: usize,
|
||||||
next_write_idx: &mut usize,
|
next_write_idx: &mut usize,
|
||||||
@ -59,7 +59,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
fn handle_tm_sending(
|
fn handle_tm_sending(
|
||||||
&mut self,
|
&mut self,
|
||||||
tm_buffer: &mut [u8],
|
tm_buffer: &mut [u8],
|
||||||
tm_source: &mut dyn TmPacketSource<Error = TmError>,
|
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut ConnectionResult,
|
||||||
stream: &mut TcpStream,
|
stream: &mut TcpStream,
|
||||||
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
||||||
@ -109,7 +109,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
///
|
///
|
||||||
/// ## Example
|
/// ## Example
|
||||||
///
|
///
|
||||||
/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs)
|
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
||||||
/// test also serves as the example application for this module.
|
/// test also serves as the example application for this module.
|
||||||
pub struct TcpTmtcInCobsServer<TmError, TcError: 'static> {
|
pub struct TcpTmtcInCobsServer<TmError, TcError: 'static> {
|
||||||
generic_server: TcpTmtcGenericServer<TmError, TcError, CobsTmSender, CobsTcParser>,
|
generic_server: TcpTmtcGenericServer<TmError, TcError, CobsTmSender, CobsTcParser>,
|
||||||
@ -167,67 +167,21 @@ mod tests {
|
|||||||
use std::{
|
use std::{
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
||||||
sync::Mutex,
|
|
||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
||||||
hal::std::tcp_server::ServerConfig,
|
hal::std::tcp_server::{
|
||||||
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
tests::{SyncTcCacher, SyncTmSource},
|
||||||
|
ServerConfig,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
|
use alloc::{boxed::Box, sync::Arc};
|
||||||
use cobs::encode;
|
use cobs::encode;
|
||||||
|
|
||||||
use super::TcpTmtcInCobsServer;
|
use super::TcpTmtcInCobsServer;
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
|
||||||
struct SyncTcCacher {
|
|
||||||
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
|
||||||
}
|
|
||||||
impl ReceivesTcCore for SyncTcCacher {
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
|
||||||
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
|
||||||
tc_queue.push_back(tc_raw.to_vec());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
|
||||||
struct SyncTmSource {
|
|
||||||
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SyncTmSource {
|
|
||||||
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
|
|
||||||
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
|
|
||||||
tm_queue.push_back(tm.to_vec());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TmPacketSourceCore for SyncTmSource {
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
|
||||||
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
|
|
||||||
if !tm_queue.is_empty() {
|
|
||||||
let next_vec = tm_queue.front().unwrap();
|
|
||||||
if buffer.len() < next_vec.len() {
|
|
||||||
panic!(
|
|
||||||
"provided buffer too small, must be at least {} bytes",
|
|
||||||
next_vec.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let next_vec = tm_queue.pop_front().unwrap();
|
|
||||||
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
|
||||||
return Ok(next_vec.len());
|
|
||||||
}
|
|
||||||
Ok(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||||
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
|
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
|
||||||
}
|
}
|
@ -12,7 +12,10 @@ use crate::tmtc::{ReceivesTc, TmPacketSource};
|
|||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
// Re-export the TMTC in COBS server.
|
// Re-export the TMTC in COBS server.
|
||||||
pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
|
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
|
||||||
|
pub use crate::hal::std::tcp_spacepackets_server::{
|
||||||
|
SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
|
||||||
|
};
|
||||||
|
|
||||||
/// Configuration struct for the generic TCP TMTC server
|
/// Configuration struct for the generic TCP TMTC server
|
||||||
///
|
///
|
||||||
@ -90,7 +93,7 @@ pub trait TcpTcParser<TmError, TcError> {
|
|||||||
fn handle_tc_parsing(
|
fn handle_tc_parsing(
|
||||||
&mut self,
|
&mut self,
|
||||||
tc_buffer: &mut [u8],
|
tc_buffer: &mut [u8],
|
||||||
tc_receiver: &mut dyn ReceivesTc<Error = TcError>,
|
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut ConnectionResult,
|
||||||
current_write_idx: usize,
|
current_write_idx: usize,
|
||||||
next_write_idx: &mut usize,
|
next_write_idx: &mut usize,
|
||||||
@ -105,7 +108,7 @@ pub trait TcpTmSender<TmError, TcError> {
|
|||||||
fn handle_tm_sending(
|
fn handle_tm_sending(
|
||||||
&mut self,
|
&mut self,
|
||||||
tm_buffer: &mut [u8],
|
tm_buffer: &mut [u8],
|
||||||
tm_source: &mut dyn TmPacketSource<Error = TmError>,
|
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut ConnectionResult,
|
||||||
stream: &mut TcpStream,
|
stream: &mut TcpStream,
|
||||||
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
|
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
|
||||||
@ -317,3 +320,59 @@ impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
|
|||||||
self.listener.local_addr()
|
self.listener.local_addr()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) mod tests {
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
|
||||||
|
|
||||||
|
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub(crate) struct SyncTcCacher {
|
||||||
|
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
|
}
|
||||||
|
impl ReceivesTcCore for SyncTcCacher {
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||||
|
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
||||||
|
tc_queue.push_back(tc_raw.to_vec());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub(crate) struct SyncTmSource {
|
||||||
|
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SyncTmSource {
|
||||||
|
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
|
||||||
|
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
|
||||||
|
tm_queue.push_back(tm.to_vec());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TmPacketSourceCore for SyncTmSource {
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||||
|
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
|
||||||
|
if !tm_queue.is_empty() {
|
||||||
|
let next_vec = tm_queue.front().unwrap();
|
||||||
|
if buffer.len() < next_vec.len() {
|
||||||
|
panic!(
|
||||||
|
"provided buffer too small, must be at least {} bytes",
|
||||||
|
next_vec.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let next_vec = tm_queue.pop_front().unwrap();
|
||||||
|
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
||||||
|
return Ok(next_vec.len());
|
||||||
|
}
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1 +1,363 @@
|
|||||||
|
use delegate::delegate;
|
||||||
|
use std::{
|
||||||
|
io::Write,
|
||||||
|
net::{SocketAddr, TcpListener, TcpStream},
|
||||||
|
};
|
||||||
|
|
||||||
|
use alloc::boxed::Box;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets},
|
||||||
|
tmtc::{ReceivesTc, TmPacketSource},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::tcp_server::{
|
||||||
|
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
|
||||||
|
pub struct SpacepacketsTcParser {
|
||||||
|
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SpacepacketsTcParser {
|
||||||
|
pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
|
||||||
|
Self { packet_id_lookup }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for SpacepacketsTcParser {
|
||||||
|
fn handle_tc_parsing(
|
||||||
|
&mut self,
|
||||||
|
tc_buffer: &mut [u8],
|
||||||
|
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||||
|
conn_result: &mut ConnectionResult,
|
||||||
|
current_write_idx: usize,
|
||||||
|
next_write_idx: &mut usize,
|
||||||
|
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||||
|
// Reader vec full, need to parse for packets.
|
||||||
|
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
|
||||||
|
&mut tc_buffer[..current_write_idx],
|
||||||
|
self.packet_id_lookup.as_ref(),
|
||||||
|
tc_receiver.upcast_mut(),
|
||||||
|
next_write_idx,
|
||||||
|
)
|
||||||
|
.map_err(|e| TcpTmtcError::TcError(e))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer].
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct SpacepacketsTmSender {}
|
||||||
|
|
||||||
|
impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
||||||
|
fn handle_tm_sending(
|
||||||
|
&mut self,
|
||||||
|
tm_buffer: &mut [u8],
|
||||||
|
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||||
|
conn_result: &mut ConnectionResult,
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
||||||
|
let mut tm_was_sent = false;
|
||||||
|
loop {
|
||||||
|
// Write TM until TM source is exhausted. For now, there is no limit for the amount
|
||||||
|
// of TM written this way.
|
||||||
|
let read_tm_len = tm_source
|
||||||
|
.retrieve_packet(tm_buffer)
|
||||||
|
.map_err(|e| TcpTmtcError::TmError(e))?;
|
||||||
|
|
||||||
|
if read_tm_len == 0 {
|
||||||
|
return Ok(tm_was_sent);
|
||||||
|
}
|
||||||
|
tm_was_sent = true;
|
||||||
|
conn_result.num_sent_tms += 1;
|
||||||
|
|
||||||
|
stream.write_all(&tm_buffer[..read_tm_len])?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TCP TMTC server implementation for exchange of tightly stuffed
|
||||||
|
/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf).
|
||||||
|
///
|
||||||
|
/// This serves only works if
|
||||||
|
/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only
|
||||||
|
/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter
|
||||||
|
/// and start marker when parsing for packets. The user specifies a set of expected
|
||||||
|
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
|
||||||
|
///
|
||||||
|
/// ## Example
|
||||||
|
///
|
||||||
|
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
||||||
|
/// also serves as the example application for this module.
|
||||||
|
pub struct TcpSpacepacketsServer<TmError, TcError: 'static> {
|
||||||
|
generic_server:
|
||||||
|
TcpTmtcGenericServer<TmError, TcError, SpacepacketsTmSender, SpacepacketsTcParser>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError> {
|
||||||
|
/// Create a new TCP TMTC server which exchanges CCSDS space packets.
|
||||||
|
///
|
||||||
|
/// ## Parameter
|
||||||
|
///
|
||||||
|
/// * `cfg` - Configuration of the server.
|
||||||
|
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
|
||||||
|
/// then sent back to the client.
|
||||||
|
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
|
||||||
|
/// forwarded to this TC receiver.
|
||||||
|
/// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet
|
||||||
|
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
|
||||||
|
pub fn new(
|
||||||
|
cfg: ServerConfig,
|
||||||
|
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||||
|
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||||
|
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
|
||||||
|
) -> Result<Self, TcpTmtcError<TmError, TcError>> {
|
||||||
|
Ok(Self {
|
||||||
|
generic_server: TcpTmtcGenericServer::new(
|
||||||
|
cfg,
|
||||||
|
SpacepacketsTcParser::new(packet_id_lookup),
|
||||||
|
SpacepacketsTmSender::default(),
|
||||||
|
tm_source,
|
||||||
|
tc_receiver,
|
||||||
|
)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
delegate! {
|
||||||
|
to self.generic_server {
|
||||||
|
pub fn listener(&mut self) -> &mut TcpListener;
|
||||||
|
|
||||||
|
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
|
||||||
|
/// useful if using the port number 0 for OS auto-assignment.
|
||||||
|
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
|
||||||
|
|
||||||
|
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
||||||
|
pub fn handle_next_connection(
|
||||||
|
&mut self,
|
||||||
|
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use core::{
|
||||||
|
sync::atomic::{AtomicBool, Ordering},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
#[allow(unused_imports)]
|
||||||
|
use std::println;
|
||||||
|
use std::{
|
||||||
|
io::{Read, Write},
|
||||||
|
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
|
use alloc::{boxed::Box, sync::Arc};
|
||||||
|
use hashbrown::HashSet;
|
||||||
|
use spacepackets::{
|
||||||
|
ecss::{tc::PusTcCreator, SerializablePusPacket},
|
||||||
|
PacketId, SpHeader,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::hal::std::tcp_server::{
|
||||||
|
tests::{SyncTcCacher, SyncTmSource},
|
||||||
|
ServerConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::TcpSpacepacketsServer;
|
||||||
|
|
||||||
|
const TEST_APID_0: u16 = 0x02;
|
||||||
|
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
|
||||||
|
const TEST_APID_1: u16 = 0x10;
|
||||||
|
const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1);
|
||||||
|
|
||||||
|
fn generic_tmtc_server(
|
||||||
|
addr: &SocketAddr,
|
||||||
|
tc_receiver: SyncTcCacher,
|
||||||
|
tm_source: SyncTmSource,
|
||||||
|
packet_id_lookup: HashSet<PacketId>,
|
||||||
|
) -> TcpSpacepacketsServer<(), ()> {
|
||||||
|
TcpSpacepacketsServer::new(
|
||||||
|
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||||
|
Box::new(tm_source),
|
||||||
|
Box::new(tc_receiver),
|
||||||
|
Box::new(packet_id_lookup),
|
||||||
|
)
|
||||||
|
.expect("TCP server generation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_basic_tc_only() {
|
||||||
|
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
|
let tc_receiver = SyncTcCacher::default();
|
||||||
|
let tm_source = SyncTmSource::default();
|
||||||
|
let mut packet_id_lookup = HashSet::new();
|
||||||
|
packet_id_lookup.insert(TEST_PACKET_ID_0);
|
||||||
|
let mut tcp_server = generic_tmtc_server(
|
||||||
|
&auto_port_addr,
|
||||||
|
tc_receiver.clone(),
|
||||||
|
tm_source,
|
||||||
|
packet_id_lookup,
|
||||||
|
);
|
||||||
|
let dest_addr = tcp_server
|
||||||
|
.local_addr()
|
||||||
|
.expect("retrieving dest addr failed");
|
||||||
|
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||||
|
let set_if_done = conn_handled.clone();
|
||||||
|
// Call the connection handler in separate thread, does block.
|
||||||
|
thread::spawn(move || {
|
||||||
|
let result = tcp_server.handle_next_connection();
|
||||||
|
if result.is_err() {
|
||||||
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
|
}
|
||||||
|
let conn_result = result.unwrap();
|
||||||
|
assert_eq!(conn_result.num_received_tcs, 1);
|
||||||
|
assert_eq!(conn_result.num_sent_tms, 0);
|
||||||
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||||
|
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||||
|
let mut buffer: [u8; 32] = [0; 32];
|
||||||
|
let packet_len_ping = ping_tc
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
|
stream
|
||||||
|
.write_all(&buffer[..packet_len_ping])
|
||||||
|
.expect("writing to TCP server failed");
|
||||||
|
drop(stream);
|
||||||
|
|
||||||
|
// A certain amount of time is allowed for the transaction to complete.
|
||||||
|
for _ in 0..3 {
|
||||||
|
if !conn_handled.load(Ordering::Relaxed) {
|
||||||
|
thread::sleep(Duration::from_millis(5));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !conn_handled.load(Ordering::Relaxed) {
|
||||||
|
panic!("connection was not handled properly");
|
||||||
|
}
|
||||||
|
// Check that TC has arrived.
|
||||||
|
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||||
|
assert_eq!(tc_queue.len(), 1);
|
||||||
|
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multi_tc_multi_tm() {
|
||||||
|
let mut buffer: [u8; 32] = [0; 32];
|
||||||
|
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
|
let tc_receiver = SyncTcCacher::default();
|
||||||
|
let mut tm_source = SyncTmSource::default();
|
||||||
|
|
||||||
|
// Add telemetry
|
||||||
|
let mut total_tm_len = 0;
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||||
|
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
|
||||||
|
let tm_packet_len = verif_tm
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
total_tm_len += tm_packet_len;
|
||||||
|
let tm_0 = buffer[..tm_packet_len].to_vec();
|
||||||
|
tm_source.add_tm(&tm_0);
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
|
||||||
|
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true);
|
||||||
|
let tm_packet_len = verif_tm
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
total_tm_len += tm_packet_len;
|
||||||
|
let tm_1 = buffer[..tm_packet_len].to_vec();
|
||||||
|
tm_source.add_tm(&tm_1);
|
||||||
|
|
||||||
|
// Set up server
|
||||||
|
let mut packet_id_lookup = HashSet::new();
|
||||||
|
packet_id_lookup.insert(TEST_PACKET_ID_0);
|
||||||
|
packet_id_lookup.insert(TEST_PACKET_ID_1);
|
||||||
|
let mut tcp_server = generic_tmtc_server(
|
||||||
|
&auto_port_addr,
|
||||||
|
tc_receiver.clone(),
|
||||||
|
tm_source,
|
||||||
|
packet_id_lookup,
|
||||||
|
);
|
||||||
|
let dest_addr = tcp_server
|
||||||
|
.local_addr()
|
||||||
|
.expect("retrieving dest addr failed");
|
||||||
|
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||||
|
let set_if_done = conn_handled.clone();
|
||||||
|
|
||||||
|
// Call the connection handler in separate thread, does block.
|
||||||
|
thread::spawn(move || {
|
||||||
|
let result = tcp_server.handle_next_connection();
|
||||||
|
if result.is_err() {
|
||||||
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
|
}
|
||||||
|
let conn_result = result.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
conn_result.num_received_tcs, 2,
|
||||||
|
"wrong number of received TCs"
|
||||||
|
);
|
||||||
|
assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs");
|
||||||
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
|
stream
|
||||||
|
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||||
|
.expect("setting reas timeout failed");
|
||||||
|
|
||||||
|
// Send telecommands
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||||
|
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||||
|
let packet_len = ping_tc
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
let tc_0 = buffer[..packet_len].to_vec();
|
||||||
|
stream
|
||||||
|
.write_all(&tc_0)
|
||||||
|
.expect("writing to TCP server failed");
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
|
||||||
|
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
|
||||||
|
let packet_len = action_tc
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
let tc_1 = buffer[..packet_len].to_vec();
|
||||||
|
stream
|
||||||
|
.write_all(&tc_1)
|
||||||
|
.expect("writing to TCP server failed");
|
||||||
|
|
||||||
|
// Done with writing.
|
||||||
|
stream
|
||||||
|
.shutdown(std::net::Shutdown::Write)
|
||||||
|
.expect("shutting down write failed");
|
||||||
|
let mut read_buf: [u8; 32] = [0; 32];
|
||||||
|
let mut current_idx = 0;
|
||||||
|
let mut read_len_total = 0;
|
||||||
|
// Timeout ensures this does not block forever.
|
||||||
|
while read_len_total < total_tm_len {
|
||||||
|
let read_len = stream
|
||||||
|
.read(&mut read_buf[current_idx..])
|
||||||
|
.expect("read failed");
|
||||||
|
current_idx += read_len;
|
||||||
|
read_len_total += read_len;
|
||||||
|
}
|
||||||
|
drop(stream);
|
||||||
|
assert_eq!(read_buf[..tm_0.len()], tm_0);
|
||||||
|
assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1);
|
||||||
|
|
||||||
|
// A certain amount of time is allowed for the transaction to complete.
|
||||||
|
for _ in 0..3 {
|
||||||
|
if !conn_handled.load(Ordering::Relaxed) {
|
||||||
|
thread::sleep(Duration::from_millis(5));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !conn_handled.load(Ordering::Relaxed) {
|
||||||
|
panic!("connection was not handled properly");
|
||||||
|
}
|
||||||
|
// Check that TC has arrived.
|
||||||
|
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||||
|
assert_eq!(tc_queue.len(), 2);
|
||||||
|
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
|
||||||
|
assert_eq!(tc_queue.pop_front().unwrap(), tc_1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -21,11 +21,16 @@ use std::{
|
|||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use hashbrown::HashSet;
|
||||||
use satrs_core::{
|
use satrs_core::{
|
||||||
encoding::cobs::encode_packet_with_cobs,
|
encoding::cobs::encode_packet_with_cobs,
|
||||||
hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer},
|
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer},
|
||||||
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
||||||
};
|
};
|
||||||
|
use spacepackets::{
|
||||||
|
ecss::{tc::PusTcCreator, SerializablePusPacket},
|
||||||
|
PacketId, SpHeader,
|
||||||
|
};
|
||||||
use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
|
use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
@ -79,15 +84,16 @@ impl TmPacketSourceCore for SyncTmSource {
|
|||||||
|
|
||||||
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
|
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
|
||||||
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1];
|
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1];
|
||||||
|
const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
|
|
||||||
fn main() {
|
#[test]
|
||||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
fn test_cobs_server() {
|
||||||
let tc_receiver = SyncTcCacher::default();
|
let tc_receiver = SyncTcCacher::default();
|
||||||
let mut tm_source = SyncTmSource::default();
|
let mut tm_source = SyncTmSource::default();
|
||||||
// Insert a telemetry packet which will be read back by the client at a later stage.
|
// Insert a telemetry packet which will be read back by the client at a later stage.
|
||||||
tm_source.add_tm(&INVERTED_PACKET);
|
tm_source.add_tm(&INVERTED_PACKET);
|
||||||
let mut tcp_server = TcpTmtcInCobsServer::new(
|
let mut tcp_server = TcpTmtcInCobsServer::new(
|
||||||
ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024),
|
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
|
||||||
Box::new(tm_source),
|
Box::new(tm_source),
|
||||||
Box::new(tc_receiver.clone()),
|
Box::new(tc_receiver.clone()),
|
||||||
)
|
)
|
||||||
@ -154,3 +160,85 @@ fn main() {
|
|||||||
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
||||||
drop(tc_queue);
|
drop(tc_queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const TEST_APID_0: u16 = 0x02;
|
||||||
|
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_ccsds_server() {
|
||||||
|
let mut buffer: [u8; 32] = [0; 32];
|
||||||
|
let tc_receiver = SyncTcCacher::default();
|
||||||
|
let mut tm_source = SyncTmSource::default();
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||||
|
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
|
||||||
|
let tm_packet_len = verif_tm
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
tm_source.add_tm(&buffer[..tm_packet_len]);
|
||||||
|
let tm_vec = buffer[..tm_packet_len].to_vec();
|
||||||
|
let mut packet_id_lookup = HashSet::new();
|
||||||
|
packet_id_lookup.insert(TEST_PACKET_ID_0);
|
||||||
|
let mut tcp_server = TcpSpacepacketsServer::new(
|
||||||
|
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
|
||||||
|
Box::new(tm_source),
|
||||||
|
Box::new(tc_receiver.clone()),
|
||||||
|
Box::new(packet_id_lookup),
|
||||||
|
)
|
||||||
|
.expect("TCP server generation failed");
|
||||||
|
let dest_addr = tcp_server
|
||||||
|
.local_addr()
|
||||||
|
.expect("retrieving dest addr failed");
|
||||||
|
let conn_handled: Arc<AtomicBool> = Default::default();
|
||||||
|
let set_if_done = conn_handled.clone();
|
||||||
|
// Call the connection handler in separate thread, does block.
|
||||||
|
thread::spawn(move || {
|
||||||
|
let result = tcp_server.handle_next_connection();
|
||||||
|
if result.is_err() {
|
||||||
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
|
}
|
||||||
|
let conn_result = result.unwrap();
|
||||||
|
assert_eq!(conn_result.num_received_tcs, 1);
|
||||||
|
assert_eq!(conn_result.num_sent_tms, 1);
|
||||||
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
|
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||||
|
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||||
|
stream
|
||||||
|
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||||
|
.expect("setting reas timeout failed");
|
||||||
|
let packet_len = ping_tc
|
||||||
|
.write_to_bytes(&mut buffer)
|
||||||
|
.expect("writing packet failed");
|
||||||
|
stream
|
||||||
|
.write_all(&buffer[..packet_len])
|
||||||
|
.expect("writing to TCP server failed");
|
||||||
|
|
||||||
|
// Done with writing.
|
||||||
|
stream
|
||||||
|
.shutdown(std::net::Shutdown::Write)
|
||||||
|
.expect("shutting down write failed");
|
||||||
|
let mut read_buf: [u8; 16] = [0; 16];
|
||||||
|
let mut read_len_total = 0;
|
||||||
|
// Timeout ensures this does not block forever.
|
||||||
|
while read_len_total < tm_packet_len {
|
||||||
|
let read_len = stream.read(&mut read_buf).expect("read failed");
|
||||||
|
read_len_total += read_len;
|
||||||
|
assert_eq!(read_buf[..read_len], tm_vec);
|
||||||
|
}
|
||||||
|
drop(stream);
|
||||||
|
|
||||||
|
// A certain amount of time is allowed for the transaction to complete.
|
||||||
|
for _ in 0..3 {
|
||||||
|
if !conn_handled.load(Ordering::Relaxed) {
|
||||||
|
thread::sleep(Duration::from_millis(5));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !conn_handled.load(Ordering::Relaxed) {
|
||||||
|
panic!("connection was not handled properly");
|
||||||
|
}
|
||||||
|
// Check that TC has arrived.
|
||||||
|
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||||
|
assert_eq!(tc_queue.len(), 1);
|
||||||
|
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]);
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user