Merge remote-tracking branch 'origin/main' into TargetIdWithApid
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
This commit is contained in:
@ -30,6 +30,12 @@ impl PacketIdLookup for [u16] {
|
||||
}
|
||||
}
|
||||
|
||||
impl PacketIdLookup for &[u16] {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
self.binary_search(&packet_id).is_ok()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
impl PacketIdLookup for Vec<PacketId> {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
@ -49,6 +55,12 @@ impl PacketIdLookup for [PacketId] {
|
||||
}
|
||||
}
|
||||
|
||||
impl PacketIdLookup for &[PacketId] {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
self.binary_search(&PacketId::from(packet_id)).is_ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
|
||||
/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then
|
||||
/// uses the length field of the packet to extract CCSDS packets.
|
||||
|
@ -1,4 +1,3 @@
|
||||
use alloc::boxed::Box;
|
||||
use alloc::vec;
|
||||
use cobs::encode;
|
||||
use delegate::delegate;
|
||||
@ -29,7 +28,6 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
|
||||
current_write_idx: usize,
|
||||
next_write_idx: &mut usize,
|
||||
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||
// Reader vec full, need to parse for packets.
|
||||
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
|
||||
&mut tc_buffer[..current_write_idx],
|
||||
tc_receiver.upcast_mut(),
|
||||
@ -111,11 +109,23 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
||||
///
|
||||
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
||||
/// test also serves as the example application for this module.
|
||||
pub struct TcpTmtcInCobsServer<TmError, TcError: 'static> {
|
||||
generic_server: TcpTmtcGenericServer<TmError, TcError, CobsTmSender, CobsTcParser>,
|
||||
pub struct TcpTmtcInCobsServer<
|
||||
TmError,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> {
|
||||
generic_server:
|
||||
TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, CobsTmSender, CobsTcParser>,
|
||||
}
|
||||
|
||||
impl<TmError: 'static, TcError: 'static> TcpTmtcInCobsServer<TmError, TcError> {
|
||||
impl<
|
||||
TmError: 'static,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> TcpTmtcInCobsServer<TmError, TcError, TmSource, TcReceiver>
|
||||
{
|
||||
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
|
||||
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
|
||||
///
|
||||
@ -128,9 +138,9 @@ impl<TmError: 'static, TcError: 'static> TcpTmtcInCobsServer<TmError, TcError> {
|
||||
/// forwarded to this TC receiver.
|
||||
pub fn new(
|
||||
cfg: ServerConfig,
|
||||
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||
) -> Result<Self, TcpTmtcError<TmError, TcError>> {
|
||||
tm_source: TmSource,
|
||||
tc_receiver: TcReceiver,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
generic_server: TcpTmtcGenericServer::new(
|
||||
cfg,
|
||||
@ -177,7 +187,7 @@ mod tests {
|
||||
ServerConfig,
|
||||
},
|
||||
};
|
||||
use alloc::{boxed::Box, sync::Arc};
|
||||
use alloc::sync::Arc;
|
||||
use cobs::encode;
|
||||
|
||||
use super::TcpTmtcInCobsServer;
|
||||
@ -202,11 +212,11 @@ mod tests {
|
||||
addr: &SocketAddr,
|
||||
tc_receiver: SyncTcCacher,
|
||||
tm_source: SyncTmSource,
|
||||
) -> TcpTmtcInCobsServer<(), ()> {
|
||||
) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
|
||||
TcpTmtcInCobsServer::new(
|
||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||
Box::new(tm_source),
|
||||
Box::new(tc_receiver),
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
)
|
||||
.expect("TCP server generation failed")
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
//! Generic TCP TMTC servers with different TMTC format flavours.
|
||||
use alloc::vec;
|
||||
use alloc::{boxed::Box, vec::Vec};
|
||||
use alloc::vec::Vec;
|
||||
use core::time::Duration;
|
||||
use socket2::{Domain, Socket, Type};
|
||||
use std::io::Read;
|
||||
@ -134,20 +134,29 @@ pub trait TcpTmSender<TmError, TcError> {
|
||||
pub struct TcpTmtcGenericServer<
|
||||
TmError,
|
||||
TcError,
|
||||
TmHandler: TcpTmSender<TmError, TcError>,
|
||||
TcHandler: TcpTcParser<TmError, TcError>,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
TmSender: TcpTmSender<TmError, TcError>,
|
||||
TcParser: TcpTcParser<TmError, TcError>,
|
||||
> {
|
||||
base: TcpTmtcServerBase<TmError, TcError>,
|
||||
tc_handler: TcHandler,
|
||||
tm_handler: TmHandler,
|
||||
pub(crate) listener: TcpListener,
|
||||
pub(crate) inner_loop_delay: Duration,
|
||||
pub(crate) tm_source: TmSource,
|
||||
pub(crate) tm_buffer: Vec<u8>,
|
||||
pub(crate) tc_receiver: TcReceiver,
|
||||
pub(crate) tc_buffer: Vec<u8>,
|
||||
tc_handler: TcParser,
|
||||
tm_handler: TmSender,
|
||||
}
|
||||
|
||||
impl<
|
||||
TmError: 'static,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
TmSender: TcpTmSender<TmError, TcError>,
|
||||
TcParser: TcpTcParser<TmError, TcError>,
|
||||
> TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>
|
||||
> TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, TmSender, TcParser>
|
||||
{
|
||||
/// Create a new generic TMTC server instance.
|
||||
///
|
||||
@ -165,25 +174,38 @@ impl<
|
||||
cfg: ServerConfig,
|
||||
tc_parser: TcParser,
|
||||
tm_sender: TmSender,
|
||||
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>, std::io::Error> {
|
||||
tm_source: TmSource,
|
||||
tc_receiver: TcReceiver,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
// Create a TCP listener bound to two addresses.
|
||||
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
|
||||
socket.set_reuse_address(cfg.reuse_addr)?;
|
||||
#[cfg(unix)]
|
||||
socket.set_reuse_port(cfg.reuse_port)?;
|
||||
let addr = (cfg.addr).into();
|
||||
socket.bind(&addr)?;
|
||||
socket.listen(128)?;
|
||||
Ok(Self {
|
||||
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
|
||||
tc_handler: tc_parser,
|
||||
tm_handler: tm_sender,
|
||||
listener: socket.into(),
|
||||
inner_loop_delay: cfg.inner_loop_delay,
|
||||
tm_source,
|
||||
tm_buffer: vec![0; cfg.tm_buffer_size],
|
||||
tc_receiver,
|
||||
tc_buffer: vec![0; cfg.tc_buffer_size],
|
||||
})
|
||||
}
|
||||
|
||||
/// Retrieve the internal [TcpListener] class.
|
||||
pub fn listener(&mut self) -> &mut TcpListener {
|
||||
self.base.listener()
|
||||
&mut self.listener
|
||||
}
|
||||
|
||||
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
|
||||
/// useful if using the port number 0 for OS auto-assignment.
|
||||
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
|
||||
self.base.local_addr()
|
||||
self.listener.local_addr()
|
||||
}
|
||||
|
||||
/// This call is used to handle the next connection to a client. Right now, it performs
|
||||
@ -205,20 +227,20 @@ impl<
|
||||
let mut connection_result = ConnectionResult::default();
|
||||
let mut current_write_idx;
|
||||
let mut next_write_idx = 0;
|
||||
let (mut stream, addr) = self.base.listener.accept()?;
|
||||
let (mut stream, addr) = self.listener.accept()?;
|
||||
stream.set_nonblocking(true)?;
|
||||
connection_result.addr = Some(addr);
|
||||
current_write_idx = next_write_idx;
|
||||
loop {
|
||||
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]);
|
||||
let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
|
||||
match read_result {
|
||||
Ok(0) => {
|
||||
// Connection closed by client. If any TC was read, parse for complete packets.
|
||||
// After that, break the outer loop.
|
||||
if current_write_idx > 0 {
|
||||
self.tc_handler.handle_tc_parsing(
|
||||
&mut self.base.tc_buffer,
|
||||
self.base.tc_receiver.as_mut(),
|
||||
&mut self.tc_buffer,
|
||||
&mut self.tc_receiver,
|
||||
&mut connection_result,
|
||||
current_write_idx,
|
||||
&mut next_write_idx,
|
||||
@ -229,10 +251,10 @@ impl<
|
||||
Ok(read_len) => {
|
||||
current_write_idx += read_len;
|
||||
// TC buffer is full, we must parse for complete packets now.
|
||||
if current_write_idx == self.base.tc_buffer.capacity() {
|
||||
if current_write_idx == self.tc_buffer.capacity() {
|
||||
self.tc_handler.handle_tc_parsing(
|
||||
&mut self.base.tc_buffer,
|
||||
self.base.tc_receiver.as_mut(),
|
||||
&mut self.tc_buffer,
|
||||
&mut self.tc_receiver,
|
||||
&mut connection_result,
|
||||
current_write_idx,
|
||||
&mut next_write_idx,
|
||||
@ -245,8 +267,8 @@ impl<
|
||||
// both UNIX and Windows.
|
||||
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
|
||||
self.tc_handler.handle_tc_parsing(
|
||||
&mut self.base.tc_buffer,
|
||||
self.base.tc_receiver.as_mut(),
|
||||
&mut self.tc_buffer,
|
||||
&mut self.tc_receiver,
|
||||
&mut connection_result,
|
||||
current_write_idx,
|
||||
&mut next_write_idx,
|
||||
@ -254,14 +276,14 @@ impl<
|
||||
current_write_idx = next_write_idx;
|
||||
|
||||
if !self.tm_handler.handle_tm_sending(
|
||||
&mut self.base.tm_buffer,
|
||||
self.base.tm_source.as_mut(),
|
||||
&mut self.tm_buffer,
|
||||
&mut self.tm_source,
|
||||
&mut connection_result,
|
||||
&mut stream,
|
||||
)? {
|
||||
// No TC read, no TM was sent, but the client has not disconnected.
|
||||
// Perform an inner delay to avoid burning CPU time.
|
||||
thread::sleep(self.base.inner_loop_delay);
|
||||
thread::sleep(self.inner_loop_delay);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
@ -271,8 +293,8 @@ impl<
|
||||
}
|
||||
}
|
||||
self.tm_handler.handle_tm_sending(
|
||||
&mut self.base.tm_buffer,
|
||||
self.base.tm_source.as_mut(),
|
||||
&mut self.tm_buffer,
|
||||
&mut self.tm_source,
|
||||
&mut connection_result,
|
||||
&mut stream,
|
||||
)?;
|
||||
@ -280,47 +302,6 @@ impl<
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TcpTmtcServerBase<TmError, TcError> {
|
||||
pub(crate) listener: TcpListener,
|
||||
pub(crate) inner_loop_delay: Duration,
|
||||
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||
pub(crate) tm_buffer: Vec<u8>,
|
||||
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||
pub(crate) tc_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
|
||||
pub(crate) fn new(
|
||||
cfg: ServerConfig,
|
||||
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
// Create a TCP listener bound to two addresses.
|
||||
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
|
||||
socket.set_reuse_address(cfg.reuse_addr)?;
|
||||
socket.set_reuse_port(cfg.reuse_port)?;
|
||||
let addr = (cfg.addr).into();
|
||||
socket.bind(&addr)?;
|
||||
socket.listen(128)?;
|
||||
Ok(Self {
|
||||
listener: socket.into(),
|
||||
inner_loop_delay: cfg.inner_loop_delay,
|
||||
tm_source,
|
||||
tm_buffer: vec![0; cfg.tm_buffer_size],
|
||||
tc_receiver,
|
||||
tc_buffer: vec![0; cfg.tc_buffer_size],
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn listener(&mut self) -> &mut TcpListener {
|
||||
&mut self.listener
|
||||
}
|
||||
|
||||
pub(crate) fn local_addr(&self) -> std::io::Result<SocketAddr> {
|
||||
self.listener.local_addr()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::sync::Mutex;
|
||||
|
@ -88,16 +88,31 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
||||
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
||||
/// also serves as the example application for this module.
|
||||
pub struct TcpSpacepacketsServer<TmError, TcError: 'static> {
|
||||
generic_server:
|
||||
TcpTmtcGenericServer<TmError, TcError, SpacepacketsTmSender, SpacepacketsTcParser>,
|
||||
pub struct TcpSpacepacketsServer<
|
||||
TmError,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> {
|
||||
generic_server: TcpTmtcGenericServer<
|
||||
TmError,
|
||||
TcError,
|
||||
TmSource,
|
||||
TcReceiver,
|
||||
SpacepacketsTmSender,
|
||||
SpacepacketsTcParser,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError> {
|
||||
/// Create a new TCP TMTC server which exchanges CCSDS space packets.
|
||||
impl<
|
||||
TmError: 'static,
|
||||
TcError: 'static,
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
> TcpSpacepacketsServer<TmError, TcError, TmSource, TcReceiver>
|
||||
{
|
||||
///
|
||||
/// ## Parameter
|
||||
///
|
||||
@ -110,10 +125,10 @@ impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError>
|
||||
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
|
||||
pub fn new(
|
||||
cfg: ServerConfig,
|
||||
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||
tm_source: TmSource,
|
||||
tc_receiver: TcReceiver,
|
||||
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
|
||||
) -> Result<Self, TcpTmtcError<TmError, TcError>> {
|
||||
) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
generic_server: TcpTmtcGenericServer::new(
|
||||
cfg,
|
||||
@ -179,11 +194,11 @@ mod tests {
|
||||
tc_receiver: SyncTcCacher,
|
||||
tm_source: SyncTmSource,
|
||||
packet_id_lookup: HashSet<PacketId>,
|
||||
) -> TcpSpacepacketsServer<(), ()> {
|
||||
) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> {
|
||||
TcpSpacepacketsServer::new(
|
||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||
Box::new(tm_source),
|
||||
Box::new(tc_receiver),
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
Box::new(packet_id_lookup),
|
||||
)
|
||||
.expect("TCP server generation failed")
|
||||
@ -220,13 +235,10 @@ mod tests {
|
||||
});
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||
let mut buffer: [u8; 32] = [0; 32];
|
||||
let packet_len_ping = ping_tc
|
||||
.write_to_bytes(&mut buffer)
|
||||
.expect("writing packet failed");
|
||||
let tc_0 = ping_tc.to_vec().expect("packet generation failed");
|
||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||
stream
|
||||
.write_all(&buffer[..packet_len_ping])
|
||||
.write_all(&tc_0)
|
||||
.expect("writing to TCP server failed");
|
||||
drop(stream);
|
||||
|
||||
@ -242,12 +254,11 @@ mod tests {
|
||||
// Check that TC has arrived.
|
||||
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||
assert_eq!(tc_queue.len(), 1);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multi_tc_multi_tm() {
|
||||
let mut buffer: [u8; 32] = [0; 32];
|
||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let mut tm_source = SyncTmSource::default();
|
||||
@ -256,19 +267,13 @@ mod tests {
|
||||
let mut total_tm_len = 0;
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
|
||||
let tm_packet_len = verif_tm
|
||||
.write_to_bytes(&mut buffer)
|
||||
.expect("writing packet failed");
|
||||
total_tm_len += tm_packet_len;
|
||||
let tm_0 = buffer[..tm_packet_len].to_vec();
|
||||
let tm_0 = verif_tm.to_vec().expect("writing packet failed");
|
||||
total_tm_len += tm_0.len();
|
||||
tm_source.add_tm(&tm_0);
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
|
||||
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true);
|
||||
let tm_packet_len = verif_tm
|
||||
.write_to_bytes(&mut buffer)
|
||||
.expect("writing packet failed");
|
||||
total_tm_len += tm_packet_len;
|
||||
let tm_1 = buffer[..tm_packet_len].to_vec();
|
||||
let tm_1 = verif_tm.to_vec().expect("writing packet failed");
|
||||
total_tm_len += tm_1.len();
|
||||
tm_source.add_tm(&tm_1);
|
||||
|
||||
// Set up server
|
||||
@ -309,19 +314,13 @@ mod tests {
|
||||
// Send telecommands
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
|
||||
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
|
||||
let packet_len = ping_tc
|
||||
.write_to_bytes(&mut buffer)
|
||||
.expect("writing packet failed");
|
||||
let tc_0 = buffer[..packet_len].to_vec();
|
||||
let tc_0 = ping_tc.to_vec().expect("ping tc creation failed");
|
||||
stream
|
||||
.write_all(&tc_0)
|
||||
.expect("writing to TCP server failed");
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
|
||||
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
|
||||
let packet_len = action_tc
|
||||
.write_to_bytes(&mut buffer)
|
||||
.expect("writing packet failed");
|
||||
let tc_1 = buffer[..packet_len].to_vec();
|
||||
let tc_1 = action_tc.to_vec().expect("action tc creation failed");
|
||||
stream
|
||||
.write_all(&tc_1)
|
||||
.expect("writing to TCP server failed");
|
||||
|
Reference in New Issue
Block a user