what is this?
This commit is contained in:
parent
536c5f6949
commit
c96de203b8
@ -12,6 +12,9 @@ use crate::tmtc::{ReceivesTc, TmPacketSource};
|
|||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
// Re-export the TMTC in COBS server.
|
// Re-export the TMTC in COBS server.
|
||||||
|
pub use crate::hal::std::tcp_spacepackets_server::{
|
||||||
|
CcsdsTcParser, CcsdsTmSender, TcpSpacepacketsServer,
|
||||||
|
};
|
||||||
pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
|
pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
|
||||||
|
|
||||||
/// Configuration struct for the generic TCP TMTC server
|
/// Configuration struct for the generic TCP TMTC server
|
||||||
@ -317,3 +320,59 @@ impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
|
|||||||
self.listener.local_addr()
|
self.listener.local_addr()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) mod tests {
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
|
||||||
|
|
||||||
|
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub(crate) struct SyncTcCacher {
|
||||||
|
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
|
}
|
||||||
|
impl ReceivesTcCore for SyncTcCacher {
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||||
|
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
||||||
|
tc_queue.push_back(tc_raw.to_vec());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub(crate) struct SyncTmSource {
|
||||||
|
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SyncTmSource {
|
||||||
|
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
|
||||||
|
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
|
||||||
|
tm_queue.push_back(tm.to_vec());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TmPacketSourceCore for SyncTmSource {
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||||
|
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
|
||||||
|
if !tm_queue.is_empty() {
|
||||||
|
let next_vec = tm_queue.front().unwrap();
|
||||||
|
if buffer.len() < next_vec.len() {
|
||||||
|
panic!(
|
||||||
|
"provided buffer too small, must be at least {} bytes",
|
||||||
|
next_vec.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let next_vec = tm_queue.pop_front().unwrap();
|
||||||
|
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
||||||
|
return Ok(next_vec.len());
|
||||||
|
}
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use std::{io::Write, net::TcpStream};
|
use delegate::delegate;
|
||||||
|
use std::{io::Write, net::{TcpStream, TcpListener, SocketAddr}};
|
||||||
|
|
||||||
use alloc::boxed::Box;
|
use alloc::boxed::Box;
|
||||||
|
|
||||||
@ -7,11 +8,19 @@ use crate::{
|
|||||||
tmtc::{ReceivesTc, TmPacketSource},
|
tmtc::{ReceivesTc, TmPacketSource},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::tcp_server::{ConnectionResult, TcpTcParser, TcpTmSender, TcpTmtcError};
|
use super::tcp_server::{
|
||||||
|
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
||||||
|
};
|
||||||
|
|
||||||
/// Concrete [TcpTcParser] implementation for the [].
|
/// Concrete [TcpTcParser] implementation for the [].
|
||||||
pub struct CcsdsTcParser {
|
pub struct CcsdsTcParser {
|
||||||
packet_id_lookup: Box<dyn PacketIdLookup>,
|
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CcsdsTcParser {
|
||||||
|
pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
|
||||||
|
Self { packet_id_lookup }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CcsdsTcParser {
|
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CcsdsTcParser {
|
||||||
@ -66,5 +75,73 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CcsdsTmSender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TCP TMTC server implementation for exchange of tightly stuffed CCSDS space packets.
|
||||||
|
///
|
||||||
|
/// This serves only works if CCSDS space packets are the only packet type being exchanged.
|
||||||
|
/// It uses the CCSDS [spacepackets::PacketId] as the packet delimiter and start marker when
|
||||||
|
/// parsing for packets. The user specifies a set of expected [spacepackets::PacketId]s as part
|
||||||
|
/// of the server configuration for that purpose.
|
||||||
|
///
|
||||||
|
/// ## Example
|
||||||
|
///
|
||||||
|
/// The [TCP integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs)
|
||||||
|
/// also serves as the example application for this module.
|
||||||
|
pub struct TcpSpacepacketsServer<TmError, TcError: 'static> {
|
||||||
|
generic_server: TcpTmtcGenericServer<TmError, TcError, CcsdsTmSender, CcsdsTcParser>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError> {
|
||||||
|
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
|
||||||
|
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
|
||||||
|
///
|
||||||
|
/// ## Parameter
|
||||||
|
///
|
||||||
|
/// * `cfg` - Configuration of the server.
|
||||||
|
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
|
||||||
|
/// then sent back to the client.
|
||||||
|
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
|
||||||
|
/// forwarded to this TC receiver.
|
||||||
|
pub fn new(
|
||||||
|
cfg: ServerConfig,
|
||||||
|
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
|
||||||
|
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
|
||||||
|
packet_id_lookup: Box<dyn PacketIdLookup + Send>
|
||||||
|
) -> Result<Self, TcpTmtcError<TmError, TcError>> {
|
||||||
|
Ok(Self {
|
||||||
|
generic_server: TcpTmtcGenericServer::new(
|
||||||
|
cfg,
|
||||||
|
CcsdsTcParser::new(packet_id_lookup),
|
||||||
|
CcsdsTmSender::default(),
|
||||||
|
tm_source,
|
||||||
|
tc_receiver,
|
||||||
|
)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
delegate! {
|
||||||
|
to self.generic_server {
|
||||||
|
pub fn listener(&mut self) -> &mut TcpListener;
|
||||||
|
|
||||||
|
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
|
||||||
|
/// useful if using the port number 0 for OS auto-assignment.
|
||||||
|
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
|
||||||
|
|
||||||
|
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
||||||
|
pub fn handle_next_connection(
|
||||||
|
&mut self,
|
||||||
|
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {}
|
mod tests {
|
||||||
|
use super::TcpSpacepacketsServer;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_basic() {
|
||||||
|
let
|
||||||
|
let server = TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -173,61 +173,13 @@ mod tests {
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
||||||
hal::std::tcp_server::ServerConfig,
|
hal::std::tcp_server::{ServerConfig, tests::{SyncTcCacher, SyncTmSource}},
|
||||||
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
|
||||||
};
|
};
|
||||||
use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
|
use alloc::{boxed::Box, sync::Arc};
|
||||||
use cobs::encode;
|
use cobs::encode;
|
||||||
|
|
||||||
use super::TcpTmtcInCobsServer;
|
use super::TcpTmtcInCobsServer;
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
|
||||||
struct SyncTcCacher {
|
|
||||||
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
|
||||||
}
|
|
||||||
impl ReceivesTcCore for SyncTcCacher {
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
|
||||||
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
|
||||||
tc_queue.push_back(tc_raw.to_vec());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
|
||||||
struct SyncTmSource {
|
|
||||||
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SyncTmSource {
|
|
||||||
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
|
|
||||||
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
|
|
||||||
tm_queue.push_back(tm.to_vec());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TmPacketSourceCore for SyncTmSource {
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
|
||||||
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
|
|
||||||
if !tm_queue.is_empty() {
|
|
||||||
let next_vec = tm_queue.front().unwrap();
|
|
||||||
if buffer.len() < next_vec.len() {
|
|
||||||
panic!(
|
|
||||||
"provided buffer too small, must be at least {} bytes",
|
|
||||||
next_vec.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let next_vec = tm_queue.pop_front().unwrap();
|
|
||||||
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
|
||||||
return Ok(next_vec.len());
|
|
||||||
}
|
|
||||||
Ok(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||||
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
|
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user