thats a larger change

This commit is contained in:
Robin Müller 2024-04-14 13:07:39 +02:00
parent 6f6ad5cafd
commit 12320c04ae
Signed by: muellerr
GPG Key ID: A649FB78196E3849
14 changed files with 184 additions and 185 deletions

View File

@ -9,7 +9,7 @@ use log::{info, warn};
use satrs::{
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
spacepackets::PacketId,
tmtc::{ReceivesTc, TmPacketSource},
tmtc::{PacketSenderRaw, TmPacketSource},
};
#[derive(Default)]
@ -89,12 +89,12 @@ pub type TcpServer<ReceivesTc, SendError> = TcpSpacepacketsServer<
SendError,
>;
pub struct TcpTask<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>(
pub struct TcpTask<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>(
pub TcpServer<TcSender, SendError>,
PhantomData<SendError>,
);
impl<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>
impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
TcpTask<TcSender, SendError>
{
pub fn new(

View File

@ -4,7 +4,7 @@ use std::sync::mpsc;
use log::{info, warn};
use satrs::pus::{PusTmAsVec, PusTmInPool};
use satrs::tmtc::ReceivesTc;
use satrs::tmtc::PacketSenderRaw;
use satrs::{
hal::std::udp_server::{ReceiveResult, UdpTcServer},
pool::{PoolProviderWithGuards, SharedStaticMemoryPool},
@ -68,7 +68,7 @@ impl UdpTmHandler for DynamicUdpTmHandler {
}
pub struct UdpTmtcServer<
TcSender: ReceivesTc<Error = SendError>,
TcSender: PacketSenderRaw<Error = SendError>,
TmHandler: UdpTmHandler,
SendError,
> {
@ -77,7 +77,7 @@ pub struct UdpTmtcServer<
}
impl<
TcSender: ReceivesTc<Error = SendError>,
TcSender: PacketSenderRaw<Error = SendError>,
TmHandler: UdpTmHandler,
SendError: Debug + 'static,
> UdpTmtcServer<TcSender, TmHandler, SendError>
@ -126,7 +126,7 @@ mod tests {
ecss::{tc::PusTcCreator, WritablePusPacket},
SpHeader,
},
tmtc::ReceivesTc,
tmtc::PacketSenderRaw,
};
use satrs_example::config::{components, OBSW_SERVER_ADDR};
@ -137,9 +137,9 @@ mod tests {
tc_vec: VecDeque<Vec<u8>>,
}
impl ReceivesTc for TestSender {
impl PacketSenderRaw for TestSender {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_vec.push_back(tc_raw.to_vec());
Ok(())
}

View File

@ -24,9 +24,9 @@ pub trait TcReleaser {
impl TcReleaser for TcSenderSharedPool {
fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
if enabled {
let shared_pool = self.shared_pool.get_mut();
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = self
.shared_pool
let released_tc_addr = shared_pool
.0
.write()
.expect("locking pool failed")

View File

@ -28,7 +28,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## Changed
- Renamed `ReceivesTcCore` to `ReceivesTc`.
- Renamed `ReceivesTcCore` to `PacketSenderRaw` to better show its primary purpose. It now contains
a `send_raw_tc` method which is not mutable anymore.
- Renamed `TmPacketSourceCore` to `TmPacketSource`.
- TCP server generics order. The error generics come last now.
- `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root.

View File

@ -1,4 +1,4 @@
use crate::{tmtc::ReceivesTc, ValidatorU16Id};
use crate::{tmtc::PacketSenderRaw, ValidatorU16Id};
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
/// [spacepackets::PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet
@ -12,12 +12,12 @@ use crate::{tmtc::ReceivesTc, ValidatorU16Id};
/// The parser will write all packets which were decoded successfully to the given `tc_receiver`
/// and return the number of packets found. If the [ReceivesTc::pass_tc] calls fails, the
/// error will be returned.
pub fn parse_buffer_for_ccsds_space_packets<E>(
pub fn parse_buffer_for_ccsds_space_packets<SendError>(
buf: &mut [u8],
packet_id_validator: &(impl ValidatorU16Id + ?Sized),
tc_receiver: &mut (impl ReceivesTc<Error = E> + ?Sized),
packet_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
next_write_idx: &mut usize,
) -> Result<u32, E> {
) -> Result<u32, SendError> {
*next_write_idx = 0;
let mut packets_found = 0;
let mut current_idx = 0;
@ -32,7 +32,7 @@ pub fn parse_buffer_for_ccsds_space_packets<E>(
u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap());
let packet_size = length_field + 7;
if (current_idx + packet_size as usize) <= buf_len {
tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?;
packet_sender.send_raw_tc(&buf[current_idx..current_idx + packet_size as usize])?;
packets_found += 1;
} else {
// Move packet to start of buffer if applicable.
@ -85,11 +85,9 @@ mod tests {
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
assert_eq!(tc_cacher.tc_queue.len(), 1);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[..packet_len]
);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1);
assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len]);
}
#[test]
@ -116,13 +114,11 @@ mod tests {
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
assert_eq!(tc_cacher.tc_queue.len(), 2);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len_ping]);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[..packet_len_ping]
);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
queue.pop_front().unwrap(),
buffer[packet_len_ping..packet_len_ping + packet_len_action]
);
}
@ -152,13 +148,11 @@ mod tests {
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
assert_eq!(tc_cacher.tc_queue.len(), 2);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len_ping]);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[..packet_len_ping]
);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
queue.pop_front().unwrap(),
buffer[packet_len_ping..packet_len_ping + packet_len_action]
);
}
@ -188,7 +182,9 @@ mod tests {
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
assert_eq!(tc_cacher.tc_queue.len(), 1);
let queue = tc_cacher.tc_queue.borrow();
assert_eq!(queue.len(), 1);
// The broken packet was moved to the start, so the next write index should be after the
// last segment missing 4 bytes.
assert_eq!(next_write_idx, packet_len_action - 4);
@ -215,6 +211,7 @@ mod tests {
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 0);
assert_eq!(tc_cacher.tc_queue.len(), 0);
let queue = tc_cacher.tc_queue.borrow();
assert_eq!(queue.len(), 0);
}
}

View File

@ -1,4 +1,4 @@
use crate::tmtc::ReceivesTc;
use crate::tmtc::PacketSenderRaw;
use cobs::{decode_in_place, encode, max_encoding_length};
/// This function encodes the given packet with COBS and also wraps the encoded packet with
@ -55,11 +55,11 @@ pub fn encode_packet_with_cobs(
/// future write operations will be written to the `next_write_idx` argument.
///
/// The parser will write all packets which were decoded successfully to the given `tc_receiver`.
pub fn parse_buffer_for_cobs_encoded_packets<E>(
pub fn parse_buffer_for_cobs_encoded_packets<SendError>(
buf: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = E> + ?Sized),
packet_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
next_write_idx: &mut usize,
) -> Result<u32, E> {
) -> Result<u32, SendError> {
let mut start_index_packet = 0;
let mut start_found = false;
let mut last_byte = false;
@ -78,8 +78,8 @@ pub fn parse_buffer_for_cobs_encoded_packets<E>(
let decode_result = decode_in_place(&mut buf[start_index_packet..i]);
if let Ok(packet_len) = decode_result {
packets_found += 1;
tc_receiver
.pass_tc(&buf[start_index_packet..start_index_packet + packet_len])?;
packet_sender
.send_raw_tc(&buf[start_index_packet..start_index_packet + packet_len])?;
}
start_found = false;
} else {
@ -118,8 +118,9 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(packets, 1);
assert_eq!(test_sender.tc_queue.len(), 1);
let packet = &test_sender.tc_queue[0];
let mut queue = test_sender.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1);
let packet = &queue[0];
assert_eq!(packet, &SIMPLE_PACKET);
}
@ -144,10 +145,11 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(packets, 2);
assert_eq!(test_sender.tc_queue.len(), 2);
let packet0 = &test_sender.tc_queue[0];
let mut queue = test_sender.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
let packet0 = &queue[0];
assert_eq!(packet0, &SIMPLE_PACKET);
let packet1 = &test_sender.tc_queue[1];
let packet1 = &queue[1];
assert_eq!(packet1, &INVERTED_PACKET);
}
@ -166,7 +168,8 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(packets, 0);
assert_eq!(test_sender.tc_queue.len(), 0);
let mut queue = test_sender.tc_queue.borrow_mut();
assert_eq!(queue.len(), 0);
assert_eq!(next_read_idx, 0);
}
@ -198,8 +201,9 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(packets, 1);
assert_eq!(test_sender.tc_queue.len(), 1);
assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET);
let mut queue = test_sender.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1);
assert_eq!(&queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, next_expected_write_idx);
assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start);
}
@ -238,8 +242,9 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(packets, 1);
assert_eq!(test_sender.tc_queue.len(), 1);
assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET);
let mut queue = test_sender.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1);
assert_eq!(&queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, 1);
assert_eq!(encoded_buf[0], 0);
}
@ -257,7 +262,8 @@ pub(crate) mod tests {
)
.unwrap();
assert_eq!(packets, 0);
assert!(test_sender.tc_queue.is_empty());
let mut queue = test_sender.tc_queue.borrow_mut();
assert!(queue.is_empty());
assert_eq!(next_write_idx, 0);
}
}

View File

@ -6,9 +6,11 @@ pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_e
#[cfg(test)]
pub(crate) mod tests {
use core::cell::RefCell;
use alloc::{collections::VecDeque, vec::Vec};
use crate::tmtc::ReceivesTc;
use crate::tmtc::PacketSenderRaw;
use super::cobs::encode_packet_with_cobs;
@ -17,14 +19,15 @@ pub(crate) mod tests {
#[derive(Default)]
pub(crate) struct TcCacher {
pub(crate) tc_queue: VecDeque<Vec<u8>>,
pub(crate) tc_queue: RefCell<VecDeque<Vec<u8>>>,
}
impl ReceivesTc for TcCacher {
impl PacketSenderRaw for TcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_queue.push_back(tc_raw.to_vec());
fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut mut_queue = self.tc_queue.borrow_mut();
mut_queue.push_back(tc_raw.to_vec());
Ok(())
}
}

View File

@ -10,7 +10,7 @@ use std::net::SocketAddr;
use std::vec::Vec;
use crate::encoding::parse_buffer_for_cobs_encoded_packets;
use crate::tmtc::ReceivesTc;
use crate::tmtc::PacketSenderRaw;
use crate::tmtc::TmPacketSource;
use crate::hal::std::tcp_server::{
@ -28,7 +28,7 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
tc_receiver: &mut (impl PacketSenderRaw<Error = TcError> + ?Sized),
conn_result: &mut HandledConnectionInfo,
current_write_idx: usize,
next_write_idx: &mut usize,
@ -116,25 +116,25 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
/// test also serves as the example application for this module.
pub struct TcpTmtcInCobsServer<
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TcSender: PacketSenderRaw<Error = SendError>,
HandledConnection: HandledConnectionHandler,
TmError,
TcError: 'static,
SendError: 'static,
> {
pub generic_server: TcpTmtcGenericServer<
TmSource,
TcReceiver,
TcSender,
CobsTmSender,
CobsTcParser,
HandledConnection,
TmError,
TcError,
SendError,
>,
}
impl<
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TcReceiver: PacketSenderRaw<Error = TcError>,
HandledConnection: HandledConnectionHandler,
TmError: 'static,
TcError: 'static,
@ -196,18 +196,21 @@ mod tests {
use std::{
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
panic, thread,
panic,
sync::mpsc,
thread,
time::Instant,
};
use crate::{
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
hal::std::tcp_server::{
tests::{ConnectionFinishedHandler, SyncTcCacher, SyncTmSource},
tests::{ConnectionFinishedHandler, SyncTmSource},
ConnectionResult, ServerConfig,
},
queue::GenericSendError,
};
use alloc::sync::Arc;
use alloc::{sync::Arc, vec::Vec};
use cobs::encode;
use super::TcpTmtcInCobsServer;
@ -230,14 +233,20 @@ mod tests {
fn generic_tmtc_server(
addr: &SocketAddr,
tc_receiver: SyncTcCacher,
tc_sender: mpsc::Sender<Vec<u8>>,
tm_source: SyncTmSource,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpTmtcInCobsServer<SyncTmSource, SyncTcCacher, ConnectionFinishedHandler, (), ()> {
) -> TcpTmtcInCobsServer<
SyncTmSource,
mpsc::Sender<Vec<u8>>,
ConnectionFinishedHandler,
(),
GenericSendError,
> {
TcpTmtcInCobsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_receiver,
tc_sender,
ConnectionFinishedHandler::default(),
stop_signal,
)
@ -247,10 +256,10 @@ mod tests {
#[test]
fn test_server_basic_no_tm() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let (tc_sender, tc_receiver) = mpsc::channel();
let tm_source = SyncTmSource::default();
let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None);
generic_tmtc_server(&auto_port_addr, tc_sender.clone(), tm_source, None);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
@ -293,28 +302,20 @@ mod tests {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
let packet = tc_receiver.recv().expect("receiving TC failed");
assert_eq!(packet, &SIMPLE_PACKET);
matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
}
#[test]
fn test_server_basic_multi_tm_multi_tc() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let (tc_sender, tc_receiver) = mpsc::channel();
let mut tm_source = SyncTmSource::default();
tm_source.add_tm(&INVERTED_PACKET);
tm_source.add_tm(&SIMPLE_PACKET);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source.clone(),
None,
);
let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_sender.clone(), tm_source.clone(), None);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
@ -409,23 +410,20 @@ mod tests {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 2);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
drop(tc_queue);
let packet = tc_receiver.recv().expect("receiving TC failed");
assert_eq!(packet, &SIMPLE_PACKET);
let packet = tc_receiver.recv().expect("receiving TC failed");
assert_eq!(packet, &INVERTED_PACKET);
matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
}
#[test]
fn test_server_accept_timeout() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let (tc_sender, _tc_receiver) = mpsc::channel();
let tm_source = SyncTmSource::default();
let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None);
generic_tmtc_server(&auto_port_addr, tc_sender.clone(), tm_source, None);
let start = Instant::now();
// Call the connection handler in separate thread, does block.
let thread_jh = thread::spawn(move || loop {
@ -447,12 +445,12 @@ mod tests {
#[test]
fn test_server_stop_signal() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let (tc_sender, _tc_receiver) = mpsc::channel();
let tm_source = SyncTmSource::default();
let stop_signal = Arc::new(AtomicBool::new(false));
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tc_sender.clone(),
tm_source,
Some(stop_signal.clone()),
);

View File

@ -13,7 +13,7 @@ use std::net::SocketAddr;
// use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{ReceivesTc, TmPacketSource};
use crate::tmtc::{PacketSenderRaw, TmPacketSource};
use thiserror::Error;
// Re-export the TMTC in COBS server.
@ -122,7 +122,7 @@ pub trait TcpTcParser<TmError, TcError> {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
tc_receiver: &mut (impl PacketSenderRaw<Error = TcError> + ?Sized),
conn_result: &mut HandledConnectionInfo,
current_write_idx: usize,
next_write_idx: &mut usize,
@ -162,7 +162,7 @@ pub trait TcpTmSender<TmError, TcError> {
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
pub struct TcpTmtcGenericServer<
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TcReceiver: PacketSenderRaw<Error = TcError>,
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
HandledConnection: HandledConnectionHandler,
@ -185,7 +185,7 @@ pub struct TcpTmtcGenericServer<
impl<
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TcReceiver: PacketSenderRaw<Error = TcError>,
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
HandledConnection: HandledConnectionHandler,
@ -420,28 +420,14 @@ impl<
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Mutex;
use std::sync::{mpsc, Mutex};
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use crate::tmtc::{ReceivesTc, TmPacketSource};
use crate::tmtc::{PacketSenderRaw, TmPacketSource};
use super::*;
#[derive(Default, Clone)]
pub(crate) struct SyncTcCacher {
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTc for SyncTcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Clone)]
pub(crate) struct SyncTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,

View File

@ -6,7 +6,7 @@ use std::{io::Write, net::SocketAddr};
use crate::{
encoding::parse_buffer_for_ccsds_space_packets,
tmtc::{ReceivesTc, TmPacketSource},
tmtc::{PacketSenderRaw, TmPacketSource},
ValidatorU16Id,
};
@ -32,7 +32,7 @@ impl<PacketIdChecker: ValidatorU16Id, TmError, TcError: 'static> TcpTcParser<TmE
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
tc_receiver: &mut (impl PacketSenderRaw<Error = TcError> + ?Sized),
conn_result: &mut HandledConnectionInfo,
current_write_idx: usize,
next_write_idx: &mut usize,
@ -94,7 +94,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
/// also serves as the example application for this module.
pub struct TcpSpacepacketsServer<
TmSource: TmPacketSource<Error = TmError>,
TcSender: ReceivesTc<Error = SendError>,
TcSender: PacketSenderRaw<Error = SendError>,
PacketIdChecker: ValidatorU16Id,
HandledConnection: HandledConnectionHandler,
TmError,
@ -113,7 +113,7 @@ pub struct TcpSpacepacketsServer<
impl<
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TcReceiver: PacketSenderRaw<Error = TcError>,
PacketIdChecker: ValidatorU16Id,
HandledConnection: HandledConnectionHandler,
TmError: 'static,
@ -187,19 +187,23 @@ mod tests {
use std::{
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::mpsc,
thread,
};
use alloc::sync::Arc;
use alloc::{sync::Arc, vec::Vec};
use hashbrown::HashSet;
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
PacketId, SpHeader,
};
use crate::hal::std::tcp_server::{
tests::{ConnectionFinishedHandler, SyncTcCacher, SyncTmSource},
ConnectionResult, ServerConfig,
use crate::{
hal::std::tcp_server::{
tests::{ConnectionFinishedHandler, SyncTmSource},
ConnectionResult, ServerConfig,
},
queue::GenericSendError,
};
use super::TcpSpacepacketsServer;
@ -211,17 +215,17 @@ mod tests {
fn generic_tmtc_server(
addr: &SocketAddr,
tc_receiver: SyncTcCacher,
tc_receiver: mpsc::Sender<Vec<u8>>,
tm_source: SyncTmSource,
packet_id_lookup: HashSet<PacketId>,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpSpacepacketsServer<
SyncTmSource,
SyncTcCacher,
mpsc::Sender<Vec<u8>>,
HashSet<PacketId>,
ConnectionFinishedHandler,
(),
(),
GenericSendError,
> {
TcpSpacepacketsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
@ -237,13 +241,13 @@ mod tests {
#[test]
fn test_basic_tc_only() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let (tc_sender, tc_receiver) = mpsc::channel();
let tm_source = SyncTmSource::default();
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tc_sender.clone(),
tm_source,
packet_id_lookup,
None,
@ -289,16 +293,15 @@ mod tests {
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
let packet = tc_receiver.try_recv().expect("receiving TC failed");
assert_eq!(packet, tc_0);
matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
}
#[test]
fn test_multi_tc_multi_tm() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let (tc_sender, tc_receiver) = mpsc::channel();
let mut tm_source = SyncTmSource::default();
// Add telemetry
@ -320,7 +323,7 @@ mod tests {
packet_id_lookup.insert(TEST_PACKET_ID_1);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tc_sender.clone(),
tm_source,
packet_id_lookup,
None,
@ -397,9 +400,10 @@ mod tests {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 2);
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
assert_eq!(tc_queue.pop_front().unwrap(), tc_1);
let packet_0 = tc_receiver.try_recv().expect("receiving TC failed");
assert_eq!(packet_0, tc_0);
let packet_1 = tc_receiver.try_recv().expect("receiving TC failed");
assert_eq!(packet_1, tc_1);
matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
}
}

View File

@ -1,5 +1,5 @@
//! Generic UDP TC server.
use crate::tmtc::ReceivesTc;
use crate::tmtc::PacketSenderRaw;
use core::fmt::Debug;
use std::io::{self, ErrorKind};
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
@ -58,7 +58,7 @@ use std::vec::Vec;
/// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67)
/// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port
/// and then forwards them to a generic CCSDS packet receiver.
pub struct UdpTcServer<TcSender: ReceivesTc<Error = SendError>, SendError> {
pub struct UdpTcServer<TcSender: PacketSenderRaw<Error = SendError>, SendError> {
pub socket: UdpSocket,
recv_buf: Vec<u8>,
sender_addr: Option<SocketAddr>,
@ -75,7 +75,7 @@ pub enum ReceiveResult<SendError: Debug + 'static> {
Send(SendError),
}
impl<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>
impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
UdpTcServer<TcSender, SendError>
{
pub fn new<A: ToSocketAddrs>(
@ -107,7 +107,7 @@ impl<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>
let (num_bytes, from) = res;
self.sender_addr = Some(from);
self.tc_sender
.pass_tc(&self.recv_buf[0..num_bytes])
.send_raw_tc(&self.recv_buf[0..num_bytes])
.map_err(ReceiveResult::Send)?;
Ok(res)
}
@ -121,7 +121,8 @@ impl<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>
mod tests {
use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use crate::queue::GenericSendError;
use crate::tmtc::ReceivesTc;
use crate::tmtc::PacketSenderRaw;
use core::cell::RefCell;
use spacepackets::ecss::tc::PusTcCreator;
use spacepackets::ecss::WritablePusPacket;
use spacepackets::SpHeader;
@ -133,16 +134,17 @@ mod tests {
#[derive(Default)]
struct PingReceiver {
pub sent_cmds: VecDeque<Vec<u8>>,
pub sent_cmds: RefCell<VecDeque<Vec<u8>>>,
}
impl ReceivesTc for PingReceiver {
impl PacketSenderRaw for PingReceiver {
type Error = GenericSendError;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut sent_data = Vec::new();
sent_data.extend_from_slice(tc_raw);
self.sent_cmds.push_back(sent_data);
let mut queue = self.sent_cmds.borrow_mut();
queue.push_back(sent_data);
Ok(())
}
}
@ -174,8 +176,9 @@ mod tests {
local_addr
);
let ping_receiver = &mut udp_tc_server.tc_sender;
assert_eq!(ping_receiver.sent_cmds.len(), 1);
let sent_cmd = ping_receiver.sent_cmds.pop_front().unwrap();
let mut queue = ping_receiver.sent_cmds.borrow_mut();
assert_eq!(queue.len(), 1);
let sent_cmd = queue.pop_front().unwrap();
assert_eq!(sent_cmd, buf[0..len]);
}

View File

@ -22,26 +22,26 @@ use crate::queue::GenericSendError;
///
/// This trait can also be implemented for sender components which forward the packet.
/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender].
pub trait ReceivesTc: Send {
pub trait PacketSenderRaw: Send {
type Error;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error>;
fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error>;
}
#[cfg(feature = "std")]
impl ReceivesTc for mpsc::Sender<alloc::vec::Vec<u8>> {
impl PacketSenderRaw for mpsc::Sender<alloc::vec::Vec<u8>> {
type Error = GenericSendError;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.send(tc_raw.to_vec())
.map_err(|_| GenericSendError::RxDisconnected)
}
}
#[cfg(feature = "std")]
impl ReceivesTc for mpsc::SyncSender<alloc::vec::Vec<u8>> {
impl PacketSenderRaw for mpsc::SyncSender<alloc::vec::Vec<u8>> {
type Error = GenericSendError;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.try_send(tc_raw.to_vec()).map_err(|e| match e {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
@ -51,36 +51,36 @@ impl ReceivesTc for mpsc::SyncSender<alloc::vec::Vec<u8>> {
/// Extension trait of [ReceivesTc] which allows downcasting by implementing [Downcast].
#[cfg(feature = "alloc")]
pub trait ReceivesTcExt: ReceivesTc + Downcast {
pub trait TcSenderRawExt: PacketSenderRaw + Downcast {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn ReceivesTc<Error = Self::Error>;
fn upcast(&self) -> &dyn PacketSenderRaw<Error = Self::Error>;
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn ReceivesTc<Error = Self::Error>;
fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw<Error = Self::Error>;
}
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature
/// is enabled.
#[cfg(feature = "alloc")]
impl<T> ReceivesTcExt for T
impl<T> TcSenderRawExt for T
where
T: ReceivesTc + Send + 'static,
T: PacketSenderRaw + Send + 'static,
{
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn ReceivesTc<Error = Self::Error> {
fn upcast(&self) -> &dyn PacketSenderRaw<Error = Self::Error> {
self
}
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn ReceivesTc<Error = Self::Error> {
fn upcast_mut(&mut self) -> &mut dyn PacketSenderRaw<Error = Self::Error> {
self
}
}
#[cfg(feature = "alloc")]
impl_downcast!(ReceivesTcExt assoc Error);
impl_downcast!(TcSenderRawExt assoc Error);
/// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets
/// for CCSDS File Delivery Protocol (CFDP) packets.

View File

@ -1,3 +1,4 @@
use core::cell::RefCell;
use std::sync::mpsc;
use spacepackets::{ecss::tc::PusTcReader, SpHeader};
@ -9,7 +10,7 @@ use crate::{
queue::GenericSendError,
};
use super::{ReceivesCcsdsTc, ReceivesTc};
use super::{PacketSenderRaw, ReceivesCcsdsTc};
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StoreAndSendError {
@ -47,28 +48,31 @@ impl SharedTcPool {
#[derive(Clone)]
pub struct TcSenderSharedPool {
pub tc_source: mpsc::SyncSender<StoreAddr>,
pub shared_pool: SharedTcPool,
pub shared_pool: RefCell<SharedTcPool>,
}
impl TcSenderSharedPool {
pub fn new(tc_source: mpsc::SyncSender<StoreAddr>, shared_pool: SharedTcPool) -> Self {
Self {
tc_source,
shared_pool,
shared_pool: RefCell::new(shared_pool),
}
}
#[allow(dead_code)]
pub fn shared_pool(&self) -> SharedStaticMemoryPool {
self.shared_pool.0.clone()
let pool = self.shared_pool.borrow();
pool.0.clone()
}
}
impl ReceivesTc for TcSenderSharedPool {
impl PacketSenderRaw for TcSenderSharedPool {
type Error = StoreAndSendError;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let addr = self.shared_pool.add_raw_tc(tc_raw)?;
fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut shared_pool = self.shared_pool.borrow_mut();
let addr = shared_pool.add_raw_tc(tc_raw)?;
drop(shared_pool);
self.tc_source.try_send(addr).map_err(|e| match e {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
@ -81,7 +85,9 @@ impl ReceivesEcssPusTc for TcSenderSharedPool {
type Error = StoreAndSendError;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
let addr = self.shared_pool.add_pus_tc(pus_tc)?;
let mut shared_pool = self.shared_pool.borrow_mut();
let addr = shared_pool.add_raw_tc(pus_tc.raw_data())?;
drop(shared_pool);
self.tc_source.try_send(addr).map_err(|e| match e {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
@ -93,13 +99,8 @@ impl ReceivesEcssPusTc for TcSenderSharedPool {
impl ReceivesCcsdsTc for TcSenderSharedPool {
type Error = StoreAndSendError;
fn pass_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
let addr = self.shared_pool.add_ccsds_tc(sp_header, tc_raw)?;
self.tc_source.try_send(addr).map_err(|e| match e {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
})?;
Ok(())
fn pass_ccsds(&mut self, _sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.send_raw_tc(tc_raw)
}
}

View File

@ -28,7 +28,7 @@ use satrs::{
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig,
TcpSpacepacketsServer, TcpTmtcInCobsServer,
},
tmtc::{ReceivesTc, TmPacketSource},
tmtc::{PacketSenderRaw, TmPacketSource},
};
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
@ -66,10 +66,10 @@ struct SyncTcCacher {
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTc for SyncTcCacher {
impl PacketSenderRaw for SyncTcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
fn send_raw_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
println!("Received TC: {:x?}", tc_raw);
tc_queue.push_back(tc_raw.to_vec());