re-worked TMTC modules
This commit is contained in:
parent
39ab9fa27b
commit
cecf9c263e
@ -1,18 +1,17 @@
|
||||
use std::{
|
||||
collections::{HashSet, VecDeque},
|
||||
fmt::Debug,
|
||||
marker::PhantomData,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use log::{info, warn};
|
||||
use satrs::{
|
||||
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
|
||||
pus::ReceivesEcssPusTc,
|
||||
spacepackets::PacketId,
|
||||
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
|
||||
tmtc::{ReceivesTc, TmPacketSource},
|
||||
};
|
||||
|
||||
use crate::tmtc::ccsds::CcsdsReceiver;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ConnectionFinishedHandler {}
|
||||
|
||||
@ -53,7 +52,7 @@ impl SyncTcpTmSource {
|
||||
}
|
||||
}
|
||||
|
||||
impl TmPacketSourceCore for SyncTcpTmSource {
|
||||
impl TmPacketSource for SyncTcpTmSource {
|
||||
type Error = ();
|
||||
|
||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||
@ -81,56 +80,45 @@ impl TmPacketSourceCore for SyncTcpTmSource {
|
||||
}
|
||||
}
|
||||
|
||||
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
|
||||
pub type TcpServer<ReceivesTc, SendError> = TcpSpacepacketsServer<
|
||||
SyncTcpTmSource,
|
||||
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||
ReceivesTc,
|
||||
HashSet<PacketId>,
|
||||
ConnectionFinishedHandler,
|
||||
(),
|
||||
CcsdsError<MpscErrorType>,
|
||||
SendError,
|
||||
>;
|
||||
|
||||
pub struct TcpTask<
|
||||
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
|
||||
+ ReceivesEcssPusTc<Error = MpscErrorType>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
MpscErrorType: 'static,
|
||||
> {
|
||||
server: TcpServerType<TcSource, MpscErrorType>,
|
||||
}
|
||||
pub struct TcpTask<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>(
|
||||
pub TcpServer<TcSender, SendError>,
|
||||
PhantomData<SendError>,
|
||||
);
|
||||
|
||||
impl<
|
||||
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
|
||||
+ ReceivesEcssPusTc<Error = MpscErrorType>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
MpscErrorType: 'static + core::fmt::Debug,
|
||||
> TcpTask<TcSource, MpscErrorType>
|
||||
impl<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>
|
||||
TcpTask<TcSender, SendError>
|
||||
{
|
||||
pub fn new(
|
||||
cfg: ServerConfig,
|
||||
tm_source: SyncTcpTmSource,
|
||||
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||
tc_sender: TcSender,
|
||||
packet_id_lookup: HashSet<PacketId>,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
Ok(Self {
|
||||
server: TcpSpacepacketsServer::new(
|
||||
Ok(Self(
|
||||
TcpSpacepacketsServer::new(
|
||||
cfg,
|
||||
tm_source,
|
||||
tc_receiver,
|
||||
tc_sender,
|
||||
packet_id_lookup,
|
||||
ConnectionFinishedHandler::default(),
|
||||
None,
|
||||
)?,
|
||||
})
|
||||
PhantomData,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
loop {
|
||||
let result = self.server.handle_next_connection(None);
|
||||
let result = self.0.handle_next_connection(None);
|
||||
match result {
|
||||
Ok(_conn_result) => (),
|
||||
Err(e) => {
|
||||
|
@ -1,12 +1,13 @@
|
||||
use core::fmt::Debug;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::mpsc;
|
||||
|
||||
use log::{info, warn};
|
||||
use satrs::pus::{PusTmAsVec, PusTmInPool};
|
||||
use satrs::tmtc::ReceivesTc;
|
||||
use satrs::{
|
||||
hal::std::udp_server::{ReceiveResult, UdpTcServer},
|
||||
pool::{PoolProviderWithGuards, SharedStaticMemoryPool},
|
||||
tmtc::CcsdsError,
|
||||
};
|
||||
|
||||
pub trait UdpTmHandler {
|
||||
@ -64,13 +65,20 @@ impl UdpTmHandler for DynamicUdpTmHandler {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UdpTmtcServer<TmHandler: UdpTmHandler, SendError> {
|
||||
pub udp_tc_server: UdpTcServer<CcsdsError<SendError>>,
|
||||
pub struct UdpTmtcServer<
|
||||
TcSender: ReceivesTc<Error = SendError>,
|
||||
TmHandler: UdpTmHandler,
|
||||
SendError,
|
||||
> {
|
||||
pub udp_tc_server: UdpTcServer<TcSender, SendError>,
|
||||
pub tm_handler: TmHandler,
|
||||
}
|
||||
|
||||
impl<TmHandler: UdpTmHandler, SendError: core::fmt::Debug + 'static>
|
||||
UdpTmtcServer<TmHandler, SendError>
|
||||
impl<
|
||||
TcSender: ReceivesTc<Error = SendError>,
|
||||
TmHandler: UdpTmHandler,
|
||||
SendError: Debug + 'static,
|
||||
> UdpTmtcServer<TcSender, TmHandler, SendError>
|
||||
{
|
||||
pub fn periodic_operation(&mut self) {
|
||||
while self.poll_tc_server() {}
|
||||
@ -84,21 +92,15 @@ impl<TmHandler: UdpTmHandler, SendError: core::fmt::Debug + 'static>
|
||||
match self.udp_tc_server.try_recv_tc() {
|
||||
Ok(_) => true,
|
||||
Err(e) => match e {
|
||||
ReceiveResult::ReceiverError(e) => match e {
|
||||
CcsdsError::ByteConversionError(e) => {
|
||||
warn!("packet error: {e:?}");
|
||||
true
|
||||
}
|
||||
CcsdsError::CustomError(e) => {
|
||||
warn!("mpsc custom error {e:?}");
|
||||
true
|
||||
}
|
||||
},
|
||||
ReceiveResult::IoError(e) => {
|
||||
ReceiveResult::Io(e) => {
|
||||
warn!("IO error {e}");
|
||||
false
|
||||
}
|
||||
ReceiveResult::NothingReceived => false,
|
||||
ReceiveResult::Send(send_error) => {
|
||||
warn!("send error {send_error:?}");
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -117,21 +119,21 @@ mod tests {
|
||||
ecss::{tc::PusTcCreator, WritablePusPacket},
|
||||
SpHeader,
|
||||
},
|
||||
tmtc::ReceivesTcCore,
|
||||
tmtc::ReceivesTc,
|
||||
};
|
||||
use satrs_example::config::{components, OBSW_SERVER_ADDR};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Default, Debug, Clone)]
|
||||
pub struct TestReceiver {
|
||||
tc_vec: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||
#[derive(Default, Debug)]
|
||||
pub struct TestSender {
|
||||
tc_vec: VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl ReceivesTcCore for TestReceiver {
|
||||
type Error = CcsdsError<()>;
|
||||
impl ReceivesTc for TestSender {
|
||||
type Error = ();
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec());
|
||||
self.tc_vec.push_back(tc_raw.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -150,9 +152,9 @@ mod tests {
|
||||
#[test]
|
||||
fn test_basic() {
|
||||
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
|
||||
let test_receiver = TestReceiver::default();
|
||||
let tc_queue = test_receiver.tc_vec.clone();
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
|
||||
let test_receiver = TestSender::default();
|
||||
// let tc_queue = test_receiver.tc_vec.clone();
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, test_receiver).unwrap();
|
||||
let tm_handler = TestTmHandler::default();
|
||||
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
|
||||
let mut udp_dyn_server = UdpTmtcServer {
|
||||
@ -160,16 +162,16 @@ mod tests {
|
||||
tm_handler,
|
||||
};
|
||||
udp_dyn_server.periodic_operation();
|
||||
assert!(tc_queue.lock().unwrap().is_empty());
|
||||
assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty());
|
||||
assert!(tm_handler_calls.lock().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transactions() {
|
||||
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
|
||||
let test_receiver = TestReceiver::default();
|
||||
let tc_queue = test_receiver.tc_vec.clone();
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
|
||||
let test_receiver = TestSender::default();
|
||||
// let tc_queue = test_receiver.tc_vec.clone();
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, test_receiver).unwrap();
|
||||
let server_addr = udp_tc_server.socket.local_addr().unwrap();
|
||||
let tm_handler = TestTmHandler::default();
|
||||
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
|
||||
@ -187,9 +189,14 @@ mod tests {
|
||||
client.send(&ping_tc).unwrap();
|
||||
udp_dyn_server.periodic_operation();
|
||||
{
|
||||
let mut tc_queue = tc_queue.lock().unwrap();
|
||||
assert!(!tc_queue.is_empty());
|
||||
let received_tc = tc_queue.pop_front().unwrap();
|
||||
//let mut tc_queue = tc_queue.lock().unwrap();
|
||||
assert!(!udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty());
|
||||
let received_tc = udp_dyn_server
|
||||
.udp_tc_server
|
||||
.tc_sender
|
||||
.tc_vec
|
||||
.pop_front()
|
||||
.unwrap();
|
||||
assert_eq!(received_tc, ping_tc);
|
||||
}
|
||||
|
||||
@ -201,7 +208,7 @@ mod tests {
|
||||
assert_eq!(received_addr, client_addr);
|
||||
}
|
||||
udp_dyn_server.periodic_operation();
|
||||
assert!(tc_queue.lock().unwrap().is_empty());
|
||||
assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty());
|
||||
// Still tries to send to the same client.
|
||||
{
|
||||
let mut tm_handler_calls = tm_handler_calls.lock().unwrap();
|
||||
|
@ -10,19 +10,20 @@ mod tmtc;
|
||||
use crate::events::EventHandler;
|
||||
use crate::interface::udp::DynamicUdpTmHandler;
|
||||
use crate::pus::stack::PusStack;
|
||||
use crate::tmtc::tm_funnel::{TmFunnelDynamic, TmFunnelStatic};
|
||||
use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic};
|
||||
use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic};
|
||||
use log::info;
|
||||
use pus::test::create_test_service_dynamic;
|
||||
use satrs::hal::std::tcp_server::ServerConfig;
|
||||
use satrs::hal::std::udp_server::UdpTcServer;
|
||||
use satrs::request::GenericMessage;
|
||||
use satrs::tmtc::tc_helper::{SharedTcPool, TcSenderSharedPool};
|
||||
use satrs::tmtc::tm_helper::SharedTmPool;
|
||||
use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools};
|
||||
use satrs_example::config::tasks::{
|
||||
FREQ_MS_AOCS, FREQ_MS_EVENT_HANDLING, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC,
|
||||
};
|
||||
use satrs_example::config::{OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT};
|
||||
use tmtc::PusTcSourceProviderDynamic;
|
||||
|
||||
use crate::acs::mgm::{MgmHandlerLis3Mdl, MpscModeLeafInterface, SpiDummyInterface};
|
||||
use crate::interface::tcp::{SyncTcpTmSource, TcpTask};
|
||||
@ -34,17 +35,12 @@ use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static};
|
||||
use crate::pus::mode::{create_mode_service_dynamic, create_mode_service_static};
|
||||
use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static};
|
||||
use crate::pus::test::create_test_service_static;
|
||||
use crate::pus::{PusReceiver, PusTcMpscRouter};
|
||||
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
|
||||
use crate::requests::{CompositeRequest, GenericRequestRouter};
|
||||
use crate::tmtc::ccsds::CcsdsReceiver;
|
||||
use crate::tmtc::{
|
||||
PusTcSourceProviderSharedPool, SharedTcPool, TcSourceTaskDynamic, TcSourceTaskStatic,
|
||||
};
|
||||
use satrs::mode::ModeRequest;
|
||||
use satrs::pus::event_man::EventRequestWithToken;
|
||||
use satrs::pus::TmInSharedPoolSender;
|
||||
use satrs::spacepackets::{time::cds::CdsTime, time::TimeWriter};
|
||||
use satrs::tmtc::CcsdsDistributor;
|
||||
use satrs_example::config::components::MGM_HANDLER_0;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::mpsc;
|
||||
@ -56,9 +52,7 @@ use std::time::Duration;
|
||||
fn static_tmtc_pool_main() {
|
||||
let (tm_pool, tc_pool) = create_static_pools();
|
||||
let shared_tm_pool = SharedTmPool::new(tm_pool);
|
||||
let shared_tc_pool = SharedTcPool {
|
||||
pool: Arc::new(RwLock::new(tc_pool)),
|
||||
};
|
||||
let shared_tc_pool = SharedTcPool(Arc::new(RwLock::new(tc_pool)));
|
||||
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
|
||||
let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50);
|
||||
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
|
||||
@ -81,10 +75,7 @@ fn static_tmtc_pool_main() {
|
||||
|
||||
// This helper structure is used by all telecommand providers which need to send telecommands
|
||||
// to the TC source.
|
||||
let tc_source = PusTcSourceProviderSharedPool {
|
||||
shared_pool: shared_tc_pool.clone(),
|
||||
tc_source: tc_source_tx,
|
||||
};
|
||||
let tc_source = TcSenderSharedPool::new(tc_source_tx, shared_tc_pool.clone());
|
||||
|
||||
// Create event handling components
|
||||
// These sender handles are used to send event requests, for example to enable or disable
|
||||
@ -116,7 +107,7 @@ fn static_tmtc_pool_main() {
|
||||
};
|
||||
let pus_test_service = create_test_service_static(
|
||||
tm_funnel_tx_sender.clone(),
|
||||
shared_tc_pool.pool.clone(),
|
||||
shared_tc_pool.0.clone(),
|
||||
event_handler.clone_event_sender(),
|
||||
pus_test_rx,
|
||||
);
|
||||
@ -128,27 +119,27 @@ fn static_tmtc_pool_main() {
|
||||
);
|
||||
let pus_event_service = create_event_service_static(
|
||||
tm_funnel_tx_sender.clone(),
|
||||
shared_tc_pool.pool.clone(),
|
||||
shared_tc_pool.0.clone(),
|
||||
pus_event_rx,
|
||||
event_request_tx,
|
||||
);
|
||||
let pus_action_service = create_action_service_static(
|
||||
tm_funnel_tx_sender.clone(),
|
||||
shared_tc_pool.pool.clone(),
|
||||
shared_tc_pool.0.clone(),
|
||||
pus_action_rx,
|
||||
request_map.clone(),
|
||||
pus_action_reply_rx,
|
||||
);
|
||||
let pus_hk_service = create_hk_service_static(
|
||||
tm_funnel_tx_sender.clone(),
|
||||
shared_tc_pool.pool.clone(),
|
||||
shared_tc_pool.0.clone(),
|
||||
pus_hk_rx,
|
||||
request_map.clone(),
|
||||
pus_hk_reply_rx,
|
||||
);
|
||||
let pus_mode_service = create_mode_service_static(
|
||||
tm_funnel_tx_sender.clone(),
|
||||
shared_tc_pool.pool.clone(),
|
||||
shared_tc_pool.0.clone(),
|
||||
pus_mode_rx,
|
||||
request_map,
|
||||
pus_mode_reply_rx,
|
||||
@ -162,16 +153,14 @@ fn static_tmtc_pool_main() {
|
||||
pus_mode_service,
|
||||
);
|
||||
|
||||
let ccsds_receiver = CcsdsReceiver { tc_source };
|
||||
let mut tmtc_task = TcSourceTaskStatic::new(
|
||||
shared_tc_pool.clone(),
|
||||
tc_source_rx,
|
||||
PusReceiver::new(tm_funnel_tx_sender, pus_router),
|
||||
PusTcDistributor::new(tm_funnel_tx_sender, pus_router),
|
||||
);
|
||||
|
||||
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
|
||||
let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, tc_source.clone())
|
||||
.expect("creating UDP TMTC server failed");
|
||||
let mut udp_tmtc_server = UdpTmtcServer {
|
||||
udp_tc_server,
|
||||
@ -181,13 +170,12 @@ fn static_tmtc_pool_main() {
|
||||
},
|
||||
};
|
||||
|
||||
let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
|
||||
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
|
||||
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
|
||||
let mut tcp_server = TcpTask::new(
|
||||
tcp_server_cfg,
|
||||
sync_tm_tcp_source.clone(),
|
||||
tcp_ccsds_distributor,
|
||||
tc_source.clone(),
|
||||
PACKET_ID_VALIDATOR.clone(),
|
||||
)
|
||||
.expect("tcp server creation failed");
|
||||
@ -317,8 +305,6 @@ fn dyn_tmtc_pool_main() {
|
||||
.mode_router_map
|
||||
.insert(MGM_HANDLER_0.raw(), mgm_handler_mode_tx);
|
||||
|
||||
let tc_source = PusTcSourceProviderDynamic(tc_source_tx);
|
||||
|
||||
// Create event handling components
|
||||
// These sender handles are used to send event requests, for example to enable or disable
|
||||
// certain events.
|
||||
@ -354,7 +340,7 @@ fn dyn_tmtc_pool_main() {
|
||||
);
|
||||
let pus_scheduler_service = create_scheduler_service_dynamic(
|
||||
tm_funnel_tx.clone(),
|
||||
tc_source.0.clone(),
|
||||
tc_source_tx.clone(),
|
||||
pus_sched_rx,
|
||||
create_sched_tc_pool(),
|
||||
);
|
||||
@ -388,16 +374,18 @@ fn dyn_tmtc_pool_main() {
|
||||
pus_mode_service,
|
||||
);
|
||||
|
||||
let ccsds_receiver = CcsdsReceiver { tc_source };
|
||||
// let ccsds_receiver = CcsdsDistributor {
|
||||
//pus_sender: tc_source_tx,
|
||||
//};
|
||||
|
||||
let mut tmtc_task = TcSourceTaskDynamic::new(
|
||||
tc_source_rx,
|
||||
PusReceiver::new(tm_funnel_tx.clone(), pus_router),
|
||||
PusTcDistributor::new(tm_funnel_tx.clone(), pus_router),
|
||||
);
|
||||
|
||||
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
|
||||
let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
|
||||
//let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
|
||||
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, tc_source_tx.clone())
|
||||
.expect("creating UDP TMTC server failed");
|
||||
let mut udp_tmtc_server = UdpTmtcServer {
|
||||
udp_tc_server,
|
||||
@ -406,13 +394,13 @@ fn dyn_tmtc_pool_main() {
|
||||
},
|
||||
};
|
||||
|
||||
let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
|
||||
//let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
|
||||
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
|
||||
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
|
||||
let mut tcp_server = TcpTask::new(
|
||||
tcp_server_cfg,
|
||||
sync_tm_tcp_source.clone(),
|
||||
tcp_ccsds_distributor,
|
||||
tc_source_tx.clone(),
|
||||
PACKET_ID_VALIDATOR.clone(),
|
||||
)
|
||||
.expect("tcp server creation failed");
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::requests::GenericRequestRouter;
|
||||
use crate::tmtc::MpscStoreAndSendError;
|
||||
use crate::tmtc::StoreAndSendError;
|
||||
use log::warn;
|
||||
use satrs::pus::verification::{
|
||||
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
|
||||
@ -11,7 +11,7 @@ use satrs::pus::{
|
||||
GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler,
|
||||
PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
|
||||
};
|
||||
use satrs::queue::GenericReceiveError;
|
||||
use satrs::queue::{GenericReceiveError, GenericSendError};
|
||||
use satrs::request::{Apid, GenericMessage, MessageMetadata};
|
||||
use satrs::spacepackets::ecss::tc::PusTcReader;
|
||||
use satrs::spacepackets::ecss::PusServiceId;
|
||||
@ -53,7 +53,7 @@ pub struct PusTcMpscRouter {
|
||||
pub mode_tc_sender: Sender<EcssTcAndToken>,
|
||||
}
|
||||
|
||||
pub struct PusReceiver<TmSender: EcssTmSenderCore> {
|
||||
pub struct PusTcDistributor<TmSender: EcssTmSenderCore> {
|
||||
pub id: ComponentId,
|
||||
pub tm_sender: TmSender,
|
||||
pub verif_reporter: VerificationReporter,
|
||||
@ -61,7 +61,7 @@ pub struct PusReceiver<TmSender: EcssTmSenderCore> {
|
||||
stamp_helper: TimeStampHelper,
|
||||
}
|
||||
|
||||
impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
|
||||
impl<TmSender: EcssTmSenderCore> PusTcDistributor<TmSender> {
|
||||
pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self {
|
||||
Self {
|
||||
id: PUS_ROUTING_SERVICE.raw(),
|
||||
@ -80,7 +80,7 @@ impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
|
||||
tc_in_memory: TcInMemory,
|
||||
service: u8,
|
||||
pus_tc: &PusTcReader,
|
||||
) -> Result<PusPacketHandlerResult, MpscStoreAndSendError> {
|
||||
) -> Result<PusPacketHandlerResult, StoreAndSendError> {
|
||||
let init_token = self.verif_reporter.add_tc(pus_tc);
|
||||
self.stamp_helper.update_from_now();
|
||||
let accepted_token = self
|
||||
@ -90,26 +90,38 @@ impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
|
||||
let service = PusServiceId::try_from(service);
|
||||
match service {
|
||||
Ok(standard_service) => match standard_service {
|
||||
PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?,
|
||||
PusServiceId::Housekeeping => {
|
||||
self.pus_router.hk_tc_sender.send(EcssTcAndToken {
|
||||
PusServiceId::Test => self
|
||||
.pus_router
|
||||
.test_tc_sender
|
||||
.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?
|
||||
}
|
||||
PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?,
|
||||
PusServiceId::Scheduling => {
|
||||
self.pus_router.sched_tc_sender.send(EcssTcAndToken {
|
||||
})
|
||||
.map_err(|_| GenericSendError::RxDisconnected)?,
|
||||
PusServiceId::Housekeeping => self
|
||||
.pus_router
|
||||
.hk_tc_sender
|
||||
.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?
|
||||
}
|
||||
})
|
||||
.map_err(|_| GenericSendError::RxDisconnected)?,
|
||||
PusServiceId::Event => self
|
||||
.pus_router
|
||||
.event_tc_sender
|
||||
.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})
|
||||
.map_err(|_| GenericSendError::RxDisconnected)?,
|
||||
PusServiceId::Scheduling => self
|
||||
.pus_router
|
||||
.sched_tc_sender
|
||||
.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})
|
||||
.map_err(|_| GenericSendError::RxDisconnected)?,
|
||||
_ => {
|
||||
let result = self.verif_reporter.start_failure(
|
||||
&self.tm_sender,
|
||||
@ -128,12 +140,14 @@ impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
|
||||
Err(e) => {
|
||||
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
|
||||
match custom_service {
|
||||
CustomPusServiceId::Mode => {
|
||||
self.pus_router.mode_tc_sender.send(EcssTcAndToken {
|
||||
CustomPusServiceId::Mode => self
|
||||
.pus_router
|
||||
.mode_tc_sender
|
||||
.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?
|
||||
}
|
||||
})
|
||||
.map_err(|_| GenericSendError::RxDisconnected)?,
|
||||
CustomPusServiceId::Health => {}
|
||||
}
|
||||
} else {
|
||||
|
@ -12,23 +12,22 @@ use satrs::pus::{
|
||||
EcssTmSenderCore, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded,
|
||||
PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusTmInPool, TmInSharedPoolSender,
|
||||
};
|
||||
use satrs::tmtc::tc_helper::TcSenderSharedPool;
|
||||
use satrs_example::config::components::PUS_SCHED_SERVICE;
|
||||
|
||||
use crate::tmtc::PusTcSourceProviderSharedPool;
|
||||
|
||||
use super::HandlingStatus;
|
||||
|
||||
pub trait TcReleaser {
|
||||
fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
|
||||
}
|
||||
|
||||
impl TcReleaser for PusTcSourceProviderSharedPool {
|
||||
impl TcReleaser for TcSenderSharedPool {
|
||||
fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
|
||||
if enabled {
|
||||
// Transfer TC from scheduler TC pool to shared TC pool.
|
||||
let released_tc_addr = self
|
||||
.shared_pool
|
||||
.pool
|
||||
.0
|
||||
.write()
|
||||
.expect("locking pool failed")
|
||||
.add(tc)
|
||||
@ -122,7 +121,7 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
||||
|
||||
pub fn create_scheduler_service_static(
|
||||
tm_sender: TmInSharedPoolSender<mpsc::SyncSender<PusTmInPool>>,
|
||||
tc_releaser: PusTcSourceProviderSharedPool,
|
||||
tc_releaser: TcSenderSharedPool,
|
||||
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||
sched_tc_pool: StaticMemoryPool,
|
||||
) -> SchedulingServiceWrapper<MpscTmInSharedPoolSenderBounded, EcssTcInSharedStoreConverter> {
|
||||
@ -134,7 +133,7 @@ pub fn create_scheduler_service_static(
|
||||
pus_sched_rx,
|
||||
tm_sender,
|
||||
create_verification_reporter(PUS_SCHED_SERVICE.id(), PUS_SCHED_SERVICE.apid),
|
||||
EcssTcInSharedStoreConverter::new(tc_releaser.clone_backing_pool(), 2048),
|
||||
EcssTcInSharedStoreConverter::new(tc_releaser.shared_pool(), 2048),
|
||||
),
|
||||
scheduler,
|
||||
);
|
||||
|
@ -1,53 +0,0 @@
|
||||
use satrs::pus::ReceivesEcssPusTc;
|
||||
use satrs::spacepackets::{CcsdsPacket, SpHeader};
|
||||
use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
|
||||
use satrs::ValidatorU16Id;
|
||||
use satrs_example::config::components::Apid;
|
||||
use satrs_example::config::APID_VALIDATOR;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CcsdsReceiver<
|
||||
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone,
|
||||
E,
|
||||
> {
|
||||
pub tc_source: TcSource,
|
||||
}
|
||||
|
||||
impl<
|
||||
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
|
||||
E: 'static,
|
||||
> ValidatorU16Id for CcsdsReceiver<TcSource, E>
|
||||
{
|
||||
fn validate(&self, apid: u16) -> bool {
|
||||
APID_VALIDATOR.contains(&apid)
|
||||
}
|
||||
}
|
||||
|
||||
impl<
|
||||
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
|
||||
E: 'static,
|
||||
> CcsdsPacketHandler for CcsdsReceiver<TcSource, E>
|
||||
{
|
||||
type Error = E;
|
||||
|
||||
fn handle_packet_with_valid_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
if sp_header.apid() == Apid::Cfdp as u16 {
|
||||
} else {
|
||||
return self.tc_source.pass_ccsds(sp_header, tc_raw);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_packet_with_unknown_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
_tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
log::warn!("unknown APID 0x{:x?} detected", sp_header.apid());
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,215 +1,96 @@
|
||||
use log::warn;
|
||||
use satrs::pus::{
|
||||
EcssTcAndToken, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, ReceivesEcssPusTc,
|
||||
};
|
||||
use satrs::pus::ReceivesEcssPusTc;
|
||||
use satrs::queue::GenericSendError;
|
||||
use satrs::spacepackets::SpHeader;
|
||||
use std::sync::mpsc::{self, Receiver, SendError, Sender, SyncSender, TryRecvError};
|
||||
use std::sync::mpsc::{self};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::pus::PusReceiver;
|
||||
use satrs::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError};
|
||||
use satrs::spacepackets::ecss::tc::PusTcReader;
|
||||
use satrs::spacepackets::ecss::PusPacket;
|
||||
use satrs::tmtc::ReceivesCcsdsTc;
|
||||
use satrs::tmtc::{ReceivesCcsdsTc, ReceivesTc};
|
||||
|
||||
pub mod ccsds;
|
||||
pub mod tm_funnel;
|
||||
pub mod tc_source;
|
||||
pub mod tm_sink;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
pub enum MpscStoreAndSendError {
|
||||
pub enum StoreAndSendError {
|
||||
#[error("Store error: {0}")]
|
||||
Store(#[from] StoreError),
|
||||
#[error("TC send error: {0}")]
|
||||
TcSend(#[from] SendError<EcssTcAndToken>),
|
||||
#[error("TMTC send error: {0}")]
|
||||
TmTcSend(#[from] SendError<StoreAddr>),
|
||||
#[error("Genreric send error: {0}")]
|
||||
Send(#[from] GenericSendError),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedTcPool {
|
||||
pub pool: SharedStaticMemoryPool,
|
||||
}
|
||||
pub struct SharedTcPool(pub SharedStaticMemoryPool);
|
||||
|
||||
impl SharedTcPool {
|
||||
pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
|
||||
let mut pg = self.pool.write().expect("error locking TC store");
|
||||
let mut pg = self.0.write().expect("error locking TC store");
|
||||
let addr = pg.free_element(pus_tc.len_packed(), |buf| {
|
||||
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
|
||||
})?;
|
||||
Ok(addr)
|
||||
}
|
||||
|
||||
pub fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<StoreAddr, StoreError> {
|
||||
self.add_raw_tc(tc_raw)
|
||||
}
|
||||
|
||||
pub fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<StoreAddr, StoreError> {
|
||||
let mut pg = self.0.write().expect("error locking TC store");
|
||||
let addr = pg.free_element(tc_raw.len(), |buf| {
|
||||
buf[0..tc_raw.len()].copy_from_slice(tc_raw);
|
||||
})?;
|
||||
Ok(addr)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PusTcSourceProviderSharedPool {
|
||||
pub tc_source: SyncSender<StoreAddr>,
|
||||
pub struct TcSenderSharedPool {
|
||||
pub tc_source: mpsc::SyncSender<StoreAddr>,
|
||||
pub shared_pool: SharedTcPool,
|
||||
}
|
||||
|
||||
impl PusTcSourceProviderSharedPool {
|
||||
impl TcSenderSharedPool {
|
||||
#[allow(dead_code)]
|
||||
pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool {
|
||||
self.shared_pool.pool.clone()
|
||||
pub fn shared_pool(&self) -> SharedStaticMemoryPool {
|
||||
self.shared_pool.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesEcssPusTc for PusTcSourceProviderSharedPool {
|
||||
type Error = MpscStoreAndSendError;
|
||||
impl ReceivesTc for TcSenderSharedPool {
|
||||
type Error = StoreAndSendError;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let addr = self.shared_pool.add_raw_tc(tc_raw)?;
|
||||
self.tc_source.try_send(addr).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
impl ReceivesEcssPusTc for TcSenderSharedPool {
|
||||
type Error = StoreAndSendError;
|
||||
|
||||
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||
let addr = self.shared_pool.add_pus_tc(pus_tc)?;
|
||||
self.tc_source.send(addr)?;
|
||||
self.tc_source.try_send(addr).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool {
|
||||
type Error = MpscStoreAndSendError;
|
||||
impl ReceivesCcsdsTc for TcSenderSharedPool {
|
||||
type Error = StoreAndSendError;
|
||||
|
||||
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let mut pool = self.shared_pool.pool.write().expect("locking pool failed");
|
||||
let addr = pool.add(tc_raw)?;
|
||||
drop(pool);
|
||||
self.tc_source.send(addr)?;
|
||||
fn pass_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let addr = self.shared_pool.add_ccsds_tc(sp_header, tc_raw)?;
|
||||
self.tc_source.try_send(addr).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules.
|
||||
#[derive(Clone)]
|
||||
pub struct PusTcSourceProviderDynamic(pub Sender<Vec<u8>>);
|
||||
|
||||
impl ReceivesEcssPusTc for PusTcSourceProviderDynamic {
|
||||
type Error = SendError<Vec<u8>>;
|
||||
|
||||
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||
self.0.send(pus_tc.raw_data().to_vec())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
|
||||
type Error = mpsc::SendError<Vec<u8>>;
|
||||
|
||||
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.0.send(tc_raw.to_vec())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// TC source components where static pools are the backing memory of the received telecommands.
|
||||
pub struct TcSourceTaskStatic {
|
||||
shared_tc_pool: SharedTcPool,
|
||||
tc_receiver: Receiver<StoreAddr>,
|
||||
tc_buf: [u8; 4096],
|
||||
pus_receiver: PusReceiver<MpscTmInSharedPoolSenderBounded>,
|
||||
}
|
||||
|
||||
impl TcSourceTaskStatic {
|
||||
pub fn new(
|
||||
shared_tc_pool: SharedTcPool,
|
||||
tc_receiver: Receiver<StoreAddr>,
|
||||
pus_receiver: PusReceiver<MpscTmInSharedPoolSenderBounded>,
|
||||
) -> Self {
|
||||
Self {
|
||||
shared_tc_pool,
|
||||
tc_receiver,
|
||||
tc_buf: [0; 4096],
|
||||
pus_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.poll_tc();
|
||||
}
|
||||
|
||||
pub fn poll_tc(&mut self) -> bool {
|
||||
match self.tc_receiver.try_recv() {
|
||||
Ok(addr) => {
|
||||
let pool = self
|
||||
.shared_tc_pool
|
||||
.pool
|
||||
.read()
|
||||
.expect("locking tc pool failed");
|
||||
pool.read(&addr, &mut self.tc_buf)
|
||||
.expect("reading pool failed");
|
||||
drop(pool);
|
||||
match PusTcReader::new(&self.tc_buf) {
|
||||
Ok((pus_tc, _)) => {
|
||||
self.pus_receiver
|
||||
.handle_tc_packet(
|
||||
satrs::pus::TcInMemory::StoreAddr(addr),
|
||||
pus_tc.service(),
|
||||
&pus_tc,
|
||||
)
|
||||
.ok();
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("error creating PUS TC from raw data: {e}");
|
||||
warn!("raw data: {:x?}", self.tc_buf);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => false,
|
||||
TryRecvError::Disconnected => {
|
||||
warn!("tmtc thread: sender disconnected");
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TC source components where the heap is the backing memory of the received telecommands.
|
||||
pub struct TcSourceTaskDynamic {
|
||||
pub tc_receiver: Receiver<Vec<u8>>,
|
||||
pus_receiver: PusReceiver<MpscTmAsVecSender>,
|
||||
}
|
||||
|
||||
impl TcSourceTaskDynamic {
|
||||
pub fn new(
|
||||
tc_receiver: Receiver<Vec<u8>>,
|
||||
pus_receiver: PusReceiver<MpscTmAsVecSender>,
|
||||
) -> Self {
|
||||
Self {
|
||||
tc_receiver,
|
||||
pus_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.poll_tc();
|
||||
}
|
||||
|
||||
pub fn poll_tc(&mut self) -> bool {
|
||||
match self.tc_receiver.try_recv() {
|
||||
Ok(tc) => match PusTcReader::new(&tc) {
|
||||
Ok((pus_tc, _)) => {
|
||||
self.pus_receiver
|
||||
.handle_tc_packet(
|
||||
satrs::pus::TcInMemory::Vec(tc.clone()),
|
||||
pus_tc.service(),
|
||||
&pus_tc,
|
||||
)
|
||||
.ok();
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("error creating PUS TC from raw data: {e}");
|
||||
warn!("raw data: {:x?}", tc);
|
||||
true
|
||||
}
|
||||
},
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => false,
|
||||
TryRecvError::Disconnected => {
|
||||
warn!("tmtc thread: sender disconnected");
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
129
satrs-example/src/tmtc/tc_source.rs
Normal file
129
satrs-example/src/tmtc/tc_source.rs
Normal file
@ -0,0 +1,129 @@
|
||||
use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedTcPool};
|
||||
use std::sync::mpsc::{self, TryRecvError};
|
||||
|
||||
use satrs::{
|
||||
pool::StoreAddr,
|
||||
pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded},
|
||||
spacepackets::ecss::{tc::PusTcReader, PusPacket},
|
||||
};
|
||||
|
||||
use crate::pus::PusTcDistributor;
|
||||
|
||||
|
||||
// TC source components where static pools are the backing memory of the received telecommands.
|
||||
pub struct TcSourceTaskStatic {
|
||||
shared_tc_pool: SharedTcPool,
|
||||
tc_receiver: mpsc::Receiver<StoreAddr>,
|
||||
tc_buf: [u8; 4096],
|
||||
pus_receiver: PusTcDistributor<MpscTmInSharedPoolSenderBounded>,
|
||||
}
|
||||
|
||||
impl TcSourceTaskStatic {
|
||||
pub fn new(
|
||||
shared_tc_pool: SharedTcPool,
|
||||
tc_receiver: mpsc::Receiver<StoreAddr>,
|
||||
pus_receiver: PusTcDistributor<MpscTmInSharedPoolSenderBounded>,
|
||||
) -> Self {
|
||||
Self {
|
||||
shared_tc_pool,
|
||||
tc_receiver,
|
||||
tc_buf: [0; 4096],
|
||||
pus_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.poll_tc();
|
||||
}
|
||||
|
||||
pub fn poll_tc(&mut self) -> bool {
|
||||
match self.tc_receiver.try_recv() {
|
||||
Ok(addr) => {
|
||||
let pool = self
|
||||
.shared_tc_pool
|
||||
.0
|
||||
.read()
|
||||
.expect("locking tc pool failed");
|
||||
pool.read(&addr, &mut self.tc_buf)
|
||||
.expect("reading pool failed");
|
||||
drop(pool);
|
||||
match PusTcReader::new(&self.tc_buf) {
|
||||
Ok((pus_tc, _)) => {
|
||||
self.pus_receiver
|
||||
.handle_tc_packet(
|
||||
satrs::pus::TcInMemory::StoreAddr(addr),
|
||||
pus_tc.service(),
|
||||
&pus_tc,
|
||||
)
|
||||
.ok();
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("error creating PUS TC from raw data: {e}");
|
||||
log::warn!("raw data: {:x?}", self.tc_buf);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => false,
|
||||
TryRecvError::Disconnected => {
|
||||
log::warn!("tmtc thread: sender disconnected");
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TC source components where the heap is the backing memory of the received telecommands.
|
||||
pub struct TcSourceTaskDynamic {
|
||||
pub tc_receiver: mpsc::Receiver<Vec<u8>>,
|
||||
pus_receiver: PusTcDistributor<MpscTmAsVecSender>,
|
||||
}
|
||||
|
||||
impl TcSourceTaskDynamic {
|
||||
pub fn new(
|
||||
tc_receiver: mpsc::Receiver<Vec<u8>>,
|
||||
pus_receiver: PusTcDistributor<MpscTmAsVecSender>,
|
||||
) -> Self {
|
||||
Self {
|
||||
tc_receiver,
|
||||
pus_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.poll_tc();
|
||||
}
|
||||
|
||||
pub fn poll_tc(&mut self) -> bool {
|
||||
// Right now, we only expect PUS packets.
|
||||
match self.tc_receiver.try_recv() {
|
||||
Ok(tc) => match PusTcReader::new(&tc) {
|
||||
Ok((pus_tc, _)) => {
|
||||
self.pus_receiver
|
||||
.handle_tc_packet(
|
||||
satrs::pus::TcInMemory::Vec(tc.clone()),
|
||||
pus_tc.service(),
|
||||
&pus_tc,
|
||||
)
|
||||
.ok();
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("error creating PUS TC from raw data: {e}");
|
||||
log::warn!("raw data: {:x?}", tc);
|
||||
true
|
||||
}
|
||||
},
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => false,
|
||||
TryRecvError::Disconnected => {
|
||||
log::warn!("tmtc thread: sender disconnected");
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
|
||||
|
||||
## Changed
|
||||
|
||||
- Renamed `ReceivesTcCore` to `ReceivesTc`.
|
||||
- Renamed `TmPacketSourceCore` to `TmPacketSource`.
|
||||
- TCP server generics order. The error generics come last now.
|
||||
- `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root.
|
||||
It can be used for both CCSDS packet ID and CCSDS APID validation.
|
||||
@ -76,6 +78,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
|
||||
## Removed
|
||||
|
||||
- Remove `objects` module.
|
||||
- Removed CCSDS and PUS distributor modules. Their worth is questionable in an architecture
|
||||
where routing traits are sufficient and the core logic to demultiplex and distribute packets
|
||||
is simple enough to be application code.
|
||||
|
||||
# [v0.2.0-rc.0] 2024-02-21
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
use crate::{tmtc::ReceivesTcCore, ValidatorU16Id};
|
||||
use crate::{tmtc::ReceivesTc, ValidatorU16Id};
|
||||
|
||||
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
|
||||
/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then
|
||||
/// uses the length field of the packet to extract CCSDS packets.
|
||||
/// [spacepackets::PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet
|
||||
/// and then uses the length field of the packet to extract CCSDS packets.
|
||||
///
|
||||
/// This function is also able to deal with broken tail packets at the end as long a the parser
|
||||
/// can read the full 7 bytes which constitue a space packet header plus one byte minimal size.
|
||||
@ -10,12 +10,12 @@ use crate::{tmtc::ReceivesTcCore, ValidatorU16Id};
|
||||
/// index for future write operations will be written to the `next_write_idx` argument.
|
||||
///
|
||||
/// The parser will write all packets which were decoded successfully to the given `tc_receiver`
|
||||
/// and return the number of packets found. If the [ReceivesTcCore::pass_tc] calls fails, the
|
||||
/// and return the number of packets found. If the [ReceivesTc::pass_tc] calls fails, the
|
||||
/// error will be returned.
|
||||
pub fn parse_buffer_for_ccsds_space_packets<E>(
|
||||
buf: &mut [u8],
|
||||
packet_id_validator: &(impl ValidatorU16Id + ?Sized),
|
||||
tc_receiver: &mut (impl ReceivesTcCore<Error = E> + ?Sized),
|
||||
tc_receiver: &mut (impl ReceivesTc<Error = E> + ?Sized),
|
||||
next_write_idx: &mut usize,
|
||||
) -> Result<u32, E> {
|
||||
*next_write_idx = 0;
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::tmtc::ReceivesTcCore;
|
||||
use crate::tmtc::ReceivesTc;
|
||||
use cobs::{decode_in_place, encode, max_encoding_length};
|
||||
|
||||
/// This function encodes the given packet with COBS and also wraps the encoded packet with
|
||||
@ -57,7 +57,7 @@ pub fn encode_packet_with_cobs(
|
||||
/// The parser will write all packets which were decoded successfully to the given `tc_receiver`.
|
||||
pub fn parse_buffer_for_cobs_encoded_packets<E>(
|
||||
buf: &mut [u8],
|
||||
tc_receiver: &mut dyn ReceivesTcCore<Error = E>,
|
||||
tc_receiver: &mut (impl ReceivesTc<Error = E> + ?Sized),
|
||||
next_write_idx: &mut usize,
|
||||
) -> Result<u32, E> {
|
||||
let mut start_index_packet = 0;
|
||||
|
@ -8,7 +8,7 @@ pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_e
|
||||
pub(crate) mod tests {
|
||||
use alloc::{collections::VecDeque, vec::Vec};
|
||||
|
||||
use crate::tmtc::ReceivesTcCore;
|
||||
use crate::tmtc::ReceivesTc;
|
||||
|
||||
use super::cobs::encode_packet_with_cobs;
|
||||
|
||||
@ -20,7 +20,7 @@ pub(crate) mod tests {
|
||||
pub(crate) tc_queue: VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl ReceivesTcCore for TcCacher {
|
||||
impl ReceivesTc for TcCacher {
|
||||
type Error = ();
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
|
@ -35,7 +35,7 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
|
||||
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
|
||||
&mut tc_buffer[..current_write_idx],
|
||||
tc_receiver.upcast_mut(),
|
||||
tc_receiver,
|
||||
next_write_idx,
|
||||
)
|
||||
.map_err(|e| TcpTmtcError::TcError(e))?;
|
||||
|
@ -178,8 +178,8 @@ pub struct TcpTmtcGenericServer<
|
||||
pub(crate) tc_buffer: Vec<u8>,
|
||||
poll: Poll,
|
||||
events: Events,
|
||||
tc_handler: TcParser,
|
||||
tm_handler: TmSender,
|
||||
pub tc_handler: TcParser,
|
||||
pub tm_handler: TmSender,
|
||||
stop_signal: Option<Arc<AtomicBool>>,
|
||||
}
|
||||
|
||||
@ -424,7 +424,7 @@ pub(crate) mod tests {
|
||||
|
||||
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
|
||||
|
||||
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
|
||||
use crate::tmtc::{ReceivesTc, TmPacketSource};
|
||||
|
||||
use super::*;
|
||||
|
||||
@ -432,7 +432,7 @@ pub(crate) mod tests {
|
||||
pub(crate) struct SyncTcCacher {
|
||||
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||
}
|
||||
impl ReceivesTcCore for SyncTcCacher {
|
||||
impl ReceivesTc for SyncTcCacher {
|
||||
type Error = ();
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
@ -454,7 +454,7 @@ pub(crate) mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl TmPacketSourceCore for SyncTmSource {
|
||||
impl TmPacketSource for SyncTmSource {
|
||||
type Error = ();
|
||||
|
||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||
|
@ -41,7 +41,7 @@ impl<PacketIdChecker: ValidatorU16Id, TmError, TcError: 'static> TcpTcParser<TmE
|
||||
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
|
||||
&mut tc_buffer[..current_write_idx],
|
||||
&self.packet_id_lookup,
|
||||
tc_receiver.upcast_mut(),
|
||||
tc_receiver,
|
||||
next_write_idx,
|
||||
)
|
||||
.map_err(|e| TcpTmtcError::TcError(e))?;
|
||||
@ -94,20 +94,20 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
||||
/// also serves as the example application for this module.
|
||||
pub struct TcpSpacepacketsServer<
|
||||
TmSource: TmPacketSource<Error = TmError>,
|
||||
TcReceiver: ReceivesTc<Error = TcError>,
|
||||
TcSender: ReceivesTc<Error = SendError>,
|
||||
PacketIdChecker: ValidatorU16Id,
|
||||
HandledConnection: HandledConnectionHandler,
|
||||
TmError,
|
||||
TcError: 'static,
|
||||
SendError: 'static,
|
||||
> {
|
||||
pub generic_server: TcpTmtcGenericServer<
|
||||
TmSource,
|
||||
TcReceiver,
|
||||
TcSender,
|
||||
SpacepacketsTmSender,
|
||||
SpacepacketsTcParser<PacketIdChecker>,
|
||||
HandledConnection,
|
||||
TmError,
|
||||
TcError,
|
||||
SendError,
|
||||
>,
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
//! Generic UDP TC server.
|
||||
use crate::tmtc::{ReceivesTc, ReceivesTcCore};
|
||||
use std::boxed::Box;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use crate::tmtc::ReceivesTc;
|
||||
use core::fmt::Debug;
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
|
||||
use std::vec;
|
||||
use std::vec::Vec;
|
||||
@ -11,9 +11,10 @@ use std::vec::Vec;
|
||||
///
|
||||
/// It caches all received telecomands into a vector. The maximum expected telecommand size should
|
||||
/// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC
|
||||
/// receiver in form of a special trait object which implements [ReceivesTc]. Please note that the
|
||||
/// sender in form of a special trait object which implements [ReceivesTc]. For example, this can
|
||||
/// be used to send the telecommands to a centralized TC source. Please note that the
|
||||
/// receiver should copy out the received data if it the data is required past the
|
||||
/// [ReceivesTcCore::pass_tc] call.
|
||||
/// [ReceivesTc::pass_tc] call and no message passing is used to process the packet.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
@ -21,13 +22,13 @@ use std::vec::Vec;
|
||||
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||
/// use spacepackets::ecss::WritablePusPacket;
|
||||
/// use satrs::hal::std::udp_server::UdpTcServer;
|
||||
/// use satrs::tmtc::{ReceivesTc, ReceivesTcCore};
|
||||
/// use satrs::tmtc::ReceivesTc;
|
||||
/// use spacepackets::SpHeader;
|
||||
/// use spacepackets::ecss::tc::PusTcCreator;
|
||||
///
|
||||
/// #[derive (Default)]
|
||||
/// struct PingReceiver {}
|
||||
/// impl ReceivesTcCore for PingReceiver {
|
||||
/// impl ReceivesTc for PingReceiver {
|
||||
/// type Error = ();
|
||||
/// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
/// assert_eq!(tc_raw.len(), 13);
|
||||
@ -38,7 +39,7 @@ use std::vec::Vec;
|
||||
/// let mut buf = [0; 32];
|
||||
/// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
|
||||
/// let ping_receiver = PingReceiver::default();
|
||||
/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver))
|
||||
/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver)
|
||||
/// .expect("Creating UDP TMTC server failed");
|
||||
/// let sph = SpHeader::new_from_apid(0x02);
|
||||
/// let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true);
|
||||
@ -57,65 +58,42 @@ use std::vec::Vec;
|
||||
/// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67)
|
||||
/// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port
|
||||
/// and then forwards them to a generic CCSDS packet receiver.
|
||||
pub struct UdpTcServer<E> {
|
||||
pub struct UdpTcServer<TcSender: ReceivesTc<Error = SendError>, SendError> {
|
||||
pub socket: UdpSocket,
|
||||
recv_buf: Vec<u8>,
|
||||
sender_addr: Option<SocketAddr>,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = E>>,
|
||||
pub tc_sender: TcSender,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ReceiveResult<E> {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ReceiveResult<SendError: Debug + 'static> {
|
||||
#[error("nothing was received")]
|
||||
NothingReceived,
|
||||
IoError(Error),
|
||||
ReceiverError(E),
|
||||
#[error(transparent)]
|
||||
Io(#[from] io::Error),
|
||||
#[error(transparent)]
|
||||
Send(SendError),
|
||||
}
|
||||
|
||||
impl<E> From<Error> for ReceiveResult<E> {
|
||||
fn from(e: Error) -> Self {
|
||||
ReceiveResult::IoError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: PartialEq> PartialEq for ReceiveResult<E> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
use ReceiveResult::*;
|
||||
match (self, other) {
|
||||
(IoError(ref e), IoError(ref other_e)) => e.kind() == other_e.kind(),
|
||||
(NothingReceived, NothingReceived) => true,
|
||||
(ReceiverError(e), ReceiverError(other_e)) => e == other_e,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Eq + PartialEq> Eq for ReceiveResult<E> {}
|
||||
|
||||
impl<E: 'static> ReceivesTcCore for UdpTcServer<E> {
|
||||
type Error = E;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.tc_receiver.pass_tc(tc_raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: 'static> UdpTcServer<E> {
|
||||
impl<TcSender: ReceivesTc<Error = SendError>, SendError: Debug + 'static>
|
||||
UdpTcServer<TcSender, SendError>
|
||||
{
|
||||
pub fn new<A: ToSocketAddrs>(
|
||||
addr: A,
|
||||
max_recv_size: usize,
|
||||
tc_receiver: Box<dyn ReceivesTc<Error = E>>,
|
||||
) -> Result<Self, Error> {
|
||||
tc_sender: TcSender,
|
||||
) -> Result<Self, io::Error> {
|
||||
let server = Self {
|
||||
socket: UdpSocket::bind(addr)?,
|
||||
recv_buf: vec![0; max_recv_size],
|
||||
sender_addr: None,
|
||||
tc_receiver,
|
||||
tc_sender,
|
||||
};
|
||||
server.socket.set_nonblocking(true)?;
|
||||
Ok(server)
|
||||
}
|
||||
|
||||
pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult<E>> {
|
||||
pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult<SendError>> {
|
||||
let res = match self.socket.recv_from(&mut self.recv_buf) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
@ -128,9 +106,9 @@ impl<E: 'static> UdpTcServer<E> {
|
||||
};
|
||||
let (num_bytes, from) = res;
|
||||
self.sender_addr = Some(from);
|
||||
self.tc_receiver
|
||||
self.tc_sender
|
||||
.pass_tc(&self.recv_buf[0..num_bytes])
|
||||
.map_err(|e| ReceiveResult::ReceiverError(e))?;
|
||||
.map_err(ReceiveResult::Send)?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@ -142,11 +120,11 @@ impl<E: 'static> UdpTcServer<E> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer};
|
||||
use crate::tmtc::ReceivesTcCore;
|
||||
use crate::queue::GenericSendError;
|
||||
use crate::tmtc::ReceivesTc;
|
||||
use spacepackets::ecss::tc::PusTcCreator;
|
||||
use spacepackets::ecss::WritablePusPacket;
|
||||
use spacepackets::SpHeader;
|
||||
use std::boxed::Box;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
|
||||
use std::vec::Vec;
|
||||
@ -158,8 +136,8 @@ mod tests {
|
||||
pub sent_cmds: VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl ReceivesTcCore for PingReceiver {
|
||||
type Error = ();
|
||||
impl ReceivesTc for PingReceiver {
|
||||
type Error = GenericSendError;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let mut sent_data = Vec::new();
|
||||
@ -175,7 +153,7 @@ mod tests {
|
||||
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
|
||||
let ping_receiver = PingReceiver::default();
|
||||
is_send(&ping_receiver);
|
||||
let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver))
|
||||
let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver)
|
||||
.expect("Creating UDP TMTC server failed");
|
||||
is_send(&udp_tc_server);
|
||||
let sph = SpHeader::new_from_apid(0x02);
|
||||
@ -195,7 +173,7 @@ mod tests {
|
||||
udp_tc_server.last_sender().expect("No sender set"),
|
||||
local_addr
|
||||
);
|
||||
let ping_receiver: &mut PingReceiver = udp_tc_server.tc_receiver.downcast_mut().unwrap();
|
||||
let ping_receiver = &mut udp_tc_server.tc_sender;
|
||||
assert_eq!(ping_receiver.sent_cmds.len(), 1);
|
||||
let sent_cmd = ping_receiver.sent_cmds.pop_front().unwrap();
|
||||
assert_eq!(sent_cmd, buf[0..len]);
|
||||
@ -205,11 +183,11 @@ mod tests {
|
||||
fn test_nothing_received() {
|
||||
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7779);
|
||||
let ping_receiver = PingReceiver::default();
|
||||
let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver))
|
||||
let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver)
|
||||
.expect("Creating UDP TMTC server failed");
|
||||
let res = udp_tc_server.try_recv_tc();
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert_eq!(err, ReceiveResult::NothingReceived);
|
||||
matches!(err, ReceiveResult::NothingReceived);
|
||||
}
|
||||
}
|
||||
|
@ -273,12 +273,8 @@ pub trait EcssTcReceiverCore {
|
||||
fn recv_tc(&self) -> Result<EcssTcAndToken, TryRecvTmtcError>;
|
||||
}
|
||||
|
||||
/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is
|
||||
/// implemented by the [crate::tmtc::pus_distrib::PusDistributor] objects to allow passing PUS TC
|
||||
/// packets into it. It is generally assumed that the telecommand is stored in some pool structure,
|
||||
/// and the store address is passed as well. This allows efficient zero-copy forwarding of
|
||||
/// telecommands.
|
||||
pub trait ReceivesEcssPusTc {
|
||||
/// Generic trait for objects which can receive ECSS PUS telecommands.
|
||||
pub trait ReceivesEcssPusTc: Send {
|
||||
type Error;
|
||||
fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
@ -1,403 +0,0 @@
|
||||
//! CCSDS packet routing components.
|
||||
//!
|
||||
//! The routing components consist of two core components:
|
||||
//! 1. [CcsdsDistributor] component which dispatches received packets to a user-provided handler
|
||||
//! 2. [CcsdsPacketHandler] trait which should be implemented by the user-provided packet handler.
|
||||
//!
|
||||
//! The [CcsdsDistributor] implements the [ReceivesCcsdsTc] and [ReceivesTcCore] trait which allows to
|
||||
//! pass raw or CCSDS packets to it. Upon receiving a packet, it performs the following steps:
|
||||
//!
|
||||
//! 1. It tries to identify the target Application Process Identifier (APID) based on the
|
||||
//! respective CCSDS space packet header field. If that process fails, a [ByteConversionError] is
|
||||
//! returned to the user
|
||||
//! 2. If a valid APID is found and matches one of the APIDs provided by
|
||||
//! [CcsdsPacketHandler::valid_apids], it will pass the packet to the user provided
|
||||
//! [CcsdsPacketHandler::handle_known_apid] function. If no valid APID is found, the packet
|
||||
//! will be passed to the [CcsdsPacketHandler::handle_unknown_apid] function.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```rust
|
||||
//! use satrs::ValidatorU16Id;
|
||||
//! use satrs::tmtc::ccsds_distrib::{CcsdsPacketHandler, CcsdsDistributor};
|
||||
//! use satrs::tmtc::{ReceivesTc, ReceivesTcCore};
|
||||
//! use spacepackets::{CcsdsPacket, SpHeader};
|
||||
//! use spacepackets::ecss::WritablePusPacket;
|
||||
//! use spacepackets::ecss::tc::PusTcCreator;
|
||||
//!
|
||||
//! #[derive (Default)]
|
||||
//! struct ConcreteApidHandler {
|
||||
//! known_call_count: u32,
|
||||
//! unknown_call_count: u32
|
||||
//! }
|
||||
//!
|
||||
//! impl ConcreteApidHandler {
|
||||
//! fn mutable_foo(&mut self) {}
|
||||
//! }
|
||||
//!
|
||||
//! impl ValidatorU16Id for ConcreteApidHandler {
|
||||
//! fn validate(&self, apid: u16) -> bool { apid == 0x0002 }
|
||||
//! }
|
||||
//!
|
||||
//! impl CcsdsPacketHandler for ConcreteApidHandler {
|
||||
//! type Error = ();
|
||||
//! fn handle_packet_with_valid_apid(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
//! assert_eq!(sp_header.apid(), 0x002);
|
||||
//! assert_eq!(tc_raw.len(), 13);
|
||||
//! self.known_call_count += 1;
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! fn handle_packet_with_unknown_apid(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
//! assert_eq!(sp_header.apid(), 0x003);
|
||||
//! assert_eq!(tc_raw.len(), 13);
|
||||
//! self.unknown_call_count += 1;
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! let apid_handler = ConcreteApidHandler::default();
|
||||
//! let mut ccsds_distributor = CcsdsDistributor::new(apid_handler);
|
||||
//!
|
||||
//! // Create and pass PUS telecommand with a valid APID
|
||||
//! let sp_header = SpHeader::new_for_unseg_tc(0x002, 0x34, 0);
|
||||
//! let mut pus_tc = PusTcCreator::new_simple(sp_header, 17, 1, &[], true);
|
||||
//! let mut test_buf: [u8; 32] = [0; 32];
|
||||
//! let mut size = pus_tc
|
||||
//! .write_to_bytes(test_buf.as_mut_slice())
|
||||
//! .expect("Error writing TC to buffer");
|
||||
//! let tc_slice = &test_buf[0..size];
|
||||
//! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed");
|
||||
//!
|
||||
//! // Now pass a packet with an unknown APID to the distributor
|
||||
//! pus_tc.set_apid(0x003);
|
||||
//! size = pus_tc
|
||||
//! .write_to_bytes(test_buf.as_mut_slice())
|
||||
//! .expect("Error writing TC to buffer");
|
||||
//! let tc_slice = &test_buf[0..size];
|
||||
//! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed");
|
||||
//!
|
||||
//! // Retrieve the APID handler.
|
||||
//! let handler_ref = ccsds_distributor.packet_handler();
|
||||
//! assert_eq!(handler_ref.known_call_count, 1);
|
||||
//! assert_eq!(handler_ref.unknown_call_count, 1);
|
||||
//!
|
||||
//! // Mutable access to the handler.
|
||||
//! let mutable_handler_ref = ccsds_distributor.packet_handler_mut();
|
||||
//! mutable_handler_ref.mutable_foo();
|
||||
//! ```
|
||||
use crate::{
|
||||
tmtc::{ReceivesCcsdsTc, ReceivesTcCore},
|
||||
ValidatorU16Id,
|
||||
};
|
||||
use core::fmt::{Display, Formatter};
|
||||
use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader};
|
||||
#[cfg(feature = "std")]
|
||||
use std::error::Error;
|
||||
|
||||
/// Generic trait for a handler or dispatcher object handling CCSDS packets.
|
||||
///
|
||||
/// Users should implement this trait on their custom CCSDS packet handler and then pass a boxed
|
||||
/// instance of this handler to the [CcsdsDistributor]. The distributor will use the trait
|
||||
/// interface to dispatch received packets to the user based on the Application Process Identifier
|
||||
/// (APID) field of the CCSDS packet. The APID will be checked using the generic [ValidatorU16Id]
|
||||
/// trait.
|
||||
pub trait CcsdsPacketHandler: ValidatorU16Id {
|
||||
type Error;
|
||||
|
||||
fn handle_packet_with_valid_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
fn handle_packet_with_unknown_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler.
|
||||
pub struct CcsdsDistributor<PacketHandler: CcsdsPacketHandler<Error = E>, E> {
|
||||
/// User provided APID handler stored as a generic trait object.
|
||||
/// It can be cast back to the original concrete type using [Self::packet_handler] or
|
||||
/// the [Self::packet_handler_mut] method.
|
||||
packet_handler: PacketHandler,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum CcsdsError<E> {
|
||||
CustomError(E),
|
||||
ByteConversionError(ByteConversionError),
|
||||
}
|
||||
|
||||
impl<E: Display> Display for CcsdsError<E> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
Self::CustomError(e) => write!(f, "{e}"),
|
||||
Self::ByteConversionError(e) => write!(f, "{e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<E: Error> Error for CcsdsError<E> {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
Self::CustomError(e) => e.source(),
|
||||
Self::ByteConversionError(e) => e.source(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<PacketHandler: CcsdsPacketHandler<Error = E>, E: 'static> ReceivesCcsdsTc
|
||||
for CcsdsDistributor<PacketHandler, E>
|
||||
{
|
||||
type Error = CcsdsError<E>;
|
||||
|
||||
fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.dispatch_ccsds(header, tc_raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<PacketHandler: CcsdsPacketHandler<Error = E>, E: 'static> ReceivesTcCore
|
||||
for CcsdsDistributor<PacketHandler, E>
|
||||
{
|
||||
type Error = CcsdsError<E>;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
if tc_raw.len() < 7 {
|
||||
return Err(CcsdsError::ByteConversionError(
|
||||
ByteConversionError::FromSliceTooSmall {
|
||||
found: tc_raw.len(),
|
||||
expected: 7,
|
||||
},
|
||||
));
|
||||
}
|
||||
let (sp_header, _) =
|
||||
SpHeader::from_be_bytes(tc_raw).map_err(|e| CcsdsError::ByteConversionError(e))?;
|
||||
self.dispatch_ccsds(&sp_header, tc_raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<PacketHandler: CcsdsPacketHandler<Error = E>, E: 'static> CcsdsDistributor<PacketHandler, E> {
|
||||
pub fn new(packet_handler: PacketHandler) -> Self {
|
||||
CcsdsDistributor { packet_handler }
|
||||
}
|
||||
|
||||
pub fn packet_handler(&self) -> &PacketHandler {
|
||||
&self.packet_handler
|
||||
}
|
||||
|
||||
pub fn packet_handler_mut(&mut self) -> &mut PacketHandler {
|
||||
&mut self.packet_handler
|
||||
}
|
||||
|
||||
fn dispatch_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), CcsdsError<E>> {
|
||||
let valid_apid = self.packet_handler().validate(sp_header.apid());
|
||||
if valid_apid {
|
||||
self.packet_handler
|
||||
.handle_packet_with_valid_apid(sp_header, tc_raw)
|
||||
.map_err(|e| CcsdsError::CustomError(e))?;
|
||||
return Ok(());
|
||||
}
|
||||
self.packet_handler
|
||||
.handle_packet_with_unknown_apid(sp_header, tc_raw)
|
||||
.map_err(|e| CcsdsError::CustomError(e))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler};
|
||||
use spacepackets::ecss::tc::PusTcCreator;
|
||||
use spacepackets::ecss::WritablePusPacket;
|
||||
use spacepackets::CcsdsPacket;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::vec::Vec;
|
||||
|
||||
fn is_send<T: Send>(_: &T) {}
|
||||
|
||||
pub fn generate_ping_tc(buf: &mut [u8]) -> &[u8] {
|
||||
let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0);
|
||||
let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true);
|
||||
let size = pus_tc
|
||||
.write_to_bytes(buf)
|
||||
.expect("Error writing TC to buffer");
|
||||
assert_eq!(size, 13);
|
||||
&buf[0..size]
|
||||
}
|
||||
|
||||
pub fn generate_ping_tc_as_vec() -> Vec<u8> {
|
||||
let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0);
|
||||
PusTcCreator::new_simple(sph, 17, 1, &[], true)
|
||||
.to_vec()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
type SharedPacketQueue = Arc<Mutex<VecDeque<(u16, Vec<u8>)>>>;
|
||||
pub struct BasicApidHandlerSharedQueue {
|
||||
pub known_packet_queue: SharedPacketQueue,
|
||||
pub unknown_packet_queue: SharedPacketQueue,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct BasicApidHandlerOwnedQueue {
|
||||
pub known_packet_queue: VecDeque<(u16, Vec<u8>)>,
|
||||
pub unknown_packet_queue: VecDeque<(u16, Vec<u8>)>,
|
||||
}
|
||||
|
||||
impl ValidatorU16Id for BasicApidHandlerSharedQueue {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
[0x000, 0x002].contains(&packet_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl CcsdsPacketHandler for BasicApidHandlerSharedQueue {
|
||||
type Error = ();
|
||||
|
||||
fn handle_packet_with_valid_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
let mut vec = Vec::new();
|
||||
vec.extend_from_slice(tc_raw);
|
||||
self.known_packet_queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push_back((sp_header.apid(), vec));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_packet_with_unknown_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
let mut vec = Vec::new();
|
||||
vec.extend_from_slice(tc_raw);
|
||||
self.unknown_packet_queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push_back((sp_header.apid(), vec));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ValidatorU16Id for BasicApidHandlerOwnedQueue {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
[0x000, 0x002].contains(&packet_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl CcsdsPacketHandler for BasicApidHandlerOwnedQueue {
|
||||
type Error = ();
|
||||
|
||||
fn handle_packet_with_valid_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
let mut vec = Vec::new();
|
||||
vec.extend_from_slice(tc_raw);
|
||||
self.known_packet_queue.push_back((sp_header.apid(), vec));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_packet_with_unknown_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
let mut vec = Vec::new();
|
||||
vec.extend_from_slice(tc_raw);
|
||||
self.unknown_packet_queue.push_back((sp_header.apid(), vec));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_distribs_known_apid() {
|
||||
let known_packet_queue = Arc::new(Mutex::default());
|
||||
let unknown_packet_queue = Arc::new(Mutex::default());
|
||||
let apid_handler = BasicApidHandlerSharedQueue {
|
||||
known_packet_queue: known_packet_queue.clone(),
|
||||
unknown_packet_queue: unknown_packet_queue.clone(),
|
||||
};
|
||||
let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
|
||||
is_send(&ccsds_distrib);
|
||||
let mut test_buf: [u8; 32] = [0; 32];
|
||||
let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
|
||||
|
||||
ccsds_distrib.pass_tc(tc_slice).expect("Passing TC failed");
|
||||
let recvd = known_packet_queue.lock().unwrap().pop_front();
|
||||
assert!(unknown_packet_queue.lock().unwrap().is_empty());
|
||||
assert!(recvd.is_some());
|
||||
let (apid, packet) = recvd.unwrap();
|
||||
assert_eq!(apid, 0x002);
|
||||
assert_eq!(packet, tc_slice);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unknown_apid_handling() {
|
||||
let apid_handler = BasicApidHandlerOwnedQueue::default();
|
||||
let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
|
||||
let sph = SpHeader::new_for_unseg_tc(0x004, 0x34, 0);
|
||||
let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true);
|
||||
let mut test_buf: [u8; 32] = [0; 32];
|
||||
pus_tc
|
||||
.write_to_bytes(test_buf.as_mut_slice())
|
||||
.expect("Error writing TC to buffer");
|
||||
ccsds_distrib.pass_tc(&test_buf).expect("Passing TC failed");
|
||||
assert!(ccsds_distrib.packet_handler().known_packet_queue.is_empty());
|
||||
let apid_handler = ccsds_distrib.packet_handler_mut();
|
||||
let recvd = apid_handler.unknown_packet_queue.pop_front();
|
||||
assert!(recvd.is_some());
|
||||
let (apid, packet) = recvd.unwrap();
|
||||
assert_eq!(apid, 0x004);
|
||||
assert_eq!(packet.as_slice(), test_buf);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ccsds_distribution() {
|
||||
let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default());
|
||||
let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0);
|
||||
let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true);
|
||||
let tc_vec = pus_tc.to_vec().unwrap();
|
||||
ccsds_distrib
|
||||
.pass_ccsds(&sph, &tc_vec)
|
||||
.expect("passing CCSDS TC failed");
|
||||
let recvd = ccsds_distrib
|
||||
.packet_handler_mut()
|
||||
.known_packet_queue
|
||||
.pop_front();
|
||||
assert!(recvd.is_some());
|
||||
let recvd = recvd.unwrap();
|
||||
assert_eq!(recvd.0, 0x002);
|
||||
assert_eq!(recvd.1, tc_vec);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_distribution_short_packet_fails() {
|
||||
let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default());
|
||||
let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0);
|
||||
let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true);
|
||||
let tc_vec = pus_tc.to_vec().unwrap();
|
||||
let result = ccsds_distrib.pass_tc(&tc_vec[0..6]);
|
||||
assert!(result.is_err());
|
||||
let error = result.unwrap_err();
|
||||
if let CcsdsError::ByteConversionError(ByteConversionError::FromSliceTooSmall {
|
||||
found,
|
||||
expected,
|
||||
}) = error
|
||||
{
|
||||
assert_eq!(found, 6);
|
||||
assert_eq!(expected, 7);
|
||||
} else {
|
||||
panic!("Unexpected error variant");
|
||||
}
|
||||
}
|
||||
}
|
@ -5,111 +5,148 @@
|
||||
//! directly dispatch received packets to packet listeners based on packet fields like the CCSDS
|
||||
//! Application Process ID (APID) or the ECSS PUS service type. This allows for fast packet
|
||||
//! routing without the overhead and complication of using message queues. However, it also requires
|
||||
use std::sync::mpsc;
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
use downcast_rs::{impl_downcast, Downcast};
|
||||
use spacepackets::SpHeader;
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
pub mod ccsds_distrib;
|
||||
#[cfg(feature = "alloc")]
|
||||
pub mod pus_distrib;
|
||||
#[cfg(feature = "std")]
|
||||
pub mod tc_helper;
|
||||
pub mod tm_helper;
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler};
|
||||
#[cfg(feature = "alloc")]
|
||||
pub use pus_distrib::{PusDistributor, PusServiceDistributor};
|
||||
use crate::queue::GenericSendError;
|
||||
|
||||
/// Generic trait for object which can receive any telecommands in form of a raw bytestream, with
|
||||
/// no assumptions about the received protocol.
|
||||
///
|
||||
/// This trait is implemented by both the [crate::tmtc::pus_distrib::PusDistributor] and the
|
||||
/// [crate::tmtc::ccsds_distrib::CcsdsDistributor] which allows to pass the respective packets in
|
||||
/// raw byte format into them.
|
||||
pub trait ReceivesTcCore {
|
||||
/// This trait can also be implemented for sender components which forward the packet.
|
||||
/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender].
|
||||
pub trait ReceivesTc: Send {
|
||||
type Error;
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Extension trait of [ReceivesTcCore] which allows downcasting by implementing [Downcast] and
|
||||
/// is also sendable.
|
||||
#[cfg(feature = "std")]
|
||||
impl ReceivesTc for mpsc::Sender<alloc::vec::Vec<u8>> {
|
||||
type Error = GenericSendError;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.send(tc_raw.to_vec())
|
||||
.map_err(|_| GenericSendError::RxDisconnected)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl ReceivesTc for mpsc::SyncSender<alloc::vec::Vec<u8>> {
|
||||
type Error = GenericSendError;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.try_send(tc_raw.to_vec()).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Extension trait of [ReceivesTc] which allows downcasting by implementing [Downcast].
|
||||
#[cfg(feature = "alloc")]
|
||||
pub trait ReceivesTc: ReceivesTcCore + Downcast + Send {
|
||||
pub trait ReceivesTcExt: ReceivesTc + Downcast {
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast(&self) -> &dyn ReceivesTcCore<Error = Self::Error>;
|
||||
fn upcast(&self) -> &dyn ReceivesTc<Error = Self::Error>;
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore<Error = Self::Error>;
|
||||
fn upcast_mut(&mut self) -> &mut dyn ReceivesTc<Error = Self::Error>;
|
||||
}
|
||||
|
||||
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature
|
||||
/// is enabled.
|
||||
#[cfg(feature = "alloc")]
|
||||
impl<T> ReceivesTc for T
|
||||
impl<T> ReceivesTcExt for T
|
||||
where
|
||||
T: ReceivesTcCore + Send + 'static,
|
||||
T: ReceivesTc + Send + 'static,
|
||||
{
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast(&self) -> &dyn ReceivesTcCore<Error = Self::Error> {
|
||||
fn upcast(&self) -> &dyn ReceivesTc<Error = Self::Error> {
|
||||
self
|
||||
}
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore<Error = Self::Error> {
|
||||
fn upcast_mut(&mut self) -> &mut dyn ReceivesTc<Error = Self::Error> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
impl_downcast!(ReceivesTc assoc Error);
|
||||
impl_downcast!(ReceivesTcExt assoc Error);
|
||||
|
||||
/// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets
|
||||
/// for CCSDS File Delivery Protocol (CFDP) packets.
|
||||
///
|
||||
/// This trait is implemented by both the [crate::tmtc::pus_distrib::PusDistributor] and the
|
||||
/// [crate::tmtc::ccsds_distrib::CcsdsDistributor] which allows
|
||||
/// to pass the respective packets in raw byte format or in CCSDS format into them.
|
||||
pub trait ReceivesCcsdsTc {
|
||||
/// This trait can also be implemented for sender components which forward the packet.
|
||||
/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender].
|
||||
pub trait ReceivesCcsdsTc: Send {
|
||||
type Error;
|
||||
fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl ReceivesCcsdsTc for mpsc::Sender<alloc::vec::Vec<u8>> {
|
||||
type Error = GenericSendError;
|
||||
|
||||
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.send(tc_raw.to_vec())
|
||||
.map_err(|_| GenericSendError::RxDisconnected)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl ReceivesCcsdsTc for mpsc::SyncSender<alloc::vec::Vec<u8>> {
|
||||
type Error = GenericSendError;
|
||||
|
||||
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.try_send(tc_raw.to_vec()).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic trait for a TM packet source, with no restrictions on the type of TM.
|
||||
/// Implementors write the telemetry into the provided buffer and return the size of the telemetry.
|
||||
pub trait TmPacketSourceCore {
|
||||
pub trait TmPacketSource: Send {
|
||||
type Error;
|
||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
|
||||
}
|
||||
|
||||
/// Extension trait of [TmPacketSourceCore] which allows downcasting by implementing [Downcast] and
|
||||
/// is also sendable.
|
||||
/// Extension trait of [TmPacketSource] which allows downcasting by implementing [Downcast].
|
||||
#[cfg(feature = "alloc")]
|
||||
pub trait TmPacketSource: TmPacketSourceCore + Downcast + Send {
|
||||
pub trait TmPacketSourceExt: TmPacketSource + Downcast {
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast(&self) -> &dyn TmPacketSourceCore<Error = Self::Error>;
|
||||
fn upcast(&self) -> &dyn TmPacketSource<Error = Self::Error>;
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore<Error = Self::Error>;
|
||||
fn upcast_mut(&mut self) -> &mut dyn TmPacketSource<Error = Self::Error>;
|
||||
}
|
||||
|
||||
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature
|
||||
/// is enabled.
|
||||
#[cfg(feature = "alloc")]
|
||||
impl<T> TmPacketSource for T
|
||||
impl<T> TmPacketSourceExt for T
|
||||
where
|
||||
T: TmPacketSourceCore + Send + 'static,
|
||||
T: TmPacketSource + 'static,
|
||||
{
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast(&self) -> &dyn TmPacketSourceCore<Error = Self::Error> {
|
||||
fn upcast(&self) -> &dyn TmPacketSource<Error = Self::Error> {
|
||||
self
|
||||
}
|
||||
// Remove this once trait upcasting coercion has been implemented.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
|
||||
fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore<Error = Self::Error> {
|
||||
fn upcast_mut(&mut self) -> &mut dyn TmPacketSource<Error = Self::Error> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
@ -1,414 +0,0 @@
|
||||
//! ECSS PUS packet routing components.
|
||||
//!
|
||||
//! The routing components consist of two core components:
|
||||
//! 1. [PusDistributor] component which dispatches received packets to a user-provided handler.
|
||||
//! 2. [PusServiceDistributor] trait which should be implemented by the user-provided PUS packet
|
||||
//! handler.
|
||||
//!
|
||||
//! The [PusDistributor] implements the [ReceivesEcssPusTc], [ReceivesCcsdsTc] and the
|
||||
//! [ReceivesTcCore] trait which allows to pass raw packets, CCSDS packets and PUS TC packets into
|
||||
//! it. Upon receiving a packet, it performs the following steps:
|
||||
//!
|
||||
//! 1. It tries to extract the [SpHeader] and [spacepackets::ecss::tc::PusTcReader] objects from
|
||||
//! the raw bytestream. If this process fails, a [PusDistribError::PusError] is returned to the
|
||||
//! user.
|
||||
//! 2. If it was possible to extract both components, the packet will be passed to the
|
||||
//! [PusServiceDistributor::distribute_packet] method provided by the user.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```rust
|
||||
//! use spacepackets::ecss::WritablePusPacket;
|
||||
//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceDistributor};
|
||||
//! use satrs::tmtc::{ReceivesTc, ReceivesTcCore};
|
||||
//! use spacepackets::SpHeader;
|
||||
//! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader};
|
||||
//!
|
||||
//! struct ConcretePusHandler {
|
||||
//! handler_call_count: u32
|
||||
//! }
|
||||
//!
|
||||
//! // This is a very simple possible service provider. It increments an internal call count field,
|
||||
//! // which is used to verify the handler was called
|
||||
//! impl PusServiceDistributor for ConcretePusHandler {
|
||||
//! type Error = ();
|
||||
//! fn distribute_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||
//! assert_eq!(service, 17);
|
||||
//! assert_eq!(pus_tc.len_packed(), 13);
|
||||
//! self.handler_call_count += 1;
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! let service_handler = ConcretePusHandler {
|
||||
//! handler_call_count: 0
|
||||
//! };
|
||||
//! let mut pus_distributor = PusDistributor::new(service_handler);
|
||||
//!
|
||||
//! // Create and pass PUS ping telecommand with a valid APID
|
||||
//! let sp_header = SpHeader::new_for_unseg_tc(0x002, 0x34, 0);
|
||||
//! let mut pus_tc = PusTcCreator::new_simple(sp_header, 17, 1, &[], true);
|
||||
//! let mut test_buf: [u8; 32] = [0; 32];
|
||||
//! let mut size = pus_tc
|
||||
//! .write_to_bytes(test_buf.as_mut_slice())
|
||||
//! .expect("Error writing TC to buffer");
|
||||
//! let tc_slice = &test_buf[0..size];
|
||||
//!
|
||||
//! pus_distributor.pass_tc(tc_slice).expect("Passing PUS telecommand failed");
|
||||
//!
|
||||
//! // User helper function to retrieve concrete class. We check the call count here to verify
|
||||
//! // that the PUS ping telecommand was routed successfully.
|
||||
//! let concrete_handler = pus_distributor.service_distributor();
|
||||
//! assert_eq!(concrete_handler.handler_call_count, 1);
|
||||
//! ```
|
||||
use crate::pus::ReceivesEcssPusTc;
|
||||
use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore};
|
||||
use core::fmt::{Display, Formatter};
|
||||
use spacepackets::ecss::tc::PusTcReader;
|
||||
use spacepackets::ecss::{PusError, PusPacket};
|
||||
use spacepackets::SpHeader;
|
||||
#[cfg(feature = "std")]
|
||||
use std::error::Error;
|
||||
|
||||
/// Trait for a generic distributor object which can distribute PUS packets based on packet
|
||||
/// properties like the PUS service, space packet header or any other content of the PUS packet.
|
||||
pub trait PusServiceDistributor {
|
||||
type Error;
|
||||
fn distribute_packet(
|
||||
&mut self,
|
||||
service: u8,
|
||||
header: &SpHeader,
|
||||
pus_tc: &PusTcReader,
|
||||
) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Generic distributor object which dispatches received packets to a user provided handler.
|
||||
pub struct PusDistributor<ServiceDistributor: PusServiceDistributor<Error = E>, E> {
|
||||
service_distributor: ServiceDistributor,
|
||||
}
|
||||
|
||||
impl<ServiceDistributor: PusServiceDistributor<Error = E>, E>
|
||||
PusDistributor<ServiceDistributor, E>
|
||||
{
|
||||
pub fn new(service_provider: ServiceDistributor) -> Self {
|
||||
PusDistributor {
|
||||
service_distributor: service_provider,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum PusDistribError<E> {
|
||||
CustomError(E),
|
||||
PusError(PusError),
|
||||
}
|
||||
|
||||
impl<E: Display> Display for PusDistribError<E> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
PusDistribError::CustomError(e) => write!(f, "pus distribution error: {e}"),
|
||||
PusDistribError::PusError(e) => write!(f, "pus distribution error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<E: Error> Error for PusDistribError<E> {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
Self::CustomError(e) => e.source(),
|
||||
Self::PusError(e) => e.source(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static> ReceivesTcCore
|
||||
for PusDistributor<ServiceDistributor, E>
|
||||
{
|
||||
type Error = PusDistribError<E>;
|
||||
fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
// Convert to ccsds and call pass_ccsds
|
||||
let (sp_header, _) = SpHeader::from_be_bytes(tm_raw)
|
||||
.map_err(|e| PusDistribError::PusError(PusError::ByteConversion(e)))?;
|
||||
self.pass_ccsds(&sp_header, tm_raw)
|
||||
}
|
||||
}
|
||||
|
||||
impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static> ReceivesCcsdsTc
|
||||
for PusDistributor<ServiceDistributor, E>
|
||||
{
|
||||
type Error = PusDistribError<E>;
|
||||
fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?;
|
||||
self.pass_pus_tc(header, &tc)
|
||||
}
|
||||
}
|
||||
|
||||
impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static> ReceivesEcssPusTc
|
||||
for PusDistributor<ServiceDistributor, E>
|
||||
{
|
||||
type Error = PusDistribError<E>;
|
||||
fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||
self.service_distributor
|
||||
.distribute_packet(pus_tc.service(), header, pus_tc)
|
||||
.map_err(|e| PusDistribError::CustomError(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static>
|
||||
PusDistributor<ServiceDistributor, E>
|
||||
{
|
||||
pub fn service_distributor(&self) -> &ServiceDistributor {
|
||||
&self.service_distributor
|
||||
}
|
||||
|
||||
pub fn service_distributor_mut(&mut self) -> &mut ServiceDistributor {
|
||||
&mut self.service_distributor
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::queue::GenericSendError;
|
||||
use crate::tmtc::ccsds_distrib::tests::{
|
||||
generate_ping_tc, generate_ping_tc_as_vec, BasicApidHandlerOwnedQueue,
|
||||
BasicApidHandlerSharedQueue,
|
||||
};
|
||||
use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler};
|
||||
use crate::ValidatorU16Id;
|
||||
use alloc::format;
|
||||
use alloc::vec::Vec;
|
||||
use spacepackets::ecss::PusError;
|
||||
use spacepackets::CcsdsPacket;
|
||||
#[cfg(feature = "std")]
|
||||
use std::collections::VecDeque;
|
||||
#[cfg(feature = "std")]
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
fn is_send<T: Send>(_: &T) {}
|
||||
|
||||
pub struct PacketInfo {
|
||||
pub service: u8,
|
||||
pub apid: u16,
|
||||
pub packet: Vec<u8>,
|
||||
}
|
||||
|
||||
struct PusHandlerSharedQueue(Arc<Mutex<VecDeque<PacketInfo>>>);
|
||||
|
||||
#[derive(Default)]
|
||||
struct PusHandlerOwnedQueue(VecDeque<PacketInfo>);
|
||||
|
||||
impl PusServiceDistributor for PusHandlerSharedQueue {
|
||||
type Error = PusError;
|
||||
fn distribute_packet(
|
||||
&mut self,
|
||||
service: u8,
|
||||
sp_header: &SpHeader,
|
||||
pus_tc: &PusTcReader,
|
||||
) -> Result<(), Self::Error> {
|
||||
let mut packet: Vec<u8> = Vec::new();
|
||||
packet.extend_from_slice(pus_tc.raw_data());
|
||||
self.0
|
||||
.lock()
|
||||
.expect("Mutex lock failed")
|
||||
.push_back(PacketInfo {
|
||||
service,
|
||||
apid: sp_header.apid(),
|
||||
packet,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl PusServiceDistributor for PusHandlerOwnedQueue {
|
||||
type Error = PusError;
|
||||
fn distribute_packet(
|
||||
&mut self,
|
||||
service: u8,
|
||||
sp_header: &SpHeader,
|
||||
pus_tc: &PusTcReader,
|
||||
) -> Result<(), Self::Error> {
|
||||
let mut packet: Vec<u8> = Vec::new();
|
||||
packet.extend_from_slice(pus_tc.raw_data());
|
||||
self.0.push_back(PacketInfo {
|
||||
service,
|
||||
apid: sp_header.apid(),
|
||||
packet,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct ApidHandlerShared {
|
||||
pub pus_distrib: PusDistributor<PusHandlerSharedQueue, PusError>,
|
||||
pub handler_base: BasicApidHandlerSharedQueue,
|
||||
}
|
||||
|
||||
struct ApidHandlerOwned {
|
||||
pub pus_distrib: PusDistributor<PusHandlerOwnedQueue, PusError>,
|
||||
handler_base: BasicApidHandlerOwnedQueue,
|
||||
}
|
||||
|
||||
macro_rules! apid_handler_impl {
|
||||
() => {
|
||||
type Error = PusError;
|
||||
|
||||
fn handle_packet_with_valid_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
self.handler_base
|
||||
.handle_packet_with_valid_apid(&sp_header, tc_raw)
|
||||
.ok()
|
||||
.expect("Unexpected error");
|
||||
match self.pus_distrib.pass_ccsds(&sp_header, tc_raw) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => match e {
|
||||
PusDistribError::CustomError(_) => Ok(()),
|
||||
PusDistribError::PusError(e) => Err(e),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_packet_with_unknown_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
self.handler_base
|
||||
.handle_packet_with_unknown_apid(&sp_header, tc_raw)
|
||||
.ok()
|
||||
.expect("Unexpected error");
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl ValidatorU16Id for ApidHandlerOwned {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
[0x000, 0x002].contains(&packet_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl ValidatorU16Id for ApidHandlerShared {
|
||||
fn validate(&self, packet_id: u16) -> bool {
|
||||
[0x000, 0x002].contains(&packet_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl CcsdsPacketHandler for ApidHandlerOwned {
|
||||
apid_handler_impl!();
|
||||
}
|
||||
|
||||
impl CcsdsPacketHandler for ApidHandlerShared {
|
||||
apid_handler_impl!();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pus_distribution_as_raw_packet() {
|
||||
let mut pus_distrib = PusDistributor::new(PusHandlerOwnedQueue::default());
|
||||
let tc = generate_ping_tc_as_vec();
|
||||
let result = pus_distrib.pass_tc(&tc);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(pus_distrib.service_distributor_mut().0.len(), 1);
|
||||
let packet_info = pus_distrib.service_distributor_mut().0.pop_front().unwrap();
|
||||
assert_eq!(packet_info.service, 17);
|
||||
assert_eq!(packet_info.apid, 0x002);
|
||||
assert_eq!(packet_info.packet, tc);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pus_distribution_combined_handler() {
|
||||
let known_packet_queue = Arc::new(Mutex::default());
|
||||
let unknown_packet_queue = Arc::new(Mutex::default());
|
||||
let pus_queue = Arc::new(Mutex::default());
|
||||
let pus_handler = PusHandlerSharedQueue(pus_queue.clone());
|
||||
let handler_base = BasicApidHandlerSharedQueue {
|
||||
known_packet_queue: known_packet_queue.clone(),
|
||||
unknown_packet_queue: unknown_packet_queue.clone(),
|
||||
};
|
||||
|
||||
let pus_distrib = PusDistributor::new(pus_handler);
|
||||
is_send(&pus_distrib);
|
||||
let apid_handler = ApidHandlerShared {
|
||||
pus_distrib,
|
||||
handler_base,
|
||||
};
|
||||
let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
|
||||
let mut test_buf: [u8; 32] = [0; 32];
|
||||
let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
|
||||
|
||||
// Pass packet to distributor
|
||||
ccsds_distrib
|
||||
.pass_tc(tc_slice)
|
||||
.expect("Passing TC slice failed");
|
||||
let recvd_ccsds = known_packet_queue.lock().unwrap().pop_front();
|
||||
assert!(unknown_packet_queue.lock().unwrap().is_empty());
|
||||
assert!(recvd_ccsds.is_some());
|
||||
let (apid, packet) = recvd_ccsds.unwrap();
|
||||
assert_eq!(apid, 0x002);
|
||||
assert_eq!(packet.as_slice(), tc_slice);
|
||||
let recvd_pus = pus_queue.lock().unwrap().pop_front();
|
||||
assert!(recvd_pus.is_some());
|
||||
let packet_info = recvd_pus.unwrap();
|
||||
assert_eq!(packet_info.service, 17);
|
||||
assert_eq!(packet_info.apid, 0x002);
|
||||
assert_eq!(packet_info.packet, tc_slice);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accessing_combined_distributor() {
|
||||
let pus_handler = PusHandlerOwnedQueue::default();
|
||||
let handler_base = BasicApidHandlerOwnedQueue::default();
|
||||
let pus_distrib = PusDistributor::new(pus_handler);
|
||||
|
||||
let apid_handler = ApidHandlerOwned {
|
||||
pus_distrib,
|
||||
handler_base,
|
||||
};
|
||||
let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
|
||||
|
||||
let mut test_buf: [u8; 32] = [0; 32];
|
||||
let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
|
||||
|
||||
ccsds_distrib
|
||||
.pass_tc(tc_slice)
|
||||
.expect("Passing TC slice failed");
|
||||
|
||||
let apid_handler_casted_back = ccsds_distrib.packet_handler_mut();
|
||||
assert!(!apid_handler_casted_back
|
||||
.handler_base
|
||||
.known_packet_queue
|
||||
.is_empty());
|
||||
let handler_owned_queue = apid_handler_casted_back
|
||||
.pus_distrib
|
||||
.service_distributor_mut();
|
||||
assert!(!handler_owned_queue.0.is_empty());
|
||||
let packet_info = handler_owned_queue.0.pop_front().unwrap();
|
||||
assert_eq!(packet_info.service, 17);
|
||||
assert_eq!(packet_info.apid, 0x002);
|
||||
assert_eq!(packet_info.packet, tc_slice);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pus_distrib_error_custom_error() {
|
||||
let error = PusDistribError::CustomError(GenericSendError::RxDisconnected);
|
||||
let error_string = format!("{}", error);
|
||||
assert_eq!(
|
||||
error_string,
|
||||
"pus distribution error: rx side has disconnected"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pus_distrib_error_pus_error() {
|
||||
let error = PusDistribError::<GenericSendError>::PusError(PusError::CrcCalculationMissing);
|
||||
let error_string = format!("{}", error);
|
||||
assert_eq!(
|
||||
error_string,
|
||||
"pus distribution error: crc16 was not calculated"
|
||||
);
|
||||
}
|
||||
}
|
109
satrs/src/tmtc/tc_helper.rs
Normal file
109
satrs/src/tmtc/tc_helper.rs
Normal file
@ -0,0 +1,109 @@
|
||||
use std::sync::mpsc;
|
||||
|
||||
use spacepackets::{ecss::tc::PusTcReader, SpHeader};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::{
|
||||
pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError},
|
||||
pus::ReceivesEcssPusTc,
|
||||
queue::GenericSendError,
|
||||
};
|
||||
|
||||
use super::{ReceivesCcsdsTc, ReceivesTc};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
pub enum StoreAndSendError {
|
||||
#[error("Store error: {0}")]
|
||||
Store(#[from] StoreError),
|
||||
#[error("Genreric send error: {0}")]
|
||||
Send(#[from] GenericSendError),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedTcPool(pub SharedStaticMemoryPool);
|
||||
|
||||
impl SharedTcPool {
|
||||
pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
|
||||
let mut pg = self.0.write().expect("error locking TC store");
|
||||
let addr = pg.free_element(pus_tc.len_packed(), |buf| {
|
||||
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
|
||||
})?;
|
||||
Ok(addr)
|
||||
}
|
||||
|
||||
pub fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<StoreAddr, StoreError> {
|
||||
self.add_raw_tc(tc_raw)
|
||||
}
|
||||
|
||||
pub fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<StoreAddr, StoreError> {
|
||||
let mut pg = self.0.write().expect("error locking TC store");
|
||||
let addr = pg.free_element(tc_raw.len(), |buf| {
|
||||
buf[0..tc_raw.len()].copy_from_slice(tc_raw);
|
||||
})?;
|
||||
Ok(addr)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TcSenderSharedPool {
|
||||
pub tc_source: mpsc::SyncSender<StoreAddr>,
|
||||
pub shared_pool: SharedTcPool,
|
||||
}
|
||||
|
||||
impl TcSenderSharedPool {
|
||||
pub fn new(tc_source: mpsc::SyncSender<StoreAddr>, shared_pool: SharedTcPool) -> Self {
|
||||
Self {
|
||||
tc_source,
|
||||
shared_pool,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn shared_pool(&self) -> SharedStaticMemoryPool {
|
||||
self.shared_pool.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesTc for TcSenderSharedPool {
|
||||
type Error = StoreAndSendError;
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let addr = self.shared_pool.add_raw_tc(tc_raw)?;
|
||||
self.tc_source.try_send(addr).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesEcssPusTc for TcSenderSharedPool {
|
||||
type Error = StoreAndSendError;
|
||||
|
||||
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||
let addr = self.shared_pool.add_pus_tc(pus_tc)?;
|
||||
self.tc_source.try_send(addr).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesCcsdsTc for TcSenderSharedPool {
|
||||
type Error = StoreAndSendError;
|
||||
|
||||
fn pass_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let addr = self.shared_pool.add_ccsds_tc(sp_header, tc_raw)?;
|
||||
self.tc_source.try_send(addr).map_err(|e| match e {
|
||||
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
|
||||
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
// TODO: Add tests for shared pool TC sender component.
|
||||
}
|
@ -28,7 +28,7 @@ use satrs::{
|
||||
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig,
|
||||
TcpSpacepacketsServer, TcpTmtcInCobsServer,
|
||||
},
|
||||
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
||||
tmtc::{ReceivesTc, TmPacketSource},
|
||||
};
|
||||
use spacepackets::{
|
||||
ecss::{tc::PusTcCreator, WritablePusPacket},
|
||||
@ -66,7 +66,7 @@ struct SyncTcCacher {
|
||||
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||
}
|
||||
|
||||
impl ReceivesTcCore for SyncTcCacher {
|
||||
impl ReceivesTc for SyncTcCacher {
|
||||
type Error = ();
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
@ -89,7 +89,7 @@ impl SyncTmSource {
|
||||
}
|
||||
}
|
||||
|
||||
impl TmPacketSourceCore for SyncTmSource {
|
||||
impl TmPacketSource for SyncTmSource {
|
||||
type Error = ();
|
||||
|
||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user