link corrections
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2023-09-26 15:57:51 +02:00
parent 0117482da1
commit 35cef32ebf
Signed by: muellerr
GPG Key ID: A649FB78196E3849
5 changed files with 118 additions and 105 deletions

View File

@ -2,5 +2,5 @@
pub mod tcp_server;
pub mod udp_server;
mod tcp_cobs_server;
mod tcp_spacepackets_server;
mod tcp_with_cobs_server;

View File

@ -109,7 +109,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
///
/// ## Example
///
/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs)
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// test also serves as the example application for this module.
pub struct TcpTmtcInCobsServer<TmError, TcError: 'static> {
generic_server: TcpTmtcGenericServer<TmError, TcError, CobsTmSender, CobsTcParser>,

View File

@ -12,10 +12,10 @@ use crate::tmtc::{ReceivesTc, TmPacketSource};
use thiserror::Error;
// Re-export the TMTC in COBS server.
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
pub use crate::hal::std::tcp_spacepackets_server::{
CcsdsTcParser, CcsdsTmSender, TcpSpacepacketsServer,
SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
};
pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
/// Configuration struct for the generic TCP TMTC server
///

View File

@ -15,18 +15,18 @@ use super::tcp_server::{
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
};
/// Concrete [TcpTcParser] implementation for the [].
pub struct CcsdsTcParser {
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
pub struct SpacepacketsTcParser {
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
}
impl CcsdsTcParser {
impl SpacepacketsTcParser {
pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
Self { packet_id_lookup }
}
}
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CcsdsTcParser {
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for SpacepacketsTcParser {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
@ -47,11 +47,11 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CcsdsTcParser
}
}
/// Concrete [TcpTmSender] implementation for the [].
/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer].
#[derive(Default)]
pub struct CcsdsTmSender {}
pub struct SpacepacketsTmSender {}
impl<TmError, TcError> TcpTmSender<TmError, TcError> for CcsdsTmSender {
impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
@ -78,24 +78,26 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CcsdsTmSender {
}
}
/// TCP TMTC server implementation for exchange of tightly stuffed CCSDS space packets.
/// TCP TMTC server implementation for exchange of tightly stuffed
/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf).
///
/// This serves only works if CCSDS space packets are the only packet type being exchanged.
/// It uses the CCSDS [spacepackets::PacketId] as the packet delimiter and start marker when
/// parsing for packets. The user specifies a set of expected [spacepackets::PacketId]s as part
/// of the server configuration for that purpose.
/// This serves only works if
/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only
/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter
/// and start marker when parsing for packets. The user specifies a set of expected
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
///
/// ## Example
///
/// The [TCP integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs)
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// also serves as the example application for this module.
pub struct TcpSpacepacketsServer<TmError, TcError: 'static> {
generic_server: TcpTmtcGenericServer<TmError, TcError, CcsdsTmSender, CcsdsTcParser>,
generic_server:
TcpTmtcGenericServer<TmError, TcError, SpacepacketsTmSender, SpacepacketsTcParser>,
}
impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError> {
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
/// Create a new TCP TMTC server which exchanges CCSDS space packets.
///
/// ## Parameter
///
@ -104,6 +106,8 @@ impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError>
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
/// forwarded to this TC receiver.
/// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
pub fn new(
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
@ -113,8 +117,8 @@ impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError>
Ok(Self {
generic_server: TcpTmtcGenericServer::new(
cfg,
CcsdsTcParser::new(packet_id_lookup),
CcsdsTmSender::default(),
SpacepacketsTcParser::new(packet_id_lookup),
SpacepacketsTmSender::default(),
tm_source,
tc_receiver,
)?,
@ -241,85 +245,6 @@ mod tests {
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]);
}
#[test]
fn test_basic_tc_and_tm() {
let mut buffer: [u8; 32] = [0; 32];
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
let tm_packet_len = verif_tm
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
tm_source.add_tm(&buffer[..tm_packet_len]);
let tm_vec = buffer[..tm_packet_len].to_vec();
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source,
packet_id_lookup,
);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 1);
set_if_done.store(true, Ordering::Relaxed);
});
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
let packet_len = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
stream
.write_all(&buffer[..packet_len])
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 16] = [0; 16];
let mut read_len_total = 0;
// Timeout ensures this does not block forever.
while read_len_total < tm_packet_len {
let read_len = stream.read(&mut read_buf).expect("read failed");
read_len_total += read_len;
assert_eq!(read_buf[..read_len], tm_vec);
}
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]);
}
#[test]
fn test_multi_tc_multi_tm() {
let mut buffer: [u8; 32] = [0; 32];

View File

@ -21,11 +21,16 @@ use std::{
thread,
};
use hashbrown::HashSet;
use satrs_core::{
encoding::cobs::encode_packet_with_cobs,
hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer},
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer},
tmtc::{ReceivesTcCore, TmPacketSourceCore},
};
use spacepackets::{
ecss::{tc::PusTcCreator, SerializablePusPacket},
PacketId, SpHeader,
};
use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
#[derive(Default, Clone)]
@ -79,15 +84,16 @@ impl TmPacketSourceCore for SyncTmSource {
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1];
const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
fn main() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
#[test]
fn test_cobs_server() {
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
// Insert a telemetry packet which will be read back by the client at a later stage.
tm_source.add_tm(&INVERTED_PACKET);
let mut tcp_server = TcpTmtcInCobsServer::new(
ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024),
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
Box::new(tc_receiver.clone()),
)
@ -154,3 +160,85 @@ fn main() {
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
}
const TEST_APID_0: u16 = 0x02;
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
#[test]
fn test_ccsds_server() {
let mut buffer: [u8; 32] = [0; 32];
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
let tm_packet_len = verif_tm
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
tm_source.add_tm(&buffer[..tm_packet_len]);
let tm_vec = buffer[..tm_packet_len].to_vec();
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = TcpSpacepacketsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
Box::new(tc_receiver.clone()),
Box::new(packet_id_lookup),
)
.expect("TCP server generation failed");
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 1);
set_if_done.store(true, Ordering::Relaxed);
});
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
let packet_len = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
stream
.write_all(&buffer[..packet_len])
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 16] = [0; 16];
let mut read_len_total = 0;
// Timeout ensures this does not block forever.
while read_len_total < tm_packet_len {
let read_len = stream.read(&mut read_buf).expect("read failed");
read_len_total += read_len;
assert_eq!(read_buf[..read_len], tm_vec);
}
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]);
}