update docs

This commit is contained in:
Robin Müller 2024-04-14 19:16:56 +02:00
parent 12320c04ae
commit b2c00103db
Signed by: muellerr
GPG Key ID: A649FB78196E3849
16 changed files with 135 additions and 167 deletions

View File

@ -17,7 +17,7 @@ it is still centered around small packets. `sat-rs` provides support for these E
standards and also attempts to fill the gap to the internet protocol by providing the following standards and also attempts to fill the gap to the internet protocol by providing the following
components. components.
1. [UDP TMTC Server](https://docs.rs/satrs/latest/satrs/hal/host/udp_server/index.html). 1. [UDP TMTC Server](https://docs.rs/satrs/latest/satrs/hal/std/udp_server/index.html).
UDP is already packet based which makes it an excellent fit for exchanging space packets. UDP is already packet based which makes it an excellent fit for exchanging space packets.
2. [TCP TMTC Server Components](https://docs.rs/satrs/latest/satrs/hal/std/tcp_server/index.html). 2. [TCP TMTC Server Components](https://docs.rs/satrs/latest/satrs/hal/std/tcp_server/index.html).
TCP is a stream based protocol, so the library provides building blocks to parse telemetry TCP is a stream based protocol, so the library provides building blocks to parse telemetry
@ -39,8 +39,12 @@ task might be to store all arriving telemetry persistently. This is especially i
space systems which do not have permanent contact like low-earth-orbit (LEO) satellites. space systems which do not have permanent contact like low-earth-orbit (LEO) satellites.
The most important task of a TC source is to deliver the telecommands to the correct recipients. The most important task of a TC source is to deliver the telecommands to the correct recipients.
For modern component oriented software using message passing, this usually includes staged For component oriented software using message passing, this usually includes staged demultiplexing
demultiplexing components to determine where a command needs to be sent. components to determine where a command needs to be sent.
Using a generic concept of a TC source and a TM sink as part of the software design simplifies
the flexibility of the TMTC infrastructure: Newly added TM generators and TC receiver only have to
forward their generated or received packets to those handler objects.
# Low-level protocols and the bridge to the communcation subsystem # Low-level protocols and the bridge to the communcation subsystem

View File

@ -116,6 +116,7 @@ impl<
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
cell::RefCell,
collections::VecDeque, collections::VecDeque,
net::IpAddr, net::IpAddr,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
@ -134,13 +135,14 @@ mod tests {
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct TestSender { pub struct TestSender {
tc_vec: VecDeque<Vec<u8>>, tc_vec: RefCell<VecDeque<Vec<u8>>>,
} }
impl PacketSenderRaw for TestSender { impl PacketSenderRaw for TestSender {
type Error = (); type Error = ();
fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_vec.push_back(tc_raw.to_vec()); let mut mut_queue = self.tc_vec.borrow_mut();
mut_queue.push_back(tc_raw.to_vec());
Ok(()) Ok(())
} }
} }
@ -169,7 +171,8 @@ mod tests {
tm_handler, tm_handler,
}; };
udp_dyn_server.periodic_operation(); udp_dyn_server.periodic_operation();
assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); let queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow();
assert!(queue.is_empty());
assert!(tm_handler_calls.lock().unwrap().is_empty()); assert!(tm_handler_calls.lock().unwrap().is_empty());
} }
@ -196,15 +199,9 @@ mod tests {
client.send(&ping_tc).unwrap(); client.send(&ping_tc).unwrap();
udp_dyn_server.periodic_operation(); udp_dyn_server.periodic_operation();
{ {
//let mut tc_queue = tc_queue.lock().unwrap(); let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();
assert!(!udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); assert!(!queue.is_empty());
let received_tc = udp_dyn_server assert_eq!(queue.pop_front().unwrap(), ping_tc);
.udp_tc_server
.tc_sender
.tc_vec
.pop_front()
.unwrap();
assert_eq!(received_tc, ping_tc);
} }
{ {
@ -215,7 +212,9 @@ mod tests {
assert_eq!(received_addr, client_addr); assert_eq!(received_addr, client_addr);
} }
udp_dyn_server.periodic_operation(); udp_dyn_server.periodic_operation();
assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); let queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow();
assert!(queue.is_empty());
drop(queue);
// Still tries to send to the same client. // Still tries to send to the same client.
{ {
let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); let mut tm_handler_calls = tm_handler_calls.lock().unwrap();

View File

@ -180,8 +180,9 @@ pub trait TargetedPusService {
/// ///
/// The handler exposes the following API: /// The handler exposes the following API:
/// ///
/// 1. [Self::handle_one_tc] which tries to poll and handle one TC packet, covering steps 1-5. /// 1. [Self::poll_and_handle_next_tc] which tries to poll and handle one TC packet, covering
/// 2. [Self::check_one_reply] which tries to poll and handle one reply, covering step 6. /// steps 1-5.
/// 2. [Self::poll_and_check_next_reply] which tries to poll and handle one reply, covering step 6.
/// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7.
pub struct PusTargetedRequestService< pub struct PusTargetedRequestService<
TcReceiver: EcssTcReceiverCore, TcReceiver: EcssTcReceiverCore,

View File

@ -9,9 +9,9 @@ use crate::{tmtc::PacketSenderRaw, ValidatorU16Id};
/// If broken tail packets are detected, they are moved to the front of the buffer, and the write /// If broken tail packets are detected, they are moved to the front of the buffer, and the write
/// index for future write operations will be written to the `next_write_idx` argument. /// index for future write operations will be written to the `next_write_idx` argument.
/// ///
/// The parser will write all packets which were decoded successfully to the given `tc_receiver` /// The parser will forward all packets which were decoded successfully to the given
/// and return the number of packets found. If the [ReceivesTc::pass_tc] calls fails, the /// `packet_sender` and return the number of packets found. If the [PacketSenderRaw::send_raw_tc]
/// error will be returned. /// calls fails, the error will be returned.
pub fn parse_buffer_for_ccsds_space_packets<SendError>( pub fn parse_buffer_for_ccsds_space_packets<SendError>(
buf: &mut [u8], buf: &mut [u8],
packet_id_validator: &(impl ValidatorU16Id + ?Sized), packet_id_validator: &(impl ValidatorU16Id + ?Sized),
@ -74,12 +74,12 @@ mod tests {
.write_to_bytes(&mut buffer) .write_to_bytes(&mut buffer)
.expect("writing packet failed"); .expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0]; let valid_packet_ids = [TEST_PACKET_ID_0];
let mut tc_cacher = TcCacher::default(); let tc_cacher = TcCacher::default();
let mut next_write_idx = 0; let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets( let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer, &mut buffer,
valid_packet_ids.as_slice(), valid_packet_ids.as_slice(),
&mut tc_cacher, &tc_cacher,
&mut next_write_idx, &mut next_write_idx,
); );
assert!(parse_result.is_ok()); assert!(parse_result.is_ok());
@ -103,12 +103,12 @@ mod tests {
.write_to_bytes(&mut buffer[packet_len_ping..]) .write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed"); .expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0]; let valid_packet_ids = [TEST_PACKET_ID_0];
let mut tc_cacher = TcCacher::default(); let tc_cacher = TcCacher::default();
let mut next_write_idx = 0; let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets( let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer, &mut buffer,
valid_packet_ids.as_slice(), valid_packet_ids.as_slice(),
&mut tc_cacher, &tc_cacher,
&mut next_write_idx, &mut next_write_idx,
); );
assert!(parse_result.is_ok()); assert!(parse_result.is_ok());
@ -137,12 +137,12 @@ mod tests {
.write_to_bytes(&mut buffer[packet_len_ping..]) .write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed"); .expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let mut tc_cacher = TcCacher::default(); let tc_cacher = TcCacher::default();
let mut next_write_idx = 0; let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets( let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer, &mut buffer,
valid_packet_ids.as_slice(), valid_packet_ids.as_slice(),
&mut tc_cacher, &tc_cacher,
&mut next_write_idx, &mut next_write_idx,
); );
assert!(parse_result.is_ok()); assert!(parse_result.is_ok());
@ -171,12 +171,12 @@ mod tests {
.write_to_bytes(&mut buffer[packet_len_ping..]) .write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed"); .expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let mut tc_cacher = TcCacher::default(); let tc_cacher = TcCacher::default();
let mut next_write_idx = 0; let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets( let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer[..packet_len_ping + packet_len_action - 4], &mut buffer[..packet_len_ping + packet_len_action - 4],
valid_packet_ids.as_slice(), valid_packet_ids.as_slice(),
&mut tc_cacher, &tc_cacher,
&mut next_write_idx, &mut next_write_idx,
); );
assert!(parse_result.is_ok()); assert!(parse_result.is_ok());
@ -199,12 +199,12 @@ mod tests {
.write_to_bytes(&mut buffer) .write_to_bytes(&mut buffer)
.expect("writing packet failed"); .expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let mut tc_cacher = TcCacher::default(); let tc_cacher = TcCacher::default();
let mut next_write_idx = 0; let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets( let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer[..packet_len_ping - 4], &mut buffer[..packet_len_ping - 4],
valid_packet_ids.as_slice(), valid_packet_ids.as_slice(),
&mut tc_cacher, &tc_cacher,
&mut next_write_idx, &mut next_write_idx,
); );
assert_eq!(next_write_idx, 0); assert_eq!(next_write_idx, 0);

View File

@ -106,19 +106,19 @@ pub(crate) mod tests {
#[test] #[test]
fn test_parsing_simple_packet() { fn test_parsing_simple_packet() {
let mut test_sender = TcCacher::default(); let test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut next_read_idx = 0; let mut next_read_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
&mut encoded_buf[0..current_idx], &mut encoded_buf[0..current_idx],
&mut test_sender, &test_sender,
&mut next_read_idx, &mut next_read_idx,
) )
.unwrap(); .unwrap();
assert_eq!(packets, 1); assert_eq!(packets, 1);
let mut queue = test_sender.tc_queue.borrow_mut(); let queue = test_sender.tc_queue.borrow();
assert_eq!(queue.len(), 1); assert_eq!(queue.len(), 1);
let packet = &queue[0]; let packet = &queue[0];
assert_eq!(packet, &SIMPLE_PACKET); assert_eq!(packet, &SIMPLE_PACKET);
@ -126,7 +126,7 @@ pub(crate) mod tests {
#[test] #[test]
fn test_parsing_consecutive_packets() { fn test_parsing_consecutive_packets() {
let mut test_sender = TcCacher::default(); let test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -140,12 +140,12 @@ pub(crate) mod tests {
let mut next_read_idx = 0; let mut next_read_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
&mut encoded_buf[0..current_idx], &mut encoded_buf[0..current_idx],
&mut test_sender, &test_sender,
&mut next_read_idx, &mut next_read_idx,
) )
.unwrap(); .unwrap();
assert_eq!(packets, 2); assert_eq!(packets, 2);
let mut queue = test_sender.tc_queue.borrow_mut(); let queue = test_sender.tc_queue.borrow();
assert_eq!(queue.len(), 2); assert_eq!(queue.len(), 2);
let packet0 = &queue[0]; let packet0 = &queue[0];
assert_eq!(packet0, &SIMPLE_PACKET); assert_eq!(packet0, &SIMPLE_PACKET);
@ -155,7 +155,7 @@ pub(crate) mod tests {
#[test] #[test]
fn test_split_tail_packet_only() { fn test_split_tail_packet_only() {
let mut test_sender = TcCacher::default(); let test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0; let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx); encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -163,18 +163,18 @@ pub(crate) mod tests {
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end. // Cut off the sentinel byte at the end.
&mut encoded_buf[0..current_idx - 1], &mut encoded_buf[0..current_idx - 1],
&mut test_sender, &test_sender,
&mut next_read_idx, &mut next_read_idx,
) )
.unwrap(); .unwrap();
assert_eq!(packets, 0); assert_eq!(packets, 0);
let mut queue = test_sender.tc_queue.borrow_mut(); let queue = test_sender.tc_queue.borrow();
assert_eq!(queue.len(), 0); assert_eq!(queue.len(), 0);
assert_eq!(next_read_idx, 0); assert_eq!(next_read_idx, 0);
} }
fn generic_test_split_packet(cut_off: usize) { fn generic_test_split_packet(cut_off: usize) {
let mut test_sender = TcCacher::default(); let test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
assert!(cut_off < INVERTED_PACKET.len() + 1); assert!(cut_off < INVERTED_PACKET.len() + 1);
let mut current_idx = 0; let mut current_idx = 0;
@ -196,12 +196,12 @@ pub(crate) mod tests {
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end. // Cut off the sentinel byte at the end.
&mut encoded_buf[0..current_idx - cut_off], &mut encoded_buf[0..current_idx - cut_off],
&mut test_sender, &test_sender,
&mut next_write_idx, &mut next_write_idx,
) )
.unwrap(); .unwrap();
assert_eq!(packets, 1); assert_eq!(packets, 1);
let mut queue = test_sender.tc_queue.borrow_mut(); let queue = test_sender.tc_queue.borrow();
assert_eq!(queue.len(), 1); assert_eq!(queue.len(), 1);
assert_eq!(&queue[0], &SIMPLE_PACKET); assert_eq!(&queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, next_expected_write_idx); assert_eq!(next_write_idx, next_expected_write_idx);
@ -225,7 +225,7 @@ pub(crate) mod tests {
#[test] #[test]
fn test_zero_at_end() { fn test_zero_at_end() {
let mut test_sender = TcCacher::default(); let test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16]; let mut encoded_buf: [u8; 16] = [0; 16];
let mut next_write_idx = 0; let mut next_write_idx = 0;
let mut current_idx = 0; let mut current_idx = 0;
@ -237,12 +237,12 @@ pub(crate) mod tests {
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end. // Cut off the sentinel byte at the end.
&mut encoded_buf[0..current_idx], &mut encoded_buf[0..current_idx],
&mut test_sender, &test_sender,
&mut next_write_idx, &mut next_write_idx,
) )
.unwrap(); .unwrap();
assert_eq!(packets, 1); assert_eq!(packets, 1);
let mut queue = test_sender.tc_queue.borrow_mut(); let queue = test_sender.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1); assert_eq!(queue.len(), 1);
assert_eq!(&queue[0], &SIMPLE_PACKET); assert_eq!(&queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, 1); assert_eq!(next_write_idx, 1);
@ -251,18 +251,18 @@ pub(crate) mod tests {
#[test] #[test]
fn test_all_zeroes() { fn test_all_zeroes() {
let mut test_sender = TcCacher::default(); let test_sender = TcCacher::default();
let mut all_zeroes: [u8; 5] = [0; 5]; let mut all_zeroes: [u8; 5] = [0; 5];
let mut next_write_idx = 0; let mut next_write_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets( let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end. // Cut off the sentinel byte at the end.
&mut all_zeroes, &mut all_zeroes,
&mut test_sender, &test_sender,
&mut next_write_idx, &mut next_write_idx,
) )
.unwrap(); .unwrap();
assert_eq!(packets, 0); assert_eq!(packets, 0);
let mut queue = test_sender.tc_queue.borrow_mut(); let queue = test_sender.tc_queue.borrow();
assert!(queue.is_empty()); assert!(queue.is_empty());
assert_eq!(next_write_idx, 0); assert_eq!(next_write_idx, 0);
} }

View File

@ -28,14 +28,14 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
fn handle_tc_parsing( fn handle_tc_parsing(
&mut self, &mut self,
tc_buffer: &mut [u8], tc_buffer: &mut [u8],
tc_receiver: &mut (impl PacketSenderRaw<Error = TcError> + ?Sized), tc_sender: &(impl PacketSenderRaw<Error = TcError> + ?Sized),
conn_result: &mut HandledConnectionInfo, conn_result: &mut HandledConnectionInfo,
current_write_idx: usize, current_write_idx: usize,
next_write_idx: &mut usize, next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> { ) -> Result<(), TcpTmtcError<TmError, TcError>> {
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut tc_buffer[..current_write_idx], &mut tc_buffer[..current_write_idx],
tc_receiver, tc_sender,
next_write_idx, next_write_idx,
) )
.map_err(|e| TcpTmtcError::TcError(e))?; .map_err(|e| TcpTmtcError::TcError(e))?;

View File

@ -116,17 +116,17 @@ pub trait HandledConnectionHandler {
} }
/// Generic parser abstraction for an object which can parse for telecommands given a raw /// Generic parser abstraction for an object which can parse for telecommands given a raw
/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand /// bytestream received from a TCP socket and send them using a generic [PacketSenderRaw]
/// receiver. This allows different encoding schemes for telecommands. /// implementation. This allows different encoding schemes for telecommands.
pub trait TcpTcParser<TmError, TcError> { pub trait TcpTcParser<TmError, SendError> {
fn handle_tc_parsing( fn handle_tc_parsing(
&mut self, &mut self,
tc_buffer: &mut [u8], tc_buffer: &mut [u8],
tc_receiver: &mut (impl PacketSenderRaw<Error = TcError> + ?Sized), tc_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
conn_result: &mut HandledConnectionInfo, conn_result: &mut HandledConnectionInfo,
current_write_idx: usize, current_write_idx: usize,
next_write_idx: &mut usize, next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>>; ) -> Result<(), TcpTmtcError<TmError, SendError>>;
} }
/// Generic sender abstraction for an object which can pull telemetry from a given TM source /// Generic sender abstraction for an object which can pull telemetry from a given TM source
@ -150,7 +150,7 @@ pub trait TcpTmSender<TmError, TcError> {
/// through the following 4 core abstractions: /// through the following 4 core abstractions:
/// ///
/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client. /// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client.
/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver. /// 2. Parsed telecommands will be sent using the [PacketSenderRaw] object.
/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client. /// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client.
/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender]. /// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender].
/// ///
@ -162,19 +162,19 @@ pub trait TcpTmSender<TmError, TcError> {
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol. /// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
pub struct TcpTmtcGenericServer< pub struct TcpTmtcGenericServer<
TmSource: TmPacketSource<Error = TmError>, TmSource: TmPacketSource<Error = TmError>,
TcReceiver: PacketSenderRaw<Error = TcError>, TcSender: PacketSenderRaw<Error = TcSendError>,
TmSender: TcpTmSender<TmError, TcError>, TmSender: TcpTmSender<TmError, TcSendError>,
TcParser: TcpTcParser<TmError, TcError>, TcParser: TcpTcParser<TmError, TcSendError>,
HandledConnection: HandledConnectionHandler, HandledConnection: HandledConnectionHandler,
TmError, TmError,
TcError, TcSendError,
> { > {
pub finished_handler: HandledConnection, pub finished_handler: HandledConnection,
pub(crate) listener: TcpListener, pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration, pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: TmSource, pub(crate) tm_source: TmSource,
pub(crate) tm_buffer: Vec<u8>, pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: TcReceiver, pub(crate) tc_receiver: TcSender,
pub(crate) tc_buffer: Vec<u8>, pub(crate) tc_buffer: Vec<u8>,
poll: Poll, poll: Poll,
events: Events, events: Events,
@ -185,21 +185,21 @@ pub struct TcpTmtcGenericServer<
impl< impl<
TmSource: TmPacketSource<Error = TmError>, TmSource: TmPacketSource<Error = TmError>,
TcReceiver: PacketSenderRaw<Error = TcError>, TcSender: PacketSenderRaw<Error = TcSendError>,
TmSender: TcpTmSender<TmError, TcError>, TmSender: TcpTmSender<TmError, TcSendError>,
TcParser: TcpTcParser<TmError, TcError>, TcParser: TcpTcParser<TmError, TcSendError>,
HandledConnection: HandledConnectionHandler, HandledConnection: HandledConnectionHandler,
TmError: 'static, TmError: 'static,
TcError: 'static, TcSendError: 'static,
> >
TcpTmtcGenericServer< TcpTmtcGenericServer<
TmSource, TmSource,
TcReceiver, TcSender,
TmSender, TmSender,
TcParser, TcParser,
HandledConnection, HandledConnection,
TmError, TmError,
TcError, TcSendError,
> >
{ {
/// Create a new generic TMTC server instance. /// Create a new generic TMTC server instance.
@ -212,15 +212,15 @@ impl<
/// * `tm_sender` - Sends back telemetry to the client using the specified TM source. /// * `tm_sender` - Sends back telemetry to the client using the specified TM source.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client. /// then sent back to the client.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded /// * `tc_sender` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver. /// using this TC sender.
/// * `stop_signal` - Can be used to stop the server even if a connection is ongoing. /// * `stop_signal` - Can be used to stop the server even if a connection is ongoing.
pub fn new( pub fn new(
cfg: ServerConfig, cfg: ServerConfig,
tc_parser: TcParser, tc_parser: TcParser,
tm_sender: TmSender, tm_sender: TmSender,
tm_source: TmSource, tm_source: TmSource,
tc_receiver: TcReceiver, tc_receiver: TcSender,
finished_handler: HandledConnection, finished_handler: HandledConnection,
stop_signal: Option<Arc<AtomicBool>>, stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
@ -290,7 +290,7 @@ impl<
pub fn handle_next_connection( pub fn handle_next_connection(
&mut self, &mut self,
poll_timeout: Option<Duration>, poll_timeout: Option<Duration>,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> { ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcSendError>> {
let mut handled_connections = 0; let mut handled_connections = 0;
// Poll Mio for events. // Poll Mio for events.
self.poll.poll(&mut self.events, poll_timeout)?; self.poll.poll(&mut self.events, poll_timeout)?;
@ -329,7 +329,7 @@ impl<
&mut self, &mut self,
mut stream: TcpStream, mut stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
) -> Result<(), TcpTmtcError<TmError, TcError>> { ) -> Result<(), TcpTmtcError<TmError, TcSendError>> {
let mut current_write_idx; let mut current_write_idx;
let mut next_write_idx = 0; let mut next_write_idx = 0;
let mut connection_result = HandledConnectionInfo::new(addr); let mut connection_result = HandledConnectionInfo::new(addr);
@ -343,7 +343,7 @@ impl<
if current_write_idx > 0 { if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.tc_buffer, &mut self.tc_buffer,
&mut self.tc_receiver, &self.tc_receiver,
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -357,7 +357,7 @@ impl<
if current_write_idx == self.tc_buffer.capacity() { if current_write_idx == self.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.tc_buffer, &mut self.tc_buffer,
&mut self.tc_receiver, &self.tc_receiver,
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -371,7 +371,7 @@ impl<
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.tc_buffer, &mut self.tc_buffer,
&mut self.tc_receiver, &self.tc_receiver,
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -420,11 +420,11 @@ impl<
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use std::sync::{mpsc, Mutex}; use std::sync::Mutex;
use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use crate::tmtc::{PacketSenderRaw, TmPacketSource}; use crate::tmtc::TmPacketSource;
use super::*; use super::*;

View File

@ -32,7 +32,7 @@ impl<PacketIdChecker: ValidatorU16Id, TmError, TcError: 'static> TcpTcParser<TmE
fn handle_tc_parsing( fn handle_tc_parsing(
&mut self, &mut self,
tc_buffer: &mut [u8], tc_buffer: &mut [u8],
tc_receiver: &mut (impl PacketSenderRaw<Error = TcError> + ?Sized), tc_sender: &(impl PacketSenderRaw<Error = TcError> + ?Sized),
conn_result: &mut HandledConnectionInfo, conn_result: &mut HandledConnectionInfo,
current_write_idx: usize, current_write_idx: usize,
next_write_idx: &mut usize, next_write_idx: &mut usize,
@ -41,7 +41,7 @@ impl<PacketIdChecker: ValidatorU16Id, TmError, TcError: 'static> TcpTcParser<TmE
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets( conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
&mut tc_buffer[..current_write_idx], &mut tc_buffer[..current_write_idx],
&self.packet_id_lookup, &self.packet_id_lookup,
tc_receiver, tc_sender,
next_write_idx, next_write_idx,
) )
.map_err(|e| TcpTmtcError::TcError(e))?; .map_err(|e| TcpTmtcError::TcError(e))?;

View File

@ -11,46 +11,42 @@ use std::vec::Vec;
/// ///
/// It caches all received telecomands into a vector. The maximum expected telecommand size should /// It caches all received telecomands into a vector. The maximum expected telecommand size should
/// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC /// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC
/// sender in form of a special trait object which implements [ReceivesTc]. For example, this can /// sender in form of a special trait object which implements [PacketSenderRaw]. For example, this
/// be used to send the telecommands to a centralized TC source. Please note that the /// can be used to send the telecommands to a centralized TC source component for further
/// receiver should copy out the received data if it the data is required past the /// processing and routing.
/// [ReceivesTc::pass_tc] call and no message passing is used to process the packet.
/// ///
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
/// use std::sync::mpsc;
/// use spacepackets::ecss::WritablePusPacket; /// use spacepackets::ecss::WritablePusPacket;
/// use satrs::hal::std::udp_server::UdpTcServer; /// use satrs::hal::std::udp_server::UdpTcServer;
/// use satrs::tmtc::ReceivesTc; /// use satrs::tmtc::PacketSenderRaw;
/// use spacepackets::SpHeader; /// use spacepackets::SpHeader;
/// use spacepackets::ecss::tc::PusTcCreator; /// use spacepackets::ecss::tc::PusTcCreator;
/// ///
/// #[derive (Default)] /// let (packet_sender, packet_receiver) = mpsc::channel();
/// struct PingReceiver {}
/// impl ReceivesTc for PingReceiver {
/// type Error = ();
/// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
/// assert_eq!(tc_raw.len(), 13);
/// Ok(())
/// }
/// }
///
/// let mut buf = [0; 32];
/// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); /// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
/// let ping_receiver = PingReceiver::default(); /// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, packet_sender)
/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver)
/// .expect("Creating UDP TMTC server failed"); /// .expect("Creating UDP TMTC server failed");
/// let sph = SpHeader::new_from_apid(0x02); /// let sph = SpHeader::new_from_apid(0x02);
/// let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); /// let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true);
/// let len = pus_tc /// // Can not fail.
/// .write_to_bytes(&mut buf) /// let ping_tc_raw = pus_tc.to_vec().unwrap();
/// .expect("Error writing PUS TC packet"); ///
/// assert_eq!(len, 13); /// // Now create a UDP client and send the ping telecommand to the server.
/// let client = UdpSocket::bind("127.0.0.1:7778").expect("Connecting to UDP server failed"); /// let client = UdpSocket::bind("127.0.0.1:0").expect("creating UDP client failed");
/// client /// client
/// .send_to(&buf[0..len], dest_addr) /// .send_to(&ping_tc_raw, dest_addr)
/// .expect("Error sending PUS TC via UDP"); /// .expect("Error sending PUS TC via UDP");
/// let recv_result = udp_tc_server.try_recv_tc();
/// assert!(recv_result.is_ok());
/// // The packet is received by the UDP TC server and sent via the mpsc channel.
/// let sent_packet = packet_receiver.try_recv().expect("expected telecommand");
/// assert_eq!(sent_packet, ping_tc_raw);
/// // No more packets received.
/// matches!(packet_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
/// ``` /// ```
/// ///
/// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example) /// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example)

View File

@ -269,14 +269,8 @@ pub trait ModeReplySender {
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub mod alloc_mod { pub mod alloc_mod {
use crate::{ use crate::request::{
mode::ModeRequest, MessageSender, MessageSenderAndReceiver, MessageSenderMap, RequestAndReplySenderAndReceiver,
queue::GenericTargetedMessagingError,
request::{
MessageMetadata, MessageSender, MessageSenderAndReceiver, MessageSenderMap,
RequestAndReplySenderAndReceiver, RequestId,
},
ComponentId,
}; };
use super::*; use super::*;
@ -558,8 +552,6 @@ pub mod alloc_mod {
pub mod std_mod { pub mod std_mod {
use std::sync::mpsc; use std::sync::mpsc;
use crate::request::GenericMessage;
use super::*; use super::*;
pub type ModeRequestHandlerMpsc = ModeRequestHandlerInterface< pub type ModeRequestHandlerMpsc = ModeRequestHandlerInterface<

View File

@ -349,8 +349,6 @@ pub mod alloc_mod {
use super::*; use super::*;
use crate::pus::verification::VerificationReportingProvider;
/// Extension trait for [EcssTmSenderCore]. /// Extension trait for [EcssTmSenderCore].
/// ///
/// It provides additional functionality, for example by implementing the [Downcast] trait /// It provides additional functionality, for example by implementing the [Downcast] trait

View File

@ -345,10 +345,7 @@ pub mod alloc_mod {
}, },
vec::Vec, vec::Vec,
}; };
use spacepackets::time::{ use spacepackets::time::cds::{self, DaysLen24Bits};
cds::{self, DaysLen24Bits},
UnixTime,
};
use crate::pool::StoreAddr; use crate::pool::StoreAddr;

View File

@ -886,7 +886,7 @@ pub mod alloc_mod {
use spacepackets::ecss::PusError; use spacepackets::ecss::PusError;
use super::*; use super::*;
use crate::{pus::PusTmVariant, ComponentId}; use crate::pus::PusTmVariant;
use core::cell::RefCell; use core::cell::RefCell;
#[derive(Clone)] #[derive(Clone)]

View File

@ -193,8 +193,6 @@ impl<MSG, R: MessageReceiver<MSG>> MessageReceiverWithId<MSG, R> {
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub mod alloc_mod { pub mod alloc_mod {
use core::marker::PhantomData;
use crate::queue::GenericSendError; use crate::queue::GenericSendError;
use super::*; use super::*;
@ -333,7 +331,7 @@ pub mod std_mod {
use super::*; use super::*;
use std::sync::mpsc; use std::sync::mpsc;
use crate::queue::{GenericReceiveError, GenericSendError, GenericTargetedMessagingError}; use crate::queue::{GenericReceiveError, GenericSendError};
impl<MSG: Send> MessageSender<MSG> for mpsc::Sender<GenericMessage<MSG>> { impl<MSG: Send> MessageSender<MSG> for mpsc::Sender<GenericMessage<MSG>> {
fn send(&self, message: GenericMessage<MSG>) -> Result<(), GenericTargetedMessagingError> { fn send(&self, message: GenericMessage<MSG>) -> Result<(), GenericTargetedMessagingError> {

View File

@ -1,10 +1,12 @@
//! Telemetry and Telecommanding (TMTC) module. Contains packet routing components with special //! Telemetry and Telecommanding (TMTC) module. Contains packet routing components with special
//! support for CCSDS and ECSS packets. //! support for CCSDS and ECSS packets.
//! //!
//! The distributor modules provided by this module use trait objects provided by the user to //! It is recommended to read the [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/communication.html)
//! directly dispatch received packets to packet listeners based on packet fields like the CCSDS //! about communication first. The TMTC abstractions provided by this framework are based on the
//! Application Process ID (APID) or the ECSS PUS service type. This allows for fast packet //! assumption that all telemetry is sent to a special handler object called the TM sink while
//! routing without the overhead and complication of using message queues. However, it also requires //! all received telecommands are sent to a special handler object called TC source. Using
//! a design like this makes it simpler to add new TC packet sources or new telemetry generators:
//! They only need to send the received and generated data to these objects.
use std::sync::mpsc; use std::sync::mpsc;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
@ -49,9 +51,9 @@ impl PacketSenderRaw for mpsc::SyncSender<alloc::vec::Vec<u8>> {
} }
} }
/// Extension trait of [ReceivesTc] which allows downcasting by implementing [Downcast]. /// Extension trait of [PacketSenderRaw] which allows downcasting by implementing [Downcast].
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub trait TcSenderRawExt: PacketSenderRaw + Downcast { pub trait PacketSenderRawExt: PacketSenderRaw + Downcast {
// Remove this once trait upcasting coercion has been implemented. // Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991 // Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn PacketSenderRaw<Error = Self::Error>; fn upcast(&self) -> &dyn PacketSenderRaw<Error = Self::Error>;
@ -60,10 +62,10 @@ pub trait TcSenderRawExt: PacketSenderRaw + Downcast {
fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw<Error = Self::Error>; fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw<Error = Self::Error>;
} }
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// Blanket implementation to automatically implement [PacketSenderRawExt] when the [alloc]
/// is enabled. /// feature is enabled.
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl<T> TcSenderRawExt for T impl<T> PacketSenderRawExt for T
where where
T: PacketSenderRaw + Send + 'static, T: PacketSenderRaw + Send + 'static,
{ {
@ -80,7 +82,7 @@ where
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl_downcast!(TcSenderRawExt assoc Error); impl_downcast!(PacketSenderRawExt assoc Error);
/// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets /// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets
/// for CCSDS File Delivery Protocol (CFDP) packets. /// for CCSDS File Delivery Protocol (CFDP) packets.
@ -132,7 +134,7 @@ pub trait TmPacketSourceExt: TmPacketSource + Downcast {
fn upcast_mut(&mut self) -> &mut dyn TmPacketSource<Error = Self::Error>; fn upcast_mut(&mut self) -> &mut dyn TmPacketSource<Error = Self::Error>;
} }
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// Blanket implementation to automatically implement [TmPacketSourceExt] when the [alloc] feature
/// is enabled. /// is enabled.
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl<T> TmPacketSourceExt for T impl<T> TmPacketSourceExt for T

View File

@ -17,7 +17,7 @@ use core::{
use std::{ use std::{
io::{Read, Write}, io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::Mutex, sync::{mpsc, Mutex},
thread, thread,
}; };
@ -28,7 +28,7 @@ use satrs::{
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig, ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig,
TcpSpacepacketsServer, TcpTmtcInCobsServer, TcpSpacepacketsServer, TcpTmtcInCobsServer,
}, },
tmtc::{PacketSenderRaw, TmPacketSource}, tmtc::TmPacketSource,
}; };
use spacepackets::{ use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket}, ecss::{tc::PusTcCreator, WritablePusPacket},
@ -61,21 +61,6 @@ impl ConnectionFinishedHandler {
assert!(self.connection_info.is_empty()); assert!(self.connection_info.is_empty());
} }
} }
#[derive(Default, Clone)]
struct SyncTcCacher {
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl PacketSenderRaw for SyncTcCacher {
type Error = ();
fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
println!("Received TC: {:x?}", tc_raw);
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Clone)] #[derive(Default, Clone)]
struct SyncTmSource { struct SyncTmSource {
@ -117,14 +102,14 @@ const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127,
#[test] #[test]
fn test_cobs_server() { fn test_cobs_server() {
let tc_receiver = SyncTcCacher::default(); let (tc_sender, tc_receiver) = mpsc::channel();
let mut tm_source = SyncTmSource::default(); let mut tm_source = SyncTmSource::default();
// Insert a telemetry packet which will be read back by the client at a later stage. // Insert a telemetry packet which will be read back by the client at a later stage.
tm_source.add_tm(&INVERTED_PACKET); tm_source.add_tm(&INVERTED_PACKET);
let mut tcp_server = TcpTmtcInCobsServer::new( let mut tcp_server = TcpTmtcInCobsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
tm_source, tm_source,
tc_receiver.clone(), tc_sender.clone(),
ConnectionFinishedHandler::default(), ConnectionFinishedHandler::default(),
None, None,
) )
@ -190,13 +175,9 @@ fn test_cobs_server() {
panic!("connection was not handled properly"); panic!("connection was not handled properly");
} }
// Check that the packet was received and decoded successfully. // Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver let tc = tc_receiver.try_recv().expect("no TC received");
.tc_queue assert_eq!(tc, SIMPLE_PACKET);
.lock() matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
} }
const TEST_APID_0: u16 = 0x02; const TEST_APID_0: u16 = 0x02;
@ -204,7 +185,7 @@ const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0);
#[test] #[test]
fn test_ccsds_server() { fn test_ccsds_server() {
let tc_receiver = SyncTcCacher::default(); let (tc_sender, tc_receiver) = mpsc::channel();
let mut tm_source = SyncTmSource::default(); let mut tm_source = SyncTmSource::default();
let sph = SpHeader::new_for_unseg_tc(TEST_APID_0, 0, 0); let sph = SpHeader::new_for_unseg_tc(TEST_APID_0, 0, 0);
let verif_tm = PusTcCreator::new_simple(sph, 1, 1, &[], true); let verif_tm = PusTcCreator::new_simple(sph, 1, 1, &[], true);
@ -215,7 +196,7 @@ fn test_ccsds_server() {
let mut tcp_server = TcpSpacepacketsServer::new( let mut tcp_server = TcpSpacepacketsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
tm_source, tm_source,
tc_receiver.clone(), tc_sender,
packet_id_lookup, packet_id_lookup,
ConnectionFinishedHandler::default(), ConnectionFinishedHandler::default(),
None, None,
@ -282,7 +263,7 @@ fn test_ccsds_server() {
panic!("connection was not handled properly"); panic!("connection was not handled properly");
} }
// Check that TC has arrived. // Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); let tc = tc_receiver.try_recv().expect("no TC received");
assert_eq!(tc_queue.len(), 1); assert_eq!(tc, tc_0);
assert_eq!(tc_queue.pop_front().unwrap(), tc_0); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
} }