sat-rs/fsrc-core/tests/pus_verification.rs

189 lines
8.1 KiB
Rust
Raw Normal View History

2022-09-10 13:34:04 +02:00
use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
2022-09-07 01:06:32 +02:00
use fsrc_core::pus::verification::{
2022-09-07 14:57:44 +02:00
CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg,
VerificationReporterWithSender,
2022-09-07 01:06:32 +02:00
};
2022-09-07 11:02:45 +02:00
use hashbrown::HashMap;
2022-09-07 01:06:32 +02:00
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::tm::PusTm;
use spacepackets::SpHeader;
2022-09-07 11:02:45 +02:00
use std::sync::{Arc, Mutex, RwLock};
2022-09-07 01:06:32 +02:00
use std::thread;
2022-09-07 13:37:09 +02:00
use std::time::Duration;
2022-09-06 00:17:52 +02:00
2022-09-07 01:06:32 +02:00
const TEST_APID: u16 = 0x03;
const FIXED_STAMP: [u8; 7] = [0; 7];
const PACKETS_SENT: u8 = 8;
2022-09-07 14:57:44 +02:00
/// This test also shows how the verification report could be used in a multi-threaded context,
/// wrapping it into an [Arc] and [Mutex] and then passing it to two threads.
///
/// - The first thread generates a acceptance, a start, two steps and one completion report
/// - The second generates an acceptance and start success report and a completion failure
/// - The third thread is the verification receiver. In the test case, it verifies the other two
/// threads have sent the correct expected verification reports
2022-09-07 01:06:32 +02:00
#[test]
fn test_shared_reporter() {
2022-09-11 20:51:14 +02:00
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
2022-09-07 01:06:32 +02:00
// Shared pool object to store the verification PUS telemetry
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
2022-09-10 13:34:04 +02:00
let shared_tm_pool: SharedPool =
Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
2022-09-07 11:02:45 +02:00
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = crossbeam_channel::bounded(5);
2022-09-07 14:57:44 +02:00
let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone());
let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new(
cfg,
Box::new(sender),
)));
let reporter_with_sender_1 = reporter_with_sender_0.clone();
// For test purposes, we retrieve the request ID from the TCs and pass them to the receiver
// tread.
2022-09-07 11:08:11 +02:00
let req_id_0;
let req_id_1;
2022-09-07 01:06:32 +02:00
2022-09-07 11:02:45 +02:00
let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3);
let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3);
{
let mut tc_guard = shared_tc_pool_0.write().unwrap();
let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap();
2022-09-07 01:06:32 +02:00
let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
2022-09-07 11:02:45 +02:00
let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true);
2022-09-07 11:08:11 +02:00
req_id_0 = RequestId::new(&pus_tc_0);
2022-09-07 11:02:45 +02:00
let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap();
2022-09-11 20:51:14 +02:00
pus_tc_0.write_to_bytes(&mut buf).unwrap();
2022-09-07 11:02:45 +02:00
tx_tc_0.send(addr).unwrap();
let mut sph = SpHeader::tc(TEST_APID, 1, 0).unwrap();
let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
let pus_tc_1 = PusTc::new(&mut sph, tc_header, None, true);
2022-09-07 11:08:11 +02:00
req_id_1 = RequestId::new(&pus_tc_1);
2022-09-07 11:02:45 +02:00
let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap();
2022-09-11 20:51:14 +02:00
pus_tc_1.write_to_bytes(&mut buf).unwrap();
2022-09-07 11:02:45 +02:00
tx_tc_1.send(addr).unwrap();
}
let verif_sender_0 = thread::spawn(move || {
let mut tc_buf: [u8; 1024] = [0; 1024];
2022-09-07 13:37:09 +02:00
let tc_addr = rx_tc_0
.recv_timeout(Duration::from_millis(20))
.expect("Receive timeout");
2022-09-07 11:02:45 +02:00
let tc_len;
{
let mut tc_guard = shared_tc_pool_0.write().unwrap();
let pg = tc_guard.read_with_guard(tc_addr);
let buf = pg.read().unwrap();
tc_len = buf.len();
tc_buf[0..tc_len].copy_from_slice(buf);
}
2022-09-13 10:43:07 +02:00
let (_tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap();
2022-09-07 14:35:33 +02:00
let accepted_token;
{
2022-09-07 14:57:44 +02:00
let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed");
2022-09-07 14:35:33 +02:00
let token = mg.add_tc_with_req_id(req_id_0);
accepted_token = mg
2022-09-07 14:57:44 +02:00
.acceptance_success(token, &FIXED_STAMP)
2022-09-07 14:35:33 +02:00
.expect("Acceptance success failed");
}
2022-09-07 11:02:45 +02:00
// Do some start handling here
2022-09-07 14:35:33 +02:00
let started_token;
{
2022-09-07 14:57:44 +02:00
let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed");
2022-09-07 14:35:33 +02:00
started_token = mg
2022-09-07 14:57:44 +02:00
.start_success(accepted_token, &FIXED_STAMP)
2022-09-07 14:35:33 +02:00
.expect("Start success failed");
// Do some step handling here
2022-09-07 14:57:44 +02:00
mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0))
.expect("Start success failed");
2022-09-07 14:35:33 +02:00
}
// Finish up
2022-09-07 14:57:44 +02:00
let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed");
mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1))
.expect("Start success failed");
mg.completion_success(started_token, &FIXED_STAMP)
2022-09-07 01:06:32 +02:00
.expect("Completion success failed");
});
let verif_sender_1 = thread::spawn(move || {
2022-09-07 11:02:45 +02:00
let mut tc_buf: [u8; 1024] = [0; 1024];
2022-09-07 13:37:09 +02:00
let tc_addr = rx_tc_1
.recv_timeout(Duration::from_millis(20))
.expect("Receive timeout");
2022-09-07 11:02:45 +02:00
let tc_len;
{
let mut tc_guard = shared_tc_pool_1.write().unwrap();
let pg = tc_guard.read_with_guard(tc_addr);
let buf = pg.read().unwrap();
tc_len = buf.len();
tc_buf[0..tc_len].copy_from_slice(buf);
}
2022-09-13 10:43:07 +02:00
let (tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap();
2022-09-07 14:57:44 +02:00
let mut mg = reporter_with_sender_1
.lock()
.expect("Locking reporter failed");
2022-09-07 11:02:45 +02:00
let token = mg.add_tc(&tc);
2022-09-07 01:06:32 +02:00
let accepted_token = mg
2022-09-07 14:57:44 +02:00
.acceptance_success(token, &FIXED_STAMP)
2022-09-07 01:06:32 +02:00
.expect("Acceptance success failed");
let started_token = mg
2022-09-07 14:57:44 +02:00
.start_success(accepted_token, &FIXED_STAMP)
2022-09-07 01:06:32 +02:00
.expect("Start success failed");
let fail_code = EcssEnumU16::new(2);
let params = FailParams::new(&FIXED_STAMP, &fail_code, None);
2022-09-07 14:57:44 +02:00
mg.completion_failure(started_token, params)
2022-09-07 01:06:32 +02:00
.expect("Completion success failed");
});
let verif_receiver = thread::spawn(move || {
let mut packet_counter = 0;
let mut tm_buf: [u8; 1024] = [0; 1024];
2022-09-07 11:02:45 +02:00
let mut verif_map = HashMap::new();
2022-09-07 01:06:32 +02:00
while packet_counter < PACKETS_SENT {
2022-09-07 13:37:09 +02:00
let verif_addr = rx
2022-09-07 14:28:55 +02:00
.recv_timeout(Duration::from_millis(50))
2022-09-07 13:37:09 +02:00
.expect("Packet reception timeout");
2022-09-07 14:35:33 +02:00
let tm_len;
{
let mut rg = shared_tm_pool.write().expect("Error locking shared pool");
let store_guard = rg.read_with_guard(verif_addr);
let slice = store_guard.read().expect("Error reading TM slice");
tm_len = slice.len();
tm_buf[0..tm_len].copy_from_slice(slice);
}
2022-09-13 10:43:07 +02:00
let (pus_tm, _) =
PusTm::from_bytes(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");
2022-09-07 01:06:32 +02:00
let req_id = RequestId::from_bytes(
&pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES],
)
.unwrap();
2022-09-07 11:02:45 +02:00
if !verif_map.contains_key(&req_id) {
let mut content = Vec::new();
content.push(pus_tm.subservice());
verif_map.insert(req_id, content);
} else {
let content = verif_map.get_mut(&req_id).unwrap();
content.push(pus_tm.subservice())
}
2022-09-07 01:06:32 +02:00
packet_counter += 1;
}
2022-09-07 11:08:11 +02:00
for (req_id, content) in verif_map {
2022-09-07 14:28:55 +02:00
if req_id == req_id_1 {
2022-09-07 11:02:45 +02:00
assert_eq!(content[0], 1);
assert_eq!(content[1], 3);
assert_eq!(content[2], 8);
2022-09-07 14:28:55 +02:00
} else if req_id == req_id_0 {
2022-09-07 11:02:45 +02:00
assert_eq!(content[0], 1);
assert_eq!(content[1], 3);
assert_eq!(content[2], 5);
assert_eq!(content[3], 5);
assert_eq!(content[4], 7);
2022-09-07 11:08:11 +02:00
} else {
panic!("Unexpected request ID {:?}", req_id);
2022-09-07 11:02:45 +02:00
}
}
2022-09-07 01:06:32 +02:00
});
verif_sender_0.join().expect("Joining thread 0 failed");
verif_sender_1.join().expect("Joining thread 1 failed");
2022-09-07 14:28:55 +02:00
verif_receiver.join().expect("Joining thread 2 failed");
2022-09-07 01:06:32 +02:00
}