From 12320c04ae55ceadeca2de2834c37f49711a669f Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sun, 14 Apr 2024 13:07:39 +0200 Subject: [PATCH] thats a larger change --- satrs-example/src/interface/tcp.rs | 6 +- satrs-example/src/interface/udp.rs | 12 +-- satrs-example/src/pus/scheduler.rs | 4 +- satrs/CHANGELOG.md | 3 +- satrs/src/encoding/ccsds.rs | 45 +++++------ satrs/src/encoding/cobs.rs | 40 +++++----- satrs/src/encoding/mod.rs | 13 ++-- satrs/src/hal/std/tcp_cobs_server.rs | 80 ++++++++++---------- satrs/src/hal/std/tcp_server.rs | 26 ++----- satrs/src/hal/std/tcp_spacepackets_server.rs | 50 ++++++------ satrs/src/hal/std/udp_server.rs | 25 +++--- satrs/src/tmtc/mod.rs | 28 +++---- satrs/src/tmtc/tc_helper.rs | 31 ++++---- satrs/tests/tcp_servers.rs | 6 +- 14 files changed, 184 insertions(+), 185 deletions(-) diff --git a/satrs-example/src/interface/tcp.rs b/satrs-example/src/interface/tcp.rs index fc13493..3a0741d 100644 --- a/satrs-example/src/interface/tcp.rs +++ b/satrs-example/src/interface/tcp.rs @@ -9,7 +9,7 @@ use log::{info, warn}; use satrs::{ hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, spacepackets::PacketId, - tmtc::{ReceivesTc, TmPacketSource}, + tmtc::{PacketSenderRaw, TmPacketSource}, }; #[derive(Default)] @@ -89,12 +89,12 @@ pub type TcpServer = TcpSpacepacketsServer< SendError, >; -pub struct TcpTask, SendError: Debug + 'static>( +pub struct TcpTask, SendError: Debug + 'static>( pub TcpServer, PhantomData, ); -impl, SendError: Debug + 'static> +impl, SendError: Debug + 'static> TcpTask { pub fn new( diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index 0ba4b81..d10a0f4 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -4,7 +4,7 @@ use std::sync::mpsc; use log::{info, warn}; use satrs::pus::{PusTmAsVec, PusTmInPool}; -use satrs::tmtc::ReceivesTc; +use satrs::tmtc::PacketSenderRaw; use satrs::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, pool::{PoolProviderWithGuards, SharedStaticMemoryPool}, @@ -68,7 +68,7 @@ impl UdpTmHandler for DynamicUdpTmHandler { } pub struct UdpTmtcServer< - TcSender: ReceivesTc, + TcSender: PacketSenderRaw, TmHandler: UdpTmHandler, SendError, > { @@ -77,7 +77,7 @@ pub struct UdpTmtcServer< } impl< - TcSender: ReceivesTc, + TcSender: PacketSenderRaw, TmHandler: UdpTmHandler, SendError: Debug + 'static, > UdpTmtcServer @@ -126,7 +126,7 @@ mod tests { ecss::{tc::PusTcCreator, WritablePusPacket}, SpHeader, }, - tmtc::ReceivesTc, + tmtc::PacketSenderRaw, }; use satrs_example::config::{components, OBSW_SERVER_ADDR}; @@ -137,9 +137,9 @@ mod tests { tc_vec: VecDeque>, } - impl ReceivesTc for TestSender { + impl PacketSenderRaw for TestSender { type Error = (); - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { self.tc_vec.push_back(tc_raw.to_vec()); Ok(()) } diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 3b73036..65a4ea7 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -24,9 +24,9 @@ pub trait TcReleaser { impl TcReleaser for TcSenderSharedPool { fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool { if enabled { + let shared_pool = self.shared_pool.get_mut(); // Transfer TC from scheduler TC pool to shared TC pool. - let released_tc_addr = self - .shared_pool + let released_tc_addr = shared_pool .0 .write() .expect("locking pool failed") diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index f1c8a2e..51f0373 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -28,7 +28,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## Changed -- Renamed `ReceivesTcCore` to `ReceivesTc`. +- Renamed `ReceivesTcCore` to `PacketSenderRaw` to better show its primary purpose. It now contains + a `send_raw_tc` method which is not mutable anymore. - Renamed `TmPacketSourceCore` to `TmPacketSource`. - TCP server generics order. The error generics come last now. - `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root. diff --git a/satrs/src/encoding/ccsds.rs b/satrs/src/encoding/ccsds.rs index f7a2529..7cf97bd 100644 --- a/satrs/src/encoding/ccsds.rs +++ b/satrs/src/encoding/ccsds.rs @@ -1,4 +1,4 @@ -use crate::{tmtc::ReceivesTc, ValidatorU16Id}; +use crate::{tmtc::PacketSenderRaw, ValidatorU16Id}; /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the /// [spacepackets::PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet @@ -12,12 +12,12 @@ use crate::{tmtc::ReceivesTc, ValidatorU16Id}; /// The parser will write all packets which were decoded successfully to the given `tc_receiver` /// and return the number of packets found. If the [ReceivesTc::pass_tc] calls fails, the /// error will be returned. -pub fn parse_buffer_for_ccsds_space_packets( +pub fn parse_buffer_for_ccsds_space_packets( buf: &mut [u8], packet_id_validator: &(impl ValidatorU16Id + ?Sized), - tc_receiver: &mut (impl ReceivesTc + ?Sized), + packet_sender: &(impl PacketSenderRaw + ?Sized), next_write_idx: &mut usize, -) -> Result { +) -> Result { *next_write_idx = 0; let mut packets_found = 0; let mut current_idx = 0; @@ -32,7 +32,7 @@ pub fn parse_buffer_for_ccsds_space_packets( u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap()); let packet_size = length_field + 7; if (current_idx + packet_size as usize) <= buf_len { - tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?; + packet_sender.send_raw_tc(&buf[current_idx..current_idx + packet_size as usize])?; packets_found += 1; } else { // Move packet to start of buffer if applicable. @@ -85,11 +85,9 @@ mod tests { assert!(parse_result.is_ok()); let parsed_packets = parse_result.unwrap(); assert_eq!(parsed_packets, 1); - assert_eq!(tc_cacher.tc_queue.len(), 1); - assert_eq!( - tc_cacher.tc_queue.pop_front().unwrap(), - buffer[..packet_len] - ); + let mut queue = tc_cacher.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 1); + assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len]); } #[test] @@ -116,13 +114,11 @@ mod tests { assert!(parse_result.is_ok()); let parsed_packets = parse_result.unwrap(); assert_eq!(parsed_packets, 2); - assert_eq!(tc_cacher.tc_queue.len(), 2); + let mut queue = tc_cacher.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 2); + assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len_ping]); assert_eq!( - tc_cacher.tc_queue.pop_front().unwrap(), - buffer[..packet_len_ping] - ); - assert_eq!( - tc_cacher.tc_queue.pop_front().unwrap(), + queue.pop_front().unwrap(), buffer[packet_len_ping..packet_len_ping + packet_len_action] ); } @@ -152,13 +148,11 @@ mod tests { assert!(parse_result.is_ok()); let parsed_packets = parse_result.unwrap(); assert_eq!(parsed_packets, 2); - assert_eq!(tc_cacher.tc_queue.len(), 2); + let mut queue = tc_cacher.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 2); + assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len_ping]); assert_eq!( - tc_cacher.tc_queue.pop_front().unwrap(), - buffer[..packet_len_ping] - ); - assert_eq!( - tc_cacher.tc_queue.pop_front().unwrap(), + queue.pop_front().unwrap(), buffer[packet_len_ping..packet_len_ping + packet_len_action] ); } @@ -188,7 +182,9 @@ mod tests { assert!(parse_result.is_ok()); let parsed_packets = parse_result.unwrap(); assert_eq!(parsed_packets, 1); - assert_eq!(tc_cacher.tc_queue.len(), 1); + + let queue = tc_cacher.tc_queue.borrow(); + assert_eq!(queue.len(), 1); // The broken packet was moved to the start, so the next write index should be after the // last segment missing 4 bytes. assert_eq!(next_write_idx, packet_len_action - 4); @@ -215,6 +211,7 @@ mod tests { assert!(parse_result.is_ok()); let parsed_packets = parse_result.unwrap(); assert_eq!(parsed_packets, 0); - assert_eq!(tc_cacher.tc_queue.len(), 0); + let queue = tc_cacher.tc_queue.borrow(); + assert_eq!(queue.len(), 0); } } diff --git a/satrs/src/encoding/cobs.rs b/satrs/src/encoding/cobs.rs index 9f8d1df..47540e8 100644 --- a/satrs/src/encoding/cobs.rs +++ b/satrs/src/encoding/cobs.rs @@ -1,4 +1,4 @@ -use crate::tmtc::ReceivesTc; +use crate::tmtc::PacketSenderRaw; use cobs::{decode_in_place, encode, max_encoding_length}; /// This function encodes the given packet with COBS and also wraps the encoded packet with @@ -55,11 +55,11 @@ pub fn encode_packet_with_cobs( /// future write operations will be written to the `next_write_idx` argument. /// /// The parser will write all packets which were decoded successfully to the given `tc_receiver`. -pub fn parse_buffer_for_cobs_encoded_packets( +pub fn parse_buffer_for_cobs_encoded_packets( buf: &mut [u8], - tc_receiver: &mut (impl ReceivesTc + ?Sized), + packet_sender: &(impl PacketSenderRaw + ?Sized), next_write_idx: &mut usize, -) -> Result { +) -> Result { let mut start_index_packet = 0; let mut start_found = false; let mut last_byte = false; @@ -78,8 +78,8 @@ pub fn parse_buffer_for_cobs_encoded_packets( let decode_result = decode_in_place(&mut buf[start_index_packet..i]); if let Ok(packet_len) = decode_result { packets_found += 1; - tc_receiver - .pass_tc(&buf[start_index_packet..start_index_packet + packet_len])?; + packet_sender + .send_raw_tc(&buf[start_index_packet..start_index_packet + packet_len])?; } start_found = false; } else { @@ -118,8 +118,9 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(packets, 1); - assert_eq!(test_sender.tc_queue.len(), 1); - let packet = &test_sender.tc_queue[0]; + let mut queue = test_sender.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 1); + let packet = &queue[0]; assert_eq!(packet, &SIMPLE_PACKET); } @@ -144,10 +145,11 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(packets, 2); - assert_eq!(test_sender.tc_queue.len(), 2); - let packet0 = &test_sender.tc_queue[0]; + let mut queue = test_sender.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 2); + let packet0 = &queue[0]; assert_eq!(packet0, &SIMPLE_PACKET); - let packet1 = &test_sender.tc_queue[1]; + let packet1 = &queue[1]; assert_eq!(packet1, &INVERTED_PACKET); } @@ -166,7 +168,8 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(packets, 0); - assert_eq!(test_sender.tc_queue.len(), 0); + let mut queue = test_sender.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 0); assert_eq!(next_read_idx, 0); } @@ -198,8 +201,9 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(packets, 1); - assert_eq!(test_sender.tc_queue.len(), 1); - assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET); + let mut queue = test_sender.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 1); + assert_eq!(&queue[0], &SIMPLE_PACKET); assert_eq!(next_write_idx, next_expected_write_idx); assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start); } @@ -238,8 +242,9 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(packets, 1); - assert_eq!(test_sender.tc_queue.len(), 1); - assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET); + let mut queue = test_sender.tc_queue.borrow_mut(); + assert_eq!(queue.len(), 1); + assert_eq!(&queue[0], &SIMPLE_PACKET); assert_eq!(next_write_idx, 1); assert_eq!(encoded_buf[0], 0); } @@ -257,7 +262,8 @@ pub(crate) mod tests { ) .unwrap(); assert_eq!(packets, 0); - assert!(test_sender.tc_queue.is_empty()); + let mut queue = test_sender.tc_queue.borrow_mut(); + assert!(queue.is_empty()); assert_eq!(next_write_idx, 0); } } diff --git a/satrs/src/encoding/mod.rs b/satrs/src/encoding/mod.rs index 71bd169..ded8c9f 100644 --- a/satrs/src/encoding/mod.rs +++ b/satrs/src/encoding/mod.rs @@ -6,9 +6,11 @@ pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_e #[cfg(test)] pub(crate) mod tests { + use core::cell::RefCell; + use alloc::{collections::VecDeque, vec::Vec}; - use crate::tmtc::ReceivesTc; + use crate::tmtc::PacketSenderRaw; use super::cobs::encode_packet_with_cobs; @@ -17,14 +19,15 @@ pub(crate) mod tests { #[derive(Default)] pub(crate) struct TcCacher { - pub(crate) tc_queue: VecDeque>, + pub(crate) tc_queue: RefCell>>, } - impl ReceivesTc for TcCacher { + impl PacketSenderRaw for TcCacher { type Error = (); - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.tc_queue.push_back(tc_raw.to_vec()); + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut mut_queue = self.tc_queue.borrow_mut(); + mut_queue.push_back(tc_raw.to_vec()); Ok(()) } } diff --git a/satrs/src/hal/std/tcp_cobs_server.rs b/satrs/src/hal/std/tcp_cobs_server.rs index 6b334fc..a1bfcd2 100644 --- a/satrs/src/hal/std/tcp_cobs_server.rs +++ b/satrs/src/hal/std/tcp_cobs_server.rs @@ -10,7 +10,7 @@ use std::net::SocketAddr; use std::vec::Vec; use crate::encoding::parse_buffer_for_cobs_encoded_packets; -use crate::tmtc::ReceivesTc; +use crate::tmtc::PacketSenderRaw; use crate::tmtc::TmPacketSource; use crate::hal::std::tcp_server::{ @@ -28,7 +28,7 @@ impl TcpTcParser for CobsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut (impl ReceivesTc + ?Sized), + tc_receiver: &mut (impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, next_write_idx: &mut usize, @@ -116,25 +116,25 @@ impl TcpTmSender for CobsTmSender { /// test also serves as the example application for this module. pub struct TcpTmtcInCobsServer< TmSource: TmPacketSource, - TcReceiver: ReceivesTc, + TcSender: PacketSenderRaw, HandledConnection: HandledConnectionHandler, TmError, - TcError: 'static, + SendError: 'static, > { pub generic_server: TcpTmtcGenericServer< TmSource, - TcReceiver, + TcSender, CobsTmSender, CobsTcParser, HandledConnection, TmError, - TcError, + SendError, >, } impl< TmSource: TmPacketSource, - TcReceiver: ReceivesTc, + TcReceiver: PacketSenderRaw, HandledConnection: HandledConnectionHandler, TmError: 'static, TcError: 'static, @@ -196,18 +196,21 @@ mod tests { use std::{ io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, - panic, thread, + panic, + sync::mpsc, + thread, time::Instant, }; use crate::{ encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, hal::std::tcp_server::{ - tests::{ConnectionFinishedHandler, SyncTcCacher, SyncTmSource}, + tests::{ConnectionFinishedHandler, SyncTmSource}, ConnectionResult, ServerConfig, }, + queue::GenericSendError, }; - use alloc::sync::Arc; + use alloc::{sync::Arc, vec::Vec}; use cobs::encode; use super::TcpTmtcInCobsServer; @@ -230,14 +233,20 @@ mod tests { fn generic_tmtc_server( addr: &SocketAddr, - tc_receiver: SyncTcCacher, + tc_sender: mpsc::Sender>, tm_source: SyncTmSource, stop_signal: Option>, - ) -> TcpTmtcInCobsServer { + ) -> TcpTmtcInCobsServer< + SyncTmSource, + mpsc::Sender>, + ConnectionFinishedHandler, + (), + GenericSendError, + > { TcpTmtcInCobsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), tm_source, - tc_receiver, + tc_sender, ConnectionFinishedHandler::default(), stop_signal, ) @@ -247,10 +256,10 @@ mod tests { #[test] fn test_server_basic_no_tm() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, tc_receiver) = mpsc::channel(); let tm_source = SyncTmSource::default(); let mut tcp_server = - generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None); + generic_tmtc_server(&auto_port_addr, tc_sender.clone(), tm_source, None); let dest_addr = tcp_server .local_addr() .expect("retrieving dest addr failed"); @@ -293,28 +302,20 @@ mod tests { panic!("connection was not handled properly"); } // Check that the packet was received and decoded successfully. - let mut tc_queue = tc_receiver - .tc_queue - .lock() - .expect("locking tc queue failed"); - assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); - drop(tc_queue); + let packet = tc_receiver.recv().expect("receiving TC failed"); + assert_eq!(packet, &SIMPLE_PACKET); + matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } #[test] fn test_server_basic_multi_tm_multi_tc() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, tc_receiver) = mpsc::channel(); let mut tm_source = SyncTmSource::default(); tm_source.add_tm(&INVERTED_PACKET); tm_source.add_tm(&SIMPLE_PACKET); - let mut tcp_server = generic_tmtc_server( - &auto_port_addr, - tc_receiver.clone(), - tm_source.clone(), - None, - ); + let mut tcp_server = + generic_tmtc_server(&auto_port_addr, tc_sender.clone(), tm_source.clone(), None); let dest_addr = tcp_server .local_addr() .expect("retrieving dest addr failed"); @@ -409,23 +410,20 @@ mod tests { panic!("connection was not handled properly"); } // Check that the packet was received and decoded successfully. - let mut tc_queue = tc_receiver - .tc_queue - .lock() - .expect("locking tc queue failed"); - assert_eq!(tc_queue.len(), 2); - assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); - assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET); - drop(tc_queue); + let packet = tc_receiver.recv().expect("receiving TC failed"); + assert_eq!(packet, &SIMPLE_PACKET); + let packet = tc_receiver.recv().expect("receiving TC failed"); + assert_eq!(packet, &INVERTED_PACKET); + matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } #[test] fn test_server_accept_timeout() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, _tc_receiver) = mpsc::channel(); let tm_source = SyncTmSource::default(); let mut tcp_server = - generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None); + generic_tmtc_server(&auto_port_addr, tc_sender.clone(), tm_source, None); let start = Instant::now(); // Call the connection handler in separate thread, does block. let thread_jh = thread::spawn(move || loop { @@ -447,12 +445,12 @@ mod tests { #[test] fn test_server_stop_signal() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, _tc_receiver) = mpsc::channel(); let tm_source = SyncTmSource::default(); let stop_signal = Arc::new(AtomicBool::new(false)); let mut tcp_server = generic_tmtc_server( &auto_port_addr, - tc_receiver.clone(), + tc_sender.clone(), tm_source, Some(stop_signal.clone()), ); diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index e6b91ca..fe5b068 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -13,7 +13,7 @@ use std::net::SocketAddr; // use std::net::{SocketAddr, TcpStream}; use std::thread; -use crate::tmtc::{ReceivesTc, TmPacketSource}; +use crate::tmtc::{PacketSenderRaw, TmPacketSource}; use thiserror::Error; // Re-export the TMTC in COBS server. @@ -122,7 +122,7 @@ pub trait TcpTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut (impl ReceivesTc + ?Sized), + tc_receiver: &mut (impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, next_write_idx: &mut usize, @@ -162,7 +162,7 @@ pub trait TcpTmSender { /// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol. pub struct TcpTmtcGenericServer< TmSource: TmPacketSource, - TcReceiver: ReceivesTc, + TcReceiver: PacketSenderRaw, TmSender: TcpTmSender, TcParser: TcpTcParser, HandledConnection: HandledConnectionHandler, @@ -185,7 +185,7 @@ pub struct TcpTmtcGenericServer< impl< TmSource: TmPacketSource, - TcReceiver: ReceivesTc, + TcReceiver: PacketSenderRaw, TmSender: TcpTmSender, TcParser: TcpTcParser, HandledConnection: HandledConnectionHandler, @@ -420,28 +420,14 @@ impl< #[cfg(test)] pub(crate) mod tests { - use std::sync::Mutex; + use std::sync::{mpsc, Mutex}; use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; - use crate::tmtc::{ReceivesTc, TmPacketSource}; + use crate::tmtc::{PacketSenderRaw, TmPacketSource}; use super::*; - #[derive(Default, Clone)] - pub(crate) struct SyncTcCacher { - pub(crate) tc_queue: Arc>>>, - } - impl ReceivesTc for SyncTcCacher { - type Error = (); - - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); - tc_queue.push_back(tc_raw.to_vec()); - Ok(()) - } - } - #[derive(Default, Clone)] pub(crate) struct SyncTmSource { tm_queue: Arc>>>, diff --git a/satrs/src/hal/std/tcp_spacepackets_server.rs b/satrs/src/hal/std/tcp_spacepackets_server.rs index 17415b5..8f5302f 100644 --- a/satrs/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs/src/hal/std/tcp_spacepackets_server.rs @@ -6,7 +6,7 @@ use std::{io::Write, net::SocketAddr}; use crate::{ encoding::parse_buffer_for_ccsds_space_packets, - tmtc::{ReceivesTc, TmPacketSource}, + tmtc::{PacketSenderRaw, TmPacketSource}, ValidatorU16Id, }; @@ -32,7 +32,7 @@ impl TcpTcParser + ?Sized), + tc_receiver: &mut (impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, next_write_idx: &mut usize, @@ -94,7 +94,7 @@ impl TcpTmSender for SpacepacketsTmSender { /// also serves as the example application for this module. pub struct TcpSpacepacketsServer< TmSource: TmPacketSource, - TcSender: ReceivesTc, + TcSender: PacketSenderRaw, PacketIdChecker: ValidatorU16Id, HandledConnection: HandledConnectionHandler, TmError, @@ -113,7 +113,7 @@ pub struct TcpSpacepacketsServer< impl< TmSource: TmPacketSource, - TcReceiver: ReceivesTc, + TcReceiver: PacketSenderRaw, PacketIdChecker: ValidatorU16Id, HandledConnection: HandledConnectionHandler, TmError: 'static, @@ -187,19 +187,23 @@ mod tests { use std::{ io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + sync::mpsc, thread, }; - use alloc::sync::Arc; + use alloc::{sync::Arc, vec::Vec}; use hashbrown::HashSet; use spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, PacketId, SpHeader, }; - use crate::hal::std::tcp_server::{ - tests::{ConnectionFinishedHandler, SyncTcCacher, SyncTmSource}, - ConnectionResult, ServerConfig, + use crate::{ + hal::std::tcp_server::{ + tests::{ConnectionFinishedHandler, SyncTmSource}, + ConnectionResult, ServerConfig, + }, + queue::GenericSendError, }; use super::TcpSpacepacketsServer; @@ -211,17 +215,17 @@ mod tests { fn generic_tmtc_server( addr: &SocketAddr, - tc_receiver: SyncTcCacher, + tc_receiver: mpsc::Sender>, tm_source: SyncTmSource, packet_id_lookup: HashSet, stop_signal: Option>, ) -> TcpSpacepacketsServer< SyncTmSource, - SyncTcCacher, + mpsc::Sender>, HashSet, ConnectionFinishedHandler, (), - (), + GenericSendError, > { TcpSpacepacketsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), @@ -237,13 +241,13 @@ mod tests { #[test] fn test_basic_tc_only() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, tc_receiver) = mpsc::channel(); let tm_source = SyncTmSource::default(); let mut packet_id_lookup = HashSet::new(); packet_id_lookup.insert(TEST_PACKET_ID_0); let mut tcp_server = generic_tmtc_server( &auto_port_addr, - tc_receiver.clone(), + tc_sender.clone(), tm_source, packet_id_lookup, None, @@ -289,16 +293,15 @@ mod tests { if !conn_handled.load(Ordering::Relaxed) { panic!("connection was not handled properly"); } - // Check that TC has arrived. - let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); - assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), tc_0); + let packet = tc_receiver.try_recv().expect("receiving TC failed"); + assert_eq!(packet, tc_0); + matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } #[test] fn test_multi_tc_multi_tm() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); + let (tc_sender, tc_receiver) = mpsc::channel(); let mut tm_source = SyncTmSource::default(); // Add telemetry @@ -320,7 +323,7 @@ mod tests { packet_id_lookup.insert(TEST_PACKET_ID_1); let mut tcp_server = generic_tmtc_server( &auto_port_addr, - tc_receiver.clone(), + tc_sender.clone(), tm_source, packet_id_lookup, None, @@ -397,9 +400,10 @@ mod tests { panic!("connection was not handled properly"); } // Check that TC has arrived. - let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); - assert_eq!(tc_queue.len(), 2); - assert_eq!(tc_queue.pop_front().unwrap(), tc_0); - assert_eq!(tc_queue.pop_front().unwrap(), tc_1); + let packet_0 = tc_receiver.try_recv().expect("receiving TC failed"); + assert_eq!(packet_0, tc_0); + let packet_1 = tc_receiver.try_recv().expect("receiving TC failed"); + assert_eq!(packet_1, tc_1); + matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } } diff --git a/satrs/src/hal/std/udp_server.rs b/satrs/src/hal/std/udp_server.rs index 8bcae13..7c22bea 100644 --- a/satrs/src/hal/std/udp_server.rs +++ b/satrs/src/hal/std/udp_server.rs @@ -1,5 +1,5 @@ //! Generic UDP TC server. -use crate::tmtc::ReceivesTc; +use crate::tmtc::PacketSenderRaw; use core::fmt::Debug; use std::io::{self, ErrorKind}; use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; @@ -58,7 +58,7 @@ use std::vec::Vec; /// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67) /// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port /// and then forwards them to a generic CCSDS packet receiver. -pub struct UdpTcServer, SendError> { +pub struct UdpTcServer, SendError> { pub socket: UdpSocket, recv_buf: Vec, sender_addr: Option, @@ -75,7 +75,7 @@ pub enum ReceiveResult { Send(SendError), } -impl, SendError: Debug + 'static> +impl, SendError: Debug + 'static> UdpTcServer { pub fn new( @@ -107,7 +107,7 @@ impl, SendError: Debug + 'static> let (num_bytes, from) = res; self.sender_addr = Some(from); self.tc_sender - .pass_tc(&self.recv_buf[0..num_bytes]) + .send_raw_tc(&self.recv_buf[0..num_bytes]) .map_err(ReceiveResult::Send)?; Ok(res) } @@ -121,7 +121,8 @@ impl, SendError: Debug + 'static> mod tests { use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use crate::queue::GenericSendError; - use crate::tmtc::ReceivesTc; + use crate::tmtc::PacketSenderRaw; + use core::cell::RefCell; use spacepackets::ecss::tc::PusTcCreator; use spacepackets::ecss::WritablePusPacket; use spacepackets::SpHeader; @@ -133,16 +134,17 @@ mod tests { #[derive(Default)] struct PingReceiver { - pub sent_cmds: VecDeque>, + pub sent_cmds: RefCell>>, } - impl ReceivesTc for PingReceiver { + impl PacketSenderRaw for PingReceiver { type Error = GenericSendError; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { let mut sent_data = Vec::new(); sent_data.extend_from_slice(tc_raw); - self.sent_cmds.push_back(sent_data); + let mut queue = self.sent_cmds.borrow_mut(); + queue.push_back(sent_data); Ok(()) } } @@ -174,8 +176,9 @@ mod tests { local_addr ); let ping_receiver = &mut udp_tc_server.tc_sender; - assert_eq!(ping_receiver.sent_cmds.len(), 1); - let sent_cmd = ping_receiver.sent_cmds.pop_front().unwrap(); + let mut queue = ping_receiver.sent_cmds.borrow_mut(); + assert_eq!(queue.len(), 1); + let sent_cmd = queue.pop_front().unwrap(); assert_eq!(sent_cmd, buf[0..len]); } diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index 762aad0..9ac86fd 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -22,26 +22,26 @@ use crate::queue::GenericSendError; /// /// This trait can also be implemented for sender components which forward the packet. /// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender]. -pub trait ReceivesTc: Send { +pub trait PacketSenderRaw: Send { type Error; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error>; + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error>; } #[cfg(feature = "std")] -impl ReceivesTc for mpsc::Sender> { +impl PacketSenderRaw for mpsc::Sender> { type Error = GenericSendError; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { self.send(tc_raw.to_vec()) .map_err(|_| GenericSendError::RxDisconnected) } } #[cfg(feature = "std")] -impl ReceivesTc for mpsc::SyncSender> { +impl PacketSenderRaw for mpsc::SyncSender> { type Error = GenericSendError; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { self.try_send(tc_raw.to_vec()).map_err(|e| match e { mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, @@ -51,36 +51,36 @@ impl ReceivesTc for mpsc::SyncSender> { /// Extension trait of [ReceivesTc] which allows downcasting by implementing [Downcast]. #[cfg(feature = "alloc")] -pub trait ReceivesTcExt: ReceivesTc + Downcast { +pub trait TcSenderRawExt: PacketSenderRaw + Downcast { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn ReceivesTc; + fn upcast(&self) -> &dyn PacketSenderRaw; // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn ReceivesTc; + fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw; } /// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] -impl ReceivesTcExt for T +impl TcSenderRawExt for T where - T: ReceivesTc + Send + 'static, + T: PacketSenderRaw + Send + 'static, { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn ReceivesTc { + fn upcast(&self) -> &dyn PacketSenderRaw { self } // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn ReceivesTc { + fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw { self } } #[cfg(feature = "alloc")] -impl_downcast!(ReceivesTcExt assoc Error); +impl_downcast!(TcSenderRawExt assoc Error); /// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets /// for CCSDS File Delivery Protocol (CFDP) packets. diff --git a/satrs/src/tmtc/tc_helper.rs b/satrs/src/tmtc/tc_helper.rs index dd22989..47ee8d0 100644 --- a/satrs/src/tmtc/tc_helper.rs +++ b/satrs/src/tmtc/tc_helper.rs @@ -1,3 +1,4 @@ +use core::cell::RefCell; use std::sync::mpsc; use spacepackets::{ecss::tc::PusTcReader, SpHeader}; @@ -9,7 +10,7 @@ use crate::{ queue::GenericSendError, }; -use super::{ReceivesCcsdsTc, ReceivesTc}; +use super::{PacketSenderRaw, ReceivesCcsdsTc}; #[derive(Debug, Clone, PartialEq, Eq, Error)] pub enum StoreAndSendError { @@ -47,28 +48,31 @@ impl SharedTcPool { #[derive(Clone)] pub struct TcSenderSharedPool { pub tc_source: mpsc::SyncSender, - pub shared_pool: SharedTcPool, + pub shared_pool: RefCell, } impl TcSenderSharedPool { pub fn new(tc_source: mpsc::SyncSender, shared_pool: SharedTcPool) -> Self { Self { tc_source, - shared_pool, + shared_pool: RefCell::new(shared_pool), } } #[allow(dead_code)] pub fn shared_pool(&self) -> SharedStaticMemoryPool { - self.shared_pool.0.clone() + let pool = self.shared_pool.borrow(); + pool.0.clone() } } -impl ReceivesTc for TcSenderSharedPool { +impl PacketSenderRaw for TcSenderSharedPool { type Error = StoreAndSendError; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let addr = self.shared_pool.add_raw_tc(tc_raw)?; + fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut shared_pool = self.shared_pool.borrow_mut(); + let addr = shared_pool.add_raw_tc(tc_raw)?; + drop(shared_pool); self.tc_source.try_send(addr).map_err(|e| match e { mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, @@ -81,7 +85,9 @@ impl ReceivesEcssPusTc for TcSenderSharedPool { type Error = StoreAndSendError; fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - let addr = self.shared_pool.add_pus_tc(pus_tc)?; + let mut shared_pool = self.shared_pool.borrow_mut(); + let addr = shared_pool.add_raw_tc(pus_tc.raw_data())?; + drop(shared_pool); self.tc_source.try_send(addr).map_err(|e| match e { mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, @@ -93,13 +99,8 @@ impl ReceivesEcssPusTc for TcSenderSharedPool { impl ReceivesCcsdsTc for TcSenderSharedPool { type Error = StoreAndSendError; - fn pass_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - let addr = self.shared_pool.add_ccsds_tc(sp_header, tc_raw)?; - self.tc_source.try_send(addr).map_err(|e| match e { - mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), - mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, - })?; - Ok(()) + fn pass_ccsds(&mut self, _sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.send_raw_tc(tc_raw) } } diff --git a/satrs/tests/tcp_servers.rs b/satrs/tests/tcp_servers.rs index 9995e63..fc2f3d1 100644 --- a/satrs/tests/tcp_servers.rs +++ b/satrs/tests/tcp_servers.rs @@ -28,7 +28,7 @@ use satrs::{ ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer, }, - tmtc::{ReceivesTc, TmPacketSource}, + tmtc::{PacketSenderRaw, TmPacketSource}, }; use spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -66,10 +66,10 @@ struct SyncTcCacher { tc_queue: Arc>>>, } -impl ReceivesTc for SyncTcCacher { +impl PacketSenderRaw for SyncTcCacher { type Error = (); - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); println!("Received TC: {:x?}", tc_raw); tc_queue.push_back(tc_raw.to_vec());