re-worked TMTC modules
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
This commit is contained in:
@ -7,7 +7,7 @@ use satrs::params::U32Pair;
|
||||
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
|
||||
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher};
|
||||
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
|
||||
use satrs::pus::PusTmAsVec;
|
||||
use satrs::pus::PacketAsVec;
|
||||
use satrs::request::UniqueApidTargetId;
|
||||
use spacepackets::ecss::tm::PusTmReader;
|
||||
use spacepackets::ecss::{PusError, PusPacket};
|
||||
@ -37,7 +37,7 @@ fn test_threaded_usage() {
|
||||
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
|
||||
event_man.subscribe_all(pus_event_man_send_provider.target_id());
|
||||
event_man.add_sender(pus_event_man_send_provider);
|
||||
let (event_tx, event_rx) = mpsc::channel::<PusTmAsVec>();
|
||||
let (event_tx, event_rx) = mpsc::channel::<PacketAsVec>();
|
||||
let reporter =
|
||||
EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed");
|
||||
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
|
||||
|
@ -7,13 +7,12 @@ pub mod crossbeam_test {
|
||||
FailParams, RequestId, VerificationReporter, VerificationReporterCfg,
|
||||
VerificationReportingProvider,
|
||||
};
|
||||
use satrs::pus::TmInSharedPoolSenderWithCrossbeam;
|
||||
use satrs::tmtc::tm_helper::SharedTmPool;
|
||||
use satrs::tmtc::{PacketSenderWithSharedPool, SharedStaticMemoryPool};
|
||||
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
|
||||
use spacepackets::ecss::tm::PusTmReader;
|
||||
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, WritablePusPacket};
|
||||
use spacepackets::SpHeader;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::RwLock;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -36,12 +35,15 @@ pub mod crossbeam_test {
|
||||
// Shared pool object to store the verification PUS telemetry
|
||||
let pool_cfg =
|
||||
StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)], false);
|
||||
let shared_tm_pool = SharedTmPool::new(StaticMemoryPool::new(pool_cfg.clone()));
|
||||
let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
|
||||
let shared_tc_pool_1 = shared_tc_pool_0.clone();
|
||||
let shared_tm_pool =
|
||||
SharedStaticMemoryPool::new(RwLock::new(StaticMemoryPool::new(pool_cfg.clone())));
|
||||
let shared_tc_pool =
|
||||
SharedStaticMemoryPool::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
|
||||
let shared_tc_pool_1 = shared_tc_pool.clone();
|
||||
let (tx, rx) = crossbeam_channel::bounded(10);
|
||||
let sender_0 = TmInSharedPoolSenderWithCrossbeam::new(shared_tm_pool.clone(), tx.clone());
|
||||
let sender_1 = sender_0.clone();
|
||||
let sender =
|
||||
PacketSenderWithSharedPool::new_with_shared_packet_pool(tx.clone(), &shared_tm_pool);
|
||||
let sender_1 = sender.clone();
|
||||
let mut reporter_with_sender_0 = VerificationReporter::new(TEST_COMPONENT_ID_0.id(), &cfg);
|
||||
let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
|
||||
// For test purposes, we retrieve the request ID from the TCs and pass them to the receiver
|
||||
@ -52,7 +54,7 @@ pub mod crossbeam_test {
|
||||
let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3);
|
||||
let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3);
|
||||
{
|
||||
let mut tc_guard = shared_tc_pool_0.write().unwrap();
|
||||
let mut tc_guard = shared_tc_pool.write().unwrap();
|
||||
let sph = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0);
|
||||
let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
|
||||
let pus_tc_0 = PusTcCreator::new_no_app_data(sph, tc_header, true);
|
||||
@ -81,7 +83,7 @@ pub mod crossbeam_test {
|
||||
.expect("Receive timeout");
|
||||
let tc_len;
|
||||
{
|
||||
let mut tc_guard = shared_tc_pool_0.write().unwrap();
|
||||
let mut tc_guard = shared_tc_pool.write().unwrap();
|
||||
let pg = tc_guard.read_with_guard(tc_addr);
|
||||
tc_len = pg.read(&mut tc_buf).unwrap();
|
||||
}
|
||||
@ -89,24 +91,24 @@ pub mod crossbeam_test {
|
||||
|
||||
let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0);
|
||||
let accepted_token = reporter_with_sender_0
|
||||
.acceptance_success(&sender_0, token, &FIXED_STAMP)
|
||||
.acceptance_success(&sender, token, &FIXED_STAMP)
|
||||
.expect("Acceptance success failed");
|
||||
|
||||
// Do some start handling here
|
||||
let started_token = reporter_with_sender_0
|
||||
.start_success(&sender_0, accepted_token, &FIXED_STAMP)
|
||||
.start_success(&sender, accepted_token, &FIXED_STAMP)
|
||||
.expect("Start success failed");
|
||||
// Do some step handling here
|
||||
reporter_with_sender_0
|
||||
.step_success(&sender_0, &started_token, &FIXED_STAMP, EcssEnumU8::new(0))
|
||||
.step_success(&sender, &started_token, &FIXED_STAMP, EcssEnumU8::new(0))
|
||||
.expect("Start success failed");
|
||||
|
||||
// Finish up
|
||||
reporter_with_sender_0
|
||||
.step_success(&sender_0, &started_token, &FIXED_STAMP, EcssEnumU8::new(1))
|
||||
.step_success(&sender, &started_token, &FIXED_STAMP, EcssEnumU8::new(1))
|
||||
.expect("Start success failed");
|
||||
reporter_with_sender_0
|
||||
.completion_success(&sender_0, started_token, &FIXED_STAMP)
|
||||
.completion_success(&sender, started_token, &FIXED_STAMP)
|
||||
.expect("Completion success failed");
|
||||
});
|
||||
|
||||
@ -145,9 +147,8 @@ pub mod crossbeam_test {
|
||||
.recv_timeout(Duration::from_millis(50))
|
||||
.expect("Packet reception timeout");
|
||||
let tm_len;
|
||||
let shared_tm_store = shared_tm_pool.clone_backing_pool();
|
||||
{
|
||||
let mut rg = shared_tm_store.write().expect("Error locking shared pool");
|
||||
let mut rg = shared_tm_pool.write().expect("Error locking shared pool");
|
||||
let store_guard = rg.read_with_guard(tm_in_pool.store_addr);
|
||||
tm_len = store_guard
|
||||
.read(&mut tm_buf)
|
||||
|
@ -17,7 +17,7 @@ use core::{
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
||||
sync::Mutex,
|
||||
sync::{mpsc, Mutex},
|
||||
thread,
|
||||
};
|
||||
|
||||
@ -28,7 +28,8 @@ use satrs::{
|
||||
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig,
|
||||
TcpSpacepacketsServer, TcpTmtcInCobsServer,
|
||||
},
|
||||
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
||||
tmtc::PacketSource,
|
||||
ComponentId,
|
||||
};
|
||||
use spacepackets::{
|
||||
ecss::{tc::PusTcCreator, WritablePusPacket},
|
||||
@ -61,21 +62,6 @@ impl ConnectionFinishedHandler {
|
||||
assert!(self.connection_info.is_empty());
|
||||
}
|
||||
}
|
||||
#[derive(Default, Clone)]
|
||||
struct SyncTcCacher {
|
||||
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||
}
|
||||
|
||||
impl ReceivesTcCore for SyncTcCacher {
|
||||
type Error = ();
|
||||
|
||||
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
||||
println!("Received TC: {:x?}", tc_raw);
|
||||
tc_queue.push_back(tc_raw.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
struct SyncTmSource {
|
||||
@ -89,7 +75,7 @@ impl SyncTmSource {
|
||||
}
|
||||
}
|
||||
|
||||
impl TmPacketSourceCore for SyncTmSource {
|
||||
impl PacketSource for SyncTmSource {
|
||||
type Error = ();
|
||||
|
||||
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||
@ -111,20 +97,27 @@ impl TmPacketSourceCore for SyncTmSource {
|
||||
}
|
||||
}
|
||||
|
||||
const TCP_SERVER_ID: ComponentId = 0x05;
|
||||
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
|
||||
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1];
|
||||
const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
|
||||
#[test]
|
||||
fn test_cobs_server() {
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let (tc_sender, tc_receiver) = mpsc::channel();
|
||||
let mut tm_source = SyncTmSource::default();
|
||||
// Insert a telemetry packet which will be read back by the client at a later stage.
|
||||
tm_source.add_tm(&INVERTED_PACKET);
|
||||
let mut tcp_server = TcpTmtcInCobsServer::new(
|
||||
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
|
||||
ServerConfig::new(
|
||||
TCP_SERVER_ID,
|
||||
AUTO_PORT_ADDR,
|
||||
Duration::from_millis(2),
|
||||
1024,
|
||||
1024,
|
||||
),
|
||||
tm_source,
|
||||
tc_receiver.clone(),
|
||||
tc_sender.clone(),
|
||||
ConnectionFinishedHandler::default(),
|
||||
None,
|
||||
)
|
||||
@ -190,13 +183,10 @@ fn test_cobs_server() {
|
||||
panic!("connection was not handled properly");
|
||||
}
|
||||
// Check that the packet was received and decoded successfully.
|
||||
let mut tc_queue = tc_receiver
|
||||
.tc_queue
|
||||
.lock()
|
||||
.expect("locking tc queue failed");
|
||||
assert_eq!(tc_queue.len(), 1);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
||||
drop(tc_queue);
|
||||
let tc_with_sender = tc_receiver.try_recv().expect("no TC received");
|
||||
assert_eq!(tc_with_sender.packet, SIMPLE_PACKET);
|
||||
assert_eq!(tc_with_sender.sender_id, TCP_SERVER_ID);
|
||||
matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
|
||||
}
|
||||
|
||||
const TEST_APID_0: u16 = 0x02;
|
||||
@ -204,7 +194,7 @@ const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0);
|
||||
|
||||
#[test]
|
||||
fn test_ccsds_server() {
|
||||
let tc_receiver = SyncTcCacher::default();
|
||||
let (tc_sender, tc_receiver) = mpsc::channel();
|
||||
let mut tm_source = SyncTmSource::default();
|
||||
let sph = SpHeader::new_for_unseg_tc(TEST_APID_0, 0, 0);
|
||||
let verif_tm = PusTcCreator::new_simple(sph, 1, 1, &[], true);
|
||||
@ -213,9 +203,15 @@ fn test_ccsds_server() {
|
||||
let mut packet_id_lookup = HashSet::new();
|
||||
packet_id_lookup.insert(TEST_PACKET_ID_0);
|
||||
let mut tcp_server = TcpSpacepacketsServer::new(
|
||||
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
|
||||
ServerConfig::new(
|
||||
TCP_SERVER_ID,
|
||||
AUTO_PORT_ADDR,
|
||||
Duration::from_millis(2),
|
||||
1024,
|
||||
1024,
|
||||
),
|
||||
tm_source,
|
||||
tc_receiver.clone(),
|
||||
tc_sender,
|
||||
packet_id_lookup,
|
||||
ConnectionFinishedHandler::default(),
|
||||
None,
|
||||
@ -282,7 +278,8 @@ fn test_ccsds_server() {
|
||||
panic!("connection was not handled properly");
|
||||
}
|
||||
// Check that TC has arrived.
|
||||
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
|
||||
assert_eq!(tc_queue.len(), 1);
|
||||
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
|
||||
let tc_with_sender = tc_receiver.try_recv().expect("no TC received");
|
||||
assert_eq!(tc_with_sender.packet, tc_0);
|
||||
assert_eq!(tc_with_sender.sender_id, TCP_SERVER_ID);
|
||||
matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty));
|
||||
}
|
||||
|
Reference in New Issue
Block a user