From b2c00103dbcf521c05ff843de9b0f7df1e780ce8 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sun, 14 Apr 2024 19:16:56 +0200 Subject: [PATCH] update docs --- satrs-book/src/communication.md | 10 ++-- satrs-example/src/interface/udp.rs | 27 +++++----- satrs-example/src/pus/mod.rs | 5 +- satrs/src/encoding/ccsds.rs | 26 +++++----- satrs/src/encoding/cobs.rs | 36 ++++++------- satrs/src/hal/std/tcp_cobs_server.rs | 4 +- satrs/src/hal/std/tcp_server.rs | 54 ++++++++++---------- satrs/src/hal/std/tcp_spacepackets_server.rs | 4 +- satrs/src/hal/std/udp_server.rs | 44 ++++++++-------- satrs/src/mode.rs | 12 +---- satrs/src/pus/mod.rs | 2 - satrs/src/pus/scheduler.rs | 5 +- satrs/src/pus/verification.rs | 2 +- satrs/src/request.rs | 4 +- satrs/src/tmtc/mod.rs | 24 +++++---- satrs/tests/tcp_servers.rs | 43 +++++----------- 16 files changed, 135 insertions(+), 167 deletions(-) diff --git a/satrs-book/src/communication.md b/satrs-book/src/communication.md index f102f6b..5ab55e6 100644 --- a/satrs-book/src/communication.md +++ b/satrs-book/src/communication.md @@ -17,7 +17,7 @@ it is still centered around small packets. `sat-rs` provides support for these E standards and also attempts to fill the gap to the internet protocol by providing the following components. -1. [UDP TMTC Server](https://docs.rs/satrs/latest/satrs/hal/host/udp_server/index.html). +1. [UDP TMTC Server](https://docs.rs/satrs/latest/satrs/hal/std/udp_server/index.html). UDP is already packet based which makes it an excellent fit for exchanging space packets. 2. [TCP TMTC Server Components](https://docs.rs/satrs/latest/satrs/hal/std/tcp_server/index.html). TCP is a stream based protocol, so the library provides building blocks to parse telemetry @@ -39,8 +39,12 @@ task might be to store all arriving telemetry persistently. This is especially i space systems which do not have permanent contact like low-earth-orbit (LEO) satellites. The most important task of a TC source is to deliver the telecommands to the correct recipients. -For modern component oriented software using message passing, this usually includes staged -demultiplexing components to determine where a command needs to be sent. +For component oriented software using message passing, this usually includes staged demultiplexing +components to determine where a command needs to be sent. + +Using a generic concept of a TC source and a TM sink as part of the software design simplifies +the flexibility of the TMTC infrastructure: Newly added TM generators and TC receiver only have to +forward their generated or received packets to those handler objects. # Low-level protocols and the bridge to the communcation subsystem diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index d10a0f4..128dde2 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -116,6 +116,7 @@ impl< #[cfg(test)] mod tests { use std::{ + cell::RefCell, collections::VecDeque, net::IpAddr, sync::{Arc, Mutex}, @@ -134,13 +135,14 @@ mod tests { #[derive(Default, Debug)] pub struct TestSender { - tc_vec: VecDeque>, + tc_vec: RefCell>>, } impl PacketSenderRaw for TestSender { type Error = (); - fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.tc_vec.push_back(tc_raw.to_vec()); + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut mut_queue = self.tc_vec.borrow_mut(); + mut_queue.push_back(tc_raw.to_vec()); Ok(()) } } @@ -169,7 +171,8 @@ mod tests { tm_handler, }; udp_dyn_server.periodic_operation(); - assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); + let queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow(); + assert!(queue.is_empty()); assert!(tm_handler_calls.lock().unwrap().is_empty()); } @@ -196,15 +199,9 @@ mod tests { client.send(&ping_tc).unwrap(); udp_dyn_server.periodic_operation(); { - //let mut tc_queue = tc_queue.lock().unwrap(); - assert!(!udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); - let received_tc = udp_dyn_server - .udp_tc_server - .tc_sender - .tc_vec - .pop_front() - .unwrap(); - assert_eq!(received_tc, ping_tc); + let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut(); + assert!(!queue.is_empty()); + assert_eq!(queue.pop_front().unwrap(), ping_tc); } { @@ -215,7 +212,9 @@ mod tests { assert_eq!(received_addr, client_addr); } udp_dyn_server.periodic_operation(); - assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); + let queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow(); + assert!(queue.is_empty()); + drop(queue); // Still tries to send to the same client. { let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 8794a1e..00b0f3f 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -180,8 +180,9 @@ pub trait TargetedPusService { /// /// The handler exposes the following API: /// -/// 1. [Self::handle_one_tc] which tries to poll and handle one TC packet, covering steps 1-5. -/// 2. [Self::check_one_reply] which tries to poll and handle one reply, covering step 6. +/// 1. [Self::poll_and_handle_next_tc] which tries to poll and handle one TC packet, covering +/// steps 1-5. +/// 2. [Self::poll_and_check_next_reply] which tries to poll and handle one reply, covering step 6. /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. pub struct PusTargetedRequestService< TcReceiver: EcssTcReceiverCore, diff --git a/satrs/src/encoding/ccsds.rs b/satrs/src/encoding/ccsds.rs index 7cf97bd..971c25f 100644 --- a/satrs/src/encoding/ccsds.rs +++ b/satrs/src/encoding/ccsds.rs @@ -9,9 +9,9 @@ use crate::{tmtc::PacketSenderRaw, ValidatorU16Id}; /// If broken tail packets are detected, they are moved to the front of the buffer, and the write /// index for future write operations will be written to the `next_write_idx` argument. /// -/// The parser will write all packets which were decoded successfully to the given `tc_receiver` -/// and return the number of packets found. If the [ReceivesTc::pass_tc] calls fails, the -/// error will be returned. +/// The parser will forward all packets which were decoded successfully to the given +/// `packet_sender` and return the number of packets found. If the [PacketSenderRaw::send_raw_tc] +/// calls fails, the error will be returned. pub fn parse_buffer_for_ccsds_space_packets( buf: &mut [u8], packet_id_validator: &(impl ValidatorU16Id + ?Sized), @@ -74,12 +74,12 @@ mod tests { .write_to_bytes(&mut buffer) .expect("writing packet failed"); let valid_packet_ids = [TEST_PACKET_ID_0]; - let mut tc_cacher = TcCacher::default(); + let tc_cacher = TcCacher::default(); let mut next_write_idx = 0; let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer, valid_packet_ids.as_slice(), - &mut tc_cacher, + &tc_cacher, &mut next_write_idx, ); assert!(parse_result.is_ok()); @@ -103,12 +103,12 @@ mod tests { .write_to_bytes(&mut buffer[packet_len_ping..]) .expect("writing packet failed"); let valid_packet_ids = [TEST_PACKET_ID_0]; - let mut tc_cacher = TcCacher::default(); + let tc_cacher = TcCacher::default(); let mut next_write_idx = 0; let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer, valid_packet_ids.as_slice(), - &mut tc_cacher, + &tc_cacher, &mut next_write_idx, ); assert!(parse_result.is_ok()); @@ -137,12 +137,12 @@ mod tests { .write_to_bytes(&mut buffer[packet_len_ping..]) .expect("writing packet failed"); let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; - let mut tc_cacher = TcCacher::default(); + let tc_cacher = TcCacher::default(); let mut next_write_idx = 0; let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer, valid_packet_ids.as_slice(), - &mut tc_cacher, + &tc_cacher, &mut next_write_idx, ); assert!(parse_result.is_ok()); @@ -171,12 +171,12 @@ mod tests { .write_to_bytes(&mut buffer[packet_len_ping..]) .expect("writing packet failed"); let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; - let mut tc_cacher = TcCacher::default(); + let tc_cacher = TcCacher::default(); let mut next_write_idx = 0; let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer[..packet_len_ping + packet_len_action - 4], valid_packet_ids.as_slice(), - &mut tc_cacher, + &tc_cacher, &mut next_write_idx, ); assert!(parse_result.is_ok()); @@ -199,12 +199,12 @@ mod tests { .write_to_bytes(&mut buffer) .expect("writing packet failed"); let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; - let mut tc_cacher = TcCacher::default(); + let tc_cacher = TcCacher::default(); let mut next_write_idx = 0; let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer[..packet_len_ping - 4], valid_packet_ids.as_slice(), - &mut tc_cacher, + &tc_cacher, &mut next_write_idx, ); assert_eq!(next_write_idx, 0); diff --git a/satrs/src/encoding/cobs.rs b/satrs/src/encoding/cobs.rs index 47540e8..cea4162 100644 --- a/satrs/src/encoding/cobs.rs +++ b/satrs/src/encoding/cobs.rs @@ -106,19 +106,19 @@ pub(crate) mod tests { #[test] fn test_parsing_simple_packet() { - let mut test_sender = TcCacher::default(); + let test_sender = TcCacher::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut current_idx = 0; encode_simple_packet(&mut encoded_buf, &mut current_idx); let mut next_read_idx = 0; let packets = parse_buffer_for_cobs_encoded_packets( &mut encoded_buf[0..current_idx], - &mut test_sender, + &test_sender, &mut next_read_idx, ) .unwrap(); assert_eq!(packets, 1); - let mut queue = test_sender.tc_queue.borrow_mut(); + let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 1); let packet = &queue[0]; assert_eq!(packet, &SIMPLE_PACKET); @@ -126,7 +126,7 @@ pub(crate) mod tests { #[test] fn test_parsing_consecutive_packets() { - let mut test_sender = TcCacher::default(); + let test_sender = TcCacher::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut current_idx = 0; encode_simple_packet(&mut encoded_buf, &mut current_idx); @@ -140,12 +140,12 @@ pub(crate) mod tests { let mut next_read_idx = 0; let packets = parse_buffer_for_cobs_encoded_packets( &mut encoded_buf[0..current_idx], - &mut test_sender, + &test_sender, &mut next_read_idx, ) .unwrap(); assert_eq!(packets, 2); - let mut queue = test_sender.tc_queue.borrow_mut(); + let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 2); let packet0 = &queue[0]; assert_eq!(packet0, &SIMPLE_PACKET); @@ -155,7 +155,7 @@ pub(crate) mod tests { #[test] fn test_split_tail_packet_only() { - let mut test_sender = TcCacher::default(); + let test_sender = TcCacher::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut current_idx = 0; encode_simple_packet(&mut encoded_buf, &mut current_idx); @@ -163,18 +163,18 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut encoded_buf[0..current_idx - 1], - &mut test_sender, + &test_sender, &mut next_read_idx, ) .unwrap(); assert_eq!(packets, 0); - let mut queue = test_sender.tc_queue.borrow_mut(); + let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 0); assert_eq!(next_read_idx, 0); } fn generic_test_split_packet(cut_off: usize) { - let mut test_sender = TcCacher::default(); + let test_sender = TcCacher::default(); let mut encoded_buf: [u8; 16] = [0; 16]; assert!(cut_off < INVERTED_PACKET.len() + 1); let mut current_idx = 0; @@ -196,12 +196,12 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut encoded_buf[0..current_idx - cut_off], - &mut test_sender, + &test_sender, &mut next_write_idx, ) .unwrap(); assert_eq!(packets, 1); - let mut queue = test_sender.tc_queue.borrow_mut(); + let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 1); assert_eq!(&queue[0], &SIMPLE_PACKET); assert_eq!(next_write_idx, next_expected_write_idx); @@ -225,7 +225,7 @@ pub(crate) mod tests { #[test] fn test_zero_at_end() { - let mut test_sender = TcCacher::default(); + let test_sender = TcCacher::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut next_write_idx = 0; let mut current_idx = 0; @@ -237,12 +237,12 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut encoded_buf[0..current_idx], - &mut test_sender, + &test_sender, &mut next_write_idx, ) .unwrap(); assert_eq!(packets, 1); - let mut queue = test_sender.tc_queue.borrow_mut(); + let queue = test_sender.tc_queue.borrow_mut(); assert_eq!(queue.len(), 1); assert_eq!(&queue[0], &SIMPLE_PACKET); assert_eq!(next_write_idx, 1); @@ -251,18 +251,18 @@ pub(crate) mod tests { #[test] fn test_all_zeroes() { - let mut test_sender = TcCacher::default(); + let test_sender = TcCacher::default(); let mut all_zeroes: [u8; 5] = [0; 5]; let mut next_write_idx = 0; let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut all_zeroes, - &mut test_sender, + &test_sender, &mut next_write_idx, ) .unwrap(); assert_eq!(packets, 0); - let mut queue = test_sender.tc_queue.borrow_mut(); + let queue = test_sender.tc_queue.borrow(); assert!(queue.is_empty()); assert_eq!(next_write_idx, 0); } diff --git a/satrs/src/hal/std/tcp_cobs_server.rs b/satrs/src/hal/std/tcp_cobs_server.rs index a1bfcd2..fbf1022 100644 --- a/satrs/src/hal/std/tcp_cobs_server.rs +++ b/satrs/src/hal/std/tcp_cobs_server.rs @@ -28,14 +28,14 @@ impl TcpTcParser for CobsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut (impl PacketSenderRaw + ?Sized), + tc_sender: &(impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, next_write_idx: &mut usize, ) -> Result<(), TcpTmtcError> { conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( &mut tc_buffer[..current_write_idx], - tc_receiver, + tc_sender, next_write_idx, ) .map_err(|e| TcpTmtcError::TcError(e))?; diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index fe5b068..4e7b918 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -116,17 +116,17 @@ pub trait HandledConnectionHandler { } /// Generic parser abstraction for an object which can parse for telecommands given a raw -/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand -/// receiver. This allows different encoding schemes for telecommands. -pub trait TcpTcParser { +/// bytestream received from a TCP socket and send them using a generic [PacketSenderRaw] +/// implementation. This allows different encoding schemes for telecommands. +pub trait TcpTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut (impl PacketSenderRaw + ?Sized), + tc_sender: &(impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, next_write_idx: &mut usize, - ) -> Result<(), TcpTmtcError>; + ) -> Result<(), TcpTmtcError>; } /// Generic sender abstraction for an object which can pull telemetry from a given TM source @@ -150,7 +150,7 @@ pub trait TcpTmSender { /// through the following 4 core abstractions: /// /// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client. -/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver. +/// 2. Parsed telecommands will be sent using the [PacketSenderRaw] object. /// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client. /// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender]. /// @@ -162,19 +162,19 @@ pub trait TcpTmSender { /// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol. pub struct TcpTmtcGenericServer< TmSource: TmPacketSource, - TcReceiver: PacketSenderRaw, - TmSender: TcpTmSender, - TcParser: TcpTcParser, + TcSender: PacketSenderRaw, + TmSender: TcpTmSender, + TcParser: TcpTcParser, HandledConnection: HandledConnectionHandler, TmError, - TcError, + TcSendError, > { pub finished_handler: HandledConnection, pub(crate) listener: TcpListener, pub(crate) inner_loop_delay: Duration, pub(crate) tm_source: TmSource, pub(crate) tm_buffer: Vec, - pub(crate) tc_receiver: TcReceiver, + pub(crate) tc_receiver: TcSender, pub(crate) tc_buffer: Vec, poll: Poll, events: Events, @@ -185,21 +185,21 @@ pub struct TcpTmtcGenericServer< impl< TmSource: TmPacketSource, - TcReceiver: PacketSenderRaw, - TmSender: TcpTmSender, - TcParser: TcpTcParser, + TcSender: PacketSenderRaw, + TmSender: TcpTmSender, + TcParser: TcpTcParser, HandledConnection: HandledConnectionHandler, TmError: 'static, - TcError: 'static, + TcSendError: 'static, > TcpTmtcGenericServer< TmSource, - TcReceiver, + TcSender, TmSender, TcParser, HandledConnection, TmError, - TcError, + TcSendError, > { /// Create a new generic TMTC server instance. @@ -212,15 +212,15 @@ impl< /// * `tm_sender` - Sends back telemetry to the client using the specified TM source. /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are /// then sent back to the client. - /// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded - /// to this TC receiver. + /// * `tc_sender` - Any received telecommand which was decoded successfully will be forwarded + /// using this TC sender. /// * `stop_signal` - Can be used to stop the server even if a connection is ongoing. pub fn new( cfg: ServerConfig, tc_parser: TcParser, tm_sender: TmSender, tm_source: TmSource, - tc_receiver: TcReceiver, + tc_receiver: TcSender, finished_handler: HandledConnection, stop_signal: Option>, ) -> Result { @@ -290,7 +290,7 @@ impl< pub fn handle_next_connection( &mut self, poll_timeout: Option, - ) -> Result> { + ) -> Result> { let mut handled_connections = 0; // Poll Mio for events. self.poll.poll(&mut self.events, poll_timeout)?; @@ -329,7 +329,7 @@ impl< &mut self, mut stream: TcpStream, addr: SocketAddr, - ) -> Result<(), TcpTmtcError> { + ) -> Result<(), TcpTmtcError> { let mut current_write_idx; let mut next_write_idx = 0; let mut connection_result = HandledConnectionInfo::new(addr); @@ -343,7 +343,7 @@ impl< if current_write_idx > 0 { self.tc_handler.handle_tc_parsing( &mut self.tc_buffer, - &mut self.tc_receiver, + &self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -357,7 +357,7 @@ impl< if current_write_idx == self.tc_buffer.capacity() { self.tc_handler.handle_tc_parsing( &mut self.tc_buffer, - &mut self.tc_receiver, + &self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -371,7 +371,7 @@ impl< std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { self.tc_handler.handle_tc_parsing( &mut self.tc_buffer, - &mut self.tc_receiver, + &self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -420,11 +420,11 @@ impl< #[cfg(test)] pub(crate) mod tests { - use std::sync::{mpsc, Mutex}; + use std::sync::Mutex; use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; - use crate::tmtc::{PacketSenderRaw, TmPacketSource}; + use crate::tmtc::TmPacketSource; use super::*; diff --git a/satrs/src/hal/std/tcp_spacepackets_server.rs b/satrs/src/hal/std/tcp_spacepackets_server.rs index 8f5302f..6e0eae2 100644 --- a/satrs/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs/src/hal/std/tcp_spacepackets_server.rs @@ -32,7 +32,7 @@ impl TcpTcParser + ?Sized), + tc_sender: &(impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, next_write_idx: &mut usize, @@ -41,7 +41,7 @@ impl TcpTcParser Result<(), Self::Error> { -/// assert_eq!(tc_raw.len(), 13); -/// Ok(()) -/// } -/// } -/// -/// let mut buf = [0; 32]; +/// let (packet_sender, packet_receiver) = mpsc::channel(); /// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); -/// let ping_receiver = PingReceiver::default(); -/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver) +/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, packet_sender) /// .expect("Creating UDP TMTC server failed"); /// let sph = SpHeader::new_from_apid(0x02); /// let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); -/// let len = pus_tc -/// .write_to_bytes(&mut buf) -/// .expect("Error writing PUS TC packet"); -/// assert_eq!(len, 13); -/// let client = UdpSocket::bind("127.0.0.1:7778").expect("Connecting to UDP server failed"); +/// // Can not fail. +/// let ping_tc_raw = pus_tc.to_vec().unwrap(); +/// +/// // Now create a UDP client and send the ping telecommand to the server. +/// let client = UdpSocket::bind("127.0.0.1:0").expect("creating UDP client failed"); /// client -/// .send_to(&buf[0..len], dest_addr) +/// .send_to(&ping_tc_raw, dest_addr) /// .expect("Error sending PUS TC via UDP"); +/// let recv_result = udp_tc_server.try_recv_tc(); +/// assert!(recv_result.is_ok()); +/// // The packet is received by the UDP TC server and sent via the mpsc channel. +/// let sent_packet = packet_receiver.try_recv().expect("expected telecommand"); +/// assert_eq!(sent_packet, ping_tc_raw); +/// // No more packets received. +/// matches!(packet_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); /// ``` /// /// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example) diff --git a/satrs/src/mode.rs b/satrs/src/mode.rs index 65519a5..01beac9 100644 --- a/satrs/src/mode.rs +++ b/satrs/src/mode.rs @@ -269,14 +269,8 @@ pub trait ModeReplySender { #[cfg(feature = "alloc")] pub mod alloc_mod { - use crate::{ - mode::ModeRequest, - queue::GenericTargetedMessagingError, - request::{ - MessageMetadata, MessageSender, MessageSenderAndReceiver, MessageSenderMap, - RequestAndReplySenderAndReceiver, RequestId, - }, - ComponentId, + use crate::request::{ + MessageSender, MessageSenderAndReceiver, MessageSenderMap, RequestAndReplySenderAndReceiver, }; use super::*; @@ -558,8 +552,6 @@ pub mod alloc_mod { pub mod std_mod { use std::sync::mpsc; - use crate::request::GenericMessage; - use super::*; pub type ModeRequestHandlerMpsc = ModeRequestHandlerInterface< diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index fca3b77..adf3637 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -349,8 +349,6 @@ pub mod alloc_mod { use super::*; - use crate::pus::verification::VerificationReportingProvider; - /// Extension trait for [EcssTmSenderCore]. /// /// It provides additional functionality, for example by implementing the [Downcast] trait diff --git a/satrs/src/pus/scheduler.rs b/satrs/src/pus/scheduler.rs index b3b9ef2..2d43f78 100644 --- a/satrs/src/pus/scheduler.rs +++ b/satrs/src/pus/scheduler.rs @@ -345,10 +345,7 @@ pub mod alloc_mod { }, vec::Vec, }; - use spacepackets::time::{ - cds::{self, DaysLen24Bits}, - UnixTime, - }; + use spacepackets::time::cds::{self, DaysLen24Bits}; use crate::pool::StoreAddr; diff --git a/satrs/src/pus/verification.rs b/satrs/src/pus/verification.rs index 2cab962..71c4ae9 100644 --- a/satrs/src/pus/verification.rs +++ b/satrs/src/pus/verification.rs @@ -886,7 +886,7 @@ pub mod alloc_mod { use spacepackets::ecss::PusError; use super::*; - use crate::{pus::PusTmVariant, ComponentId}; + use crate::pus::PusTmVariant; use core::cell::RefCell; #[derive(Clone)] diff --git a/satrs/src/request.rs b/satrs/src/request.rs index f2104ed..3999278 100644 --- a/satrs/src/request.rs +++ b/satrs/src/request.rs @@ -193,8 +193,6 @@ impl> MessageReceiverWithId { #[cfg(feature = "alloc")] pub mod alloc_mod { - use core::marker::PhantomData; - use crate::queue::GenericSendError; use super::*; @@ -333,7 +331,7 @@ pub mod std_mod { use super::*; use std::sync::mpsc; - use crate::queue::{GenericReceiveError, GenericSendError, GenericTargetedMessagingError}; + use crate::queue::{GenericReceiveError, GenericSendError}; impl MessageSender for mpsc::Sender> { fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError> { diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index 9ac86fd..253c9f5 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -1,10 +1,12 @@ //! Telemetry and Telecommanding (TMTC) module. Contains packet routing components with special //! support for CCSDS and ECSS packets. //! -//! The distributor modules provided by this module use trait objects provided by the user to -//! directly dispatch received packets to packet listeners based on packet fields like the CCSDS -//! Application Process ID (APID) or the ECSS PUS service type. This allows for fast packet -//! routing without the overhead and complication of using message queues. However, it also requires +//! It is recommended to read the [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/communication.html) +//! about communication first. The TMTC abstractions provided by this framework are based on the +//! assumption that all telemetry is sent to a special handler object called the TM sink while +//! all received telecommands are sent to a special handler object called TC source. Using +//! a design like this makes it simpler to add new TC packet sources or new telemetry generators: +//! They only need to send the received and generated data to these objects. use std::sync::mpsc; #[cfg(feature = "alloc")] @@ -49,9 +51,9 @@ impl PacketSenderRaw for mpsc::SyncSender> { } } -/// Extension trait of [ReceivesTc] which allows downcasting by implementing [Downcast]. +/// Extension trait of [PacketSenderRaw] which allows downcasting by implementing [Downcast]. #[cfg(feature = "alloc")] -pub trait TcSenderRawExt: PacketSenderRaw + Downcast { +pub trait PacketSenderRawExt: PacketSenderRaw + Downcast { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 fn upcast(&self) -> &dyn PacketSenderRaw; @@ -60,10 +62,10 @@ pub trait TcSenderRawExt: PacketSenderRaw + Downcast { fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw; } -/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature -/// is enabled. +/// Blanket implementation to automatically implement [PacketSenderRawExt] when the [alloc] +/// feature is enabled. #[cfg(feature = "alloc")] -impl TcSenderRawExt for T +impl PacketSenderRawExt for T where T: PacketSenderRaw + Send + 'static, { @@ -80,7 +82,7 @@ where } #[cfg(feature = "alloc")] -impl_downcast!(TcSenderRawExt assoc Error); +impl_downcast!(PacketSenderRawExt assoc Error); /// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets /// for CCSDS File Delivery Protocol (CFDP) packets. @@ -132,7 +134,7 @@ pub trait TmPacketSourceExt: TmPacketSource + Downcast { fn upcast_mut(&mut self) -> &mut dyn TmPacketSource; } -/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature +/// Blanket implementation to automatically implement [TmPacketSourceExt] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] impl TmPacketSourceExt for T diff --git a/satrs/tests/tcp_servers.rs b/satrs/tests/tcp_servers.rs index fc2f3d1..209a57c 100644 --- a/satrs/tests/tcp_servers.rs +++ b/satrs/tests/tcp_servers.rs @@ -17,7 +17,7 @@ use core::{ use std::{ io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, - sync::Mutex, + sync::{mpsc, Mutex}, thread, }; @@ -28,7 +28,7 @@ use satrs::{ ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer, }, - tmtc::{PacketSenderRaw, TmPacketSource}, + tmtc::TmPacketSource, }; use spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -61,21 +61,6 @@ impl ConnectionFinishedHandler { assert!(self.connection_info.is_empty()); } } -#[derive(Default, Clone)] -struct SyncTcCacher { - tc_queue: Arc>>>, -} - -impl PacketSenderRaw for SyncTcCacher { - type Error = (); - - fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); - println!("Received TC: {:x?}", tc_raw); - tc_queue.push_back(tc_raw.to_vec()); - Ok(()) - } -} #[derive(Default, Clone)] struct SyncTmSource { @@ -117,14 +102,14 @@ const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, #[test] fn test_cobs_server() { - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, tc_receiver) = mpsc::channel(); let mut tm_source = SyncTmSource::default(); // Insert a telemetry packet which will be read back by the client at a later stage. tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), tm_source, - tc_receiver.clone(), + tc_sender.clone(), ConnectionFinishedHandler::default(), None, ) @@ -190,13 +175,9 @@ fn test_cobs_server() { panic!("connection was not handled properly"); } // Check that the packet was received and decoded successfully. - let mut tc_queue = tc_receiver - .tc_queue - .lock() - .expect("locking tc queue failed"); - assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); - drop(tc_queue); + let tc = tc_receiver.try_recv().expect("no TC received"); + assert_eq!(tc, SIMPLE_PACKET); + matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } const TEST_APID_0: u16 = 0x02; @@ -204,7 +185,7 @@ const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0); #[test] fn test_ccsds_server() { - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, tc_receiver) = mpsc::channel(); let mut tm_source = SyncTmSource::default(); let sph = SpHeader::new_for_unseg_tc(TEST_APID_0, 0, 0); let verif_tm = PusTcCreator::new_simple(sph, 1, 1, &[], true); @@ -215,7 +196,7 @@ fn test_ccsds_server() { let mut tcp_server = TcpSpacepacketsServer::new( ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), tm_source, - tc_receiver.clone(), + tc_sender, packet_id_lookup, ConnectionFinishedHandler::default(), None, @@ -282,7 +263,7 @@ fn test_ccsds_server() { panic!("connection was not handled properly"); } // Check that TC has arrived. - let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); - assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), tc_0); + let tc = tc_receiver.try_recv().expect("no TC received"); + assert_eq!(tc, tc_0); + matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); }