restructure the crate

- Add new shared subcrate satrs-shared to split off some shared
  components not expected to change very often.
- Rename `satrs-core` to `satrs`. It is expected that sat-rs will remain
  the primary crate, so the core suffix is superfluous, and core also
  implies a stability which will not be the case for some time. (A
  migration sketch for dependents follows below.)
2024-02-12 14:27:22 +01:00
parent f58a4eaee5
commit 3ea3a7acb6
86 changed files with 180 additions and 151 deletions
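A hedged migration sketch for dependents (not part of this diff): rename the dependency from `satrs-core` to `satrs` in `Cargo.toml`, then adjust the crate path in imports. The `pool` items are taken from the tests below; everything else is illustrative:

// Before the rename:
// use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig};
// After the rename:
use satrs::pool::{StaticMemoryPool, StaticPoolConfig};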

satrs/tests/hk_helpers.rs (new file)
@@ -0,0 +1,164 @@
#![allow(dead_code)]
use core::mem::size_of;
use serde::{Deserialize, Serialize};
use spacepackets::ecss::{PfcReal, PfcUnsigned, Ptc};
use spacepackets::time::cds::TimeProvider;
use spacepackets::time::{CcsdsTimeProvider, TimeWriter};
enum NumOfParamsInfo {
/// The parameter entry is a scalar field
Scalar = 0b00,
/// The parameter entry is a vector, and its length field is one byte wide (max. 255 entries)
VecLenFieldOneByte = 0b01,
/// The parameter entry is a vector, and its length field is two bytes wide (max. 65535 entries)
VecLenFieldTwoBytes = 0b10,
/// The parameter entry is a matrix, and its length field contains a one byte row number
/// and a one byte column number.
MatrixRowsAndColumns = 0b11,
}
const HAS_VALIDITY_MASK: u8 = 1 << 7;
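// Layout of the self-describing control byte written below: bit 7 signals that a
// validity flag is present, bit 6 carries the validity itself, and bits 5..4 hold the
// NumOfParamsInfo variant.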
struct ParamWithValidity<T> {
valid: bool,
val: T,
}
struct TestMgmHk {
temp: f32,
mgm_vals: [u16; 3],
}
struct TestMgmHkWithIndividualValidity {
temp: ParamWithValidity<f32>,
mgm_vals: ParamWithValidity<[u16; 3]>,
}
#[derive(Serialize, Deserialize)]
struct TestMgmHkWithGroupValidity {
last_valid_stamp: TimeProvider,
valid: bool,
temp: f32,
mgm_vals: [u16; 3],
}
impl TestMgmHk {
pub fn write_to_be_bytes(&self, buf: &mut [u8]) -> Result<usize, ()> {
let mut curr_idx = 0;
buf[curr_idx..curr_idx + size_of::<f32>()].copy_from_slice(&self.temp.to_be_bytes());
curr_idx += size_of::<f32>();
for val in self.mgm_vals {
buf[curr_idx..curr_idx + size_of::<u16>()].copy_from_slice(&val.to_be_bytes());
curr_idx += size_of::<u16>();
}
Ok(curr_idx)
}
}
/// This could in principle be auto-generated.
impl TestMgmHkWithIndividualValidity {
pub fn write_to_be_bytes_self_describing(&self, buf: &mut [u8]) -> Result<usize, ()> {
let mut curr_idx = 0;
buf[curr_idx] = 0;
buf[curr_idx] |= HAS_VALIDITY_MASK | (self.temp.valid as u8) << 6;
curr_idx += 1;
buf[curr_idx] = Ptc::Real as u8;
curr_idx += 1;
buf[curr_idx] = PfcReal::Float as u8;
curr_idx += 1;
buf[curr_idx..curr_idx + size_of::<f32>()].copy_from_slice(&self.temp.val.to_be_bytes());
curr_idx += size_of::<f32>();
buf[curr_idx] = 0;
buf[curr_idx] |= HAS_VALIDITY_MASK
| (self.mgm_vals.valid as u8) << 6
| (NumOfParamsInfo::VecLenFieldOneByte as u8) << 4;
curr_idx += 1;
buf[curr_idx] = Ptc::UnsignedInt as u8;
curr_idx += 1;
buf[curr_idx] = PfcUnsigned::TwoBytes as u8;
curr_idx += 1;
buf[curr_idx] = 3;
curr_idx += 1;
for val in self.mgm_vals.val {
buf[curr_idx..curr_idx + size_of::<u16>()].copy_from_slice(&val.to_be_bytes());
curr_idx += size_of::<u16>();
}
Ok(curr_idx)
}
}
impl TestMgmHkWithGroupValidity {
pub fn write_to_be_bytes_self_describing(&self, buf: &mut [u8]) -> Result<usize, ()> {
let mut curr_idx = 0;
buf[curr_idx] = self.valid as u8;
curr_idx += 1;
self.last_valid_stamp
.write_to_bytes(&mut buf[curr_idx..curr_idx + self.last_valid_stamp.len_as_bytes()])
.unwrap();
curr_idx += self.last_valid_stamp.len_as_bytes();
buf[curr_idx] = 0;
curr_idx += 1;
buf[curr_idx] = Ptc::Real as u8;
curr_idx += 1;
buf[curr_idx] = PfcReal::Float as u8;
curr_idx += 1;
buf[curr_idx..curr_idx + size_of::<f32>()].copy_from_slice(&self.temp.to_be_bytes());
curr_idx += size_of::<f32>();
buf[curr_idx] = 0;
buf[curr_idx] |= (NumOfParamsInfo::VecLenFieldOneByte as u8) << 4;
curr_idx += 1;
buf[curr_idx] = Ptc::UnsignedInt as u8;
curr_idx += 1;
buf[curr_idx] = PfcUnsigned::TwoBytes as u8;
curr_idx += 1;
buf[curr_idx] = 3;
curr_idx += 1;
for val in self.mgm_vals {
buf[curr_idx..curr_idx + size_of::<u16>()].copy_from_slice(&val.to_be_bytes());
curr_idx += size_of::<u16>();
}
Ok(curr_idx)
}
}
#[test]
pub fn main() {
let mut raw_buf: [u8; 32] = [0; 32];
let mgm_hk = TestMgmHk {
temp: 20.0,
mgm_vals: [0x1f1f, 0x2f2f, 0x3f3f],
};
// 4 byte float + 3 * 2 bytes MGM values
let written = mgm_hk.write_to_be_bytes(&mut raw_buf).unwrap();
assert_eq!(written, 10);
let mgm_hk_individual_validity = TestMgmHkWithIndividualValidity {
temp: ParamWithValidity {
valid: true,
val: 20.0,
},
mgm_vals: ParamWithValidity {
valid: true,
val: [0x1f1f, 0x2f2f, 0x3f3f],
},
};
let written = mgm_hk_individual_validity
.write_to_be_bytes_self_describing(&mut raw_buf)
.unwrap();
// 3 byte float description, 4 byte float, 4 byte MGM val description, 3 * 2 bytes MGM values
assert_eq!(written, 17);
// The easiest and probably best approach, trading TM downlink capacity for convenience:
// use a JSON format
let mgm_hk_group_validity = TestMgmHkWithGroupValidity {
last_valid_stamp: TimeProvider::from_now_with_u16_days().unwrap(),
valid: false,
temp: 20.0,
mgm_vals: [0x1f1f, 0x2f2f, 0x3f3f],
};
let mgm_as_json_str = serde_json::to_string(&mgm_hk_group_validity).unwrap();
println!(
"JSON string with length {}: {}",
mgm_as_json_str.len(),
mgm_as_json_str
);
}

satrs/tests/pools.rs (new file)
@@ -0,0 +1,38 @@
use satrs::pool::{PoolGuard, PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreAddr};
use std::ops::DerefMut;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3];
#[test]
fn threaded_usage() {
let pool_cfg = StaticPoolConfig::new(vec![(16, 6), (32, 3), (8, 12)], false);
let shared_pool = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
let shared_clone = shared_pool.clone();
let (tx, rx): (Sender<StoreAddr>, Receiver<StoreAddr>) = mpsc::channel();
let jh0 = thread::spawn(move || {
let mut dummy = shared_pool.write().unwrap();
let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed");
tx.send(addr).expect("Sending store address failed");
});
let jh1 = thread::spawn(move || {
let addr;
{
addr = rx.recv().expect("Receiving store address failed");
let mut pool_access = shared_clone.write().unwrap();
let pg = PoolGuard::new(pool_access.deref_mut(), addr);
let mut read_buf: [u8; 4] = [0; 4];
let read_bytes = pg.read(&mut read_buf).expect("Reading failed");
assert_eq!(read_buf, DUMMY_DATA);
assert_eq!(read_bytes, 4);
}
let pool_access = shared_clone.read().unwrap();
assert!(!pool_access.has_element_at(&addr).expect("Invalid address"));
});
jh0.join().unwrap();
jh1.join().unwrap();
}

@@ -0,0 +1,94 @@
#![allow(dead_code, unused_imports)]
use satrs::events::{
EventU32, EventU32TypedSev, GenericEvent, HasSeverity, LargestEventRaw, LargestGroupIdRaw,
Severity, SeverityInfo, SeverityLow, SeverityMedium,
};
use std::convert::AsRef;
#[derive(Debug)]
struct GroupIdIntrospection {
name: &'static str,
id: LargestGroupIdRaw,
}
#[derive(Debug)]
struct EventIntrospection {
name: &'static str,
group_id: GroupIdIntrospection,
event: &'static EventU32,
info: &'static str,
}
//#[event(descr="This is some info event")]
const INFO_EVENT_0: EventU32TypedSev<SeverityInfo> = EventU32TypedSev::const_new(0, 0);
const INFO_EVENT_0_ERASED: EventU32 = EventU32::const_from_info(INFO_EVENT_0);
// This is ideally auto-generated
const INFO_EVENT_0_INTROSPECTION: EventIntrospection = EventIntrospection {
name: "INFO_EVENT_0",
group_id: GroupIdIntrospection {
id: 0,
name: "Group ID 0 without name",
},
event: &INFO_EVENT_0_ERASED,
info: "This is some info event",
};
//#[event(descr="This is some low severity event")]
const SOME_LOW_SEV_EVENT: EventU32TypedSev<SeverityLow> = EventU32TypedSev::const_new(0, 12);
//const EVENT_LIST: [&'static Event; 2] = [&INFO_EVENT_0, &SOME_LOW_SEV_EVENT];
//#[event_group]
const TEST_GROUP_NAME: u16 = 1;
// Auto-generated?
const TEST_GROUP_NAME_NAME: &str = "TEST_GROUP_NAME";
//#[event(desc="Some medium severity event")]
const MEDIUM_SEV_EVENT_IN_OTHER_GROUP: EventU32TypedSev<SeverityMedium> =
EventU32TypedSev::const_new(TEST_GROUP_NAME, 0);
const MEDIUM_SEV_EVENT_IN_OTHER_GROUP_REDUCED: EventU32 =
EventU32::const_from_medium(MEDIUM_SEV_EVENT_IN_OTHER_GROUP);
// Also auto-generated
const MEDIUM_SEV_EVENT_IN_OTHER_GROUP_INTROSPECTION: EventIntrospection = EventIntrospection {
name: "MEDIUM_SEV_EVENT_IN_OTHER_GROUP",
group_id: GroupIdIntrospection {
name: TEST_GROUP_NAME_NAME,
id: TEST_GROUP_NAME,
},
event: &MEDIUM_SEV_EVENT_IN_OTHER_GROUP_REDUCED,
info: "Some medium severity event",
};
const CONST_SLICE: &'static [u8] = &[0, 1, 2, 3];
const INTROSPECTION_FOR_TEST_GROUP_0: [&EventIntrospection; 2] =
[&INFO_EVENT_0_INTROSPECTION, &INFO_EVENT_0_INTROSPECTION];
//const INTROSPECTION_FOR_TABLE: &'static [&EventIntrospection] = &INTROSPECTION_FOR_TEST_GROUP_0;
const INTROSPECTION_FOR_TEST_GROUP_NAME: [&EventIntrospection; 1] =
[&MEDIUM_SEV_EVENT_IN_OTHER_GROUP_INTROSPECTION];
//const BLAH: &'static [&EventIntrospection] = &INTROSPECTION_FOR_TEST_GROUP_NAME;
const ALL_EVENTS: [&[&EventIntrospection]; 2] = [
&INTROSPECTION_FOR_TEST_GROUP_0,
&INTROSPECTION_FOR_TEST_GROUP_NAME,
];
#[test]
fn main() {
//let test = stringify!(INFO_EVENT);
//println!("{:?}", test);
//for event in EVENT_LIST {
// println!("{:?}", event);
//}
//for events in ALL_EVENTS.into_iter().flatten() {
// dbg!("{:?}", events);
//}
//for introspection_info in INTROSPECTION_FOR_TEST_GROUP {
// dbg!("{:?}", introspection_info);
//}
//let test_struct =
}

satrs/tests/pus_events.rs (new file)
@@ -0,0 +1,158 @@
use satrs::event_man::{
EventManagerWithMpscQueue, MpscEventU32Receiver, MpscEventU32SendProvider, SendEventProvider,
};
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
use satrs::params::U32Pair;
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs::pus::event_man::{DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher};
use satrs::pus::MpscTmAsVecSender;
use spacepackets::ecss::tm::PusTmReader;
use spacepackets::ecss::{PusError, PusPacket};
use std::sync::mpsc::{channel, SendError, TryRecvError};
use std::thread;
const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::const_new(1, 0);
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
const EMPTY_STAMP: [u8; 7] = [0; 7];
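// 7 bytes is the size of a CDS short timestamp (1 byte P-field, 2 bytes days, 4 bytes
// milliseconds of day), matching the timestamp length passed to PusTmReader below.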
#[derive(Debug, Clone)]
pub enum CustomTmSenderError {
SendError(SendError<Vec<u8>>),
PusError(PusError),
}
#[test]
fn test_threaded_usage() {
let (event_sender, event_man_receiver) = channel();
let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_receiver));
let (pus_event_man_tx, pus_event_man_rx) = channel();
let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.id());
event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = channel();
let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed");
let backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
let mut pus_event_man = PusEventDispatcher::new(reporter, Box::new(backend));
// PUS + Generic event manager thread
let jh0 = thread::spawn(move || {
let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx);
let mut event_cnt = 0;
let mut params_array: [u8; 128] = [0; 128];
loop {
let res = event_man.try_event_handling();
assert!(res.is_ok());
match pus_event_man_rx.try_recv() {
Ok((event, aux_data)) => {
let mut gen_event = |aux_data| {
pus_event_man.generate_pus_event_tm_generic(
&mut sender,
&EMPTY_STAMP,
event,
aux_data,
)
};
let res = if let Some(aux_data) = aux_data {
match aux_data {
Params::Heapless(heapless) => match heapless {
ParamsHeapless::Raw(raw) => {
raw.write_to_be_bytes(&mut params_array)
.expect("Writing raw parameter failed");
gen_event(Some(&params_array[0..raw.raw_len()]))
}
ParamsHeapless::EcssEnum(e) => {
e.write_to_be_bytes(&mut params_array)
.expect("Writing ECSS enum failed");
gen_event(Some(&params_array[0..e.raw_len()]))
}
},
Params::Vec(vec) => gen_event(Some(vec.as_slice())),
Params::String(str) => gen_event(Some(str.as_bytes())),
Params::Store(_) => gen_event(None),
}
} else {
gen_event(None)
};
event_cnt += 1;
assert!(res.is_ok());
assert!(res.unwrap());
if event_cnt == 2 {
break;
}
}
Err(e) => {
if let TryRecvError::Disconnected = e {
panic!("Event receiver disconnected!")
}
}
}
}
});
// Event sender and TM checker thread
let jh1 = thread::spawn(move || {
event_sender
.send((INFO_EVENT.into(), None))
.expect("Sending info event failed");
loop {
match event_rx.try_recv() {
// Event TM received successfully
Ok(event_tm) => {
let tm =
PusTmReader::new(event_tm.as_slice(), 7).expect("Deserializing TM failed");
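// PUS service 5 (event reporting), subservice 1: info event report.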
assert_eq!(tm.0.service(), 5);
assert_eq!(tm.0.subservice(), 1);
let src_data = tm.0.source_data();
assert!(!src_data.is_empty());
assert_eq!(src_data.len(), 4);
let event =
EventU32::from(u32::from_be_bytes(src_data[0..4].try_into().unwrap()));
assert_eq!(event, INFO_EVENT);
break;
}
Err(e) => {
if let TryRecvError::Disconnected = e {
panic!("Event sender disconnected!")
}
}
}
}
event_sender
.send((
LOW_SEV_EVENT.into(),
Some(Params::Heapless((2_u32, 3_u32).into())),
))
.expect("Sending low severity event failed");
loop {
match event_rx.try_recv() {
// Event TM received successfully
Ok(event_tm) => {
let tm =
PusTmReader::new(event_tm.as_slice(), 7).expect("Deserializing TM failed");
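// PUS service 5, subservice 2: low severity event report.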
assert_eq!(tm.0.service(), 5);
assert_eq!(tm.0.subservice(), 2);
let src_data = tm.0.source_data();
assert!(!src_data.is_empty());
assert_eq!(src_data.len(), 12);
let event =
EventU32::from(u32::from_be_bytes(src_data[0..4].try_into().unwrap()));
assert_eq!(event, LOW_SEV_EVENT);
let u32_pair: U32Pair =
src_data[4..].try_into().expect("Creating U32Pair failed");
assert_eq!(u32_pair.0, 2);
assert_eq!(u32_pair.1, 3);
break;
}
Err(e) => {
if let TryRecvError::Disconnected = e {
panic!("Event sender disconnected!")
}
}
}
}
});
jh0.join().expect("Joining manager thread failed");
jh1.join().expect("Joining creator thread failed");
}

@@ -0,0 +1,190 @@
//#[cfg(feature = "crossbeam")]
pub mod crossbeam_test {
use hashbrown::HashMap;
use satrs::pool::{PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};
use satrs::pus::verification::{
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
};
use satrs::pus::CrossbeamTmInStoreSender;
use satrs::tmtc::tm_helper::SharedTmPool;
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
use spacepackets::ecss::tm::PusTmReader;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, WritablePusPacket};
use spacepackets::SpHeader;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
const TEST_APID: u16 = 0x03;
const FIXED_STAMP: [u8; 7] = [0; 7];
const PACKETS_SENT: u8 = 8;
/// This test also shows how the verification report could be used in a multi-threaded context,
/// wrapping it into an [Arc] and [Mutex] and then passing it to two threads.
///
/// - The first thread generates an acceptance, a start, two steps and one completion report
/// - The second generates an acceptance and start success report and a completion failure
/// - The third thread is the verification receiver. In the test case, it verifies that the
/// other two threads have sent the expected verification reports
#[test]
fn test_shared_reporter() {
// We use a synced sequence count provider here because both verification reporters have
// the same APID. If they had distinct APIDs, the more correct approach would be for each
// reporter to have its own sequence count provider.
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
// Shared pool object to store the verification PUS telemetry
let pool_cfg =
StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)], false);
let shared_tm_pool = SharedTmPool::new(StaticMemoryPool::new(pool_cfg.clone()));
let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = crossbeam_channel::bounded(10);
let sender =
CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone());
let mut reporter_with_sender_0 =
VerificationReporterWithSender::new(&cfg, Box::new(sender));
let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
// For test purposes, we retrieve the request ID from the TCs and pass them to the receiver
// thread.
let req_id_0;
let req_id_1;
let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3);
let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3);
{
let mut tc_guard = shared_tc_pool_0.write().unwrap();
let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap();
let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
let pus_tc_0 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
req_id_0 = RequestId::new(&pus_tc_0);
let addr = tc_guard
.free_element(pus_tc_0.len_written(), |buf| {
pus_tc_0.write_to_bytes(buf).unwrap();
})
.unwrap();
tx_tc_0.send(addr).unwrap();
let mut sph = SpHeader::tc_unseg(TEST_APID, 1, 0).unwrap();
let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
let pus_tc_1 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
req_id_1 = RequestId::new(&pus_tc_1);
let addr = tc_guard
.free_element(pus_tc_1.len_written(), |buf| {
pus_tc_1.write_to_bytes(buf).unwrap();
})
.unwrap();
tx_tc_1.send(addr).unwrap();
}
let verif_sender_0 = thread::spawn(move || {
let mut tc_buf: [u8; 1024] = [0; 1024];
let tc_addr = rx_tc_0
.recv_timeout(Duration::from_millis(20))
.expect("Receive timeout");
let tc_len;
{
let mut tc_guard = shared_tc_pool_0.write().unwrap();
let pg = tc_guard.read_with_guard(tc_addr);
tc_len = pg.read(&mut tc_buf).unwrap();
}
let (_tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0);
let accepted_token = reporter_with_sender_0
.acceptance_success(token, Some(&FIXED_STAMP))
.expect("Acceptance success failed");
// Do some start handling here
let started_token = reporter_with_sender_0
.start_success(accepted_token, Some(&FIXED_STAMP))
.expect("Start success failed");
// Do some step handling here
reporter_with_sender_0
.step_success(&started_token, Some(&FIXED_STAMP), EcssEnumU8::new(0))
.expect("Start success failed");
// Finish up
reporter_with_sender_0
.step_success(&started_token, Some(&FIXED_STAMP), EcssEnumU8::new(1))
.expect("Start success failed");
reporter_with_sender_0
.completion_success(started_token, Some(&FIXED_STAMP))
.expect("Completion success failed");
});
let verif_sender_1 = thread::spawn(move || {
let mut tc_buf: [u8; 1024] = [0; 1024];
let tc_addr = rx_tc_1
.recv_timeout(Duration::from_millis(20))
.expect("Receive timeout");
let tc_len;
{
let mut tc_guard = shared_tc_pool_1.write().unwrap();
let pg = tc_guard.read_with_guard(tc_addr);
tc_len = pg.read(&mut tc_buf).unwrap();
}
let (tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
let token = reporter_with_sender_1.add_tc(&tc);
let accepted_token = reporter_with_sender_1
.acceptance_success(token, Some(&FIXED_STAMP))
.expect("Acceptance success failed");
let started_token = reporter_with_sender_1
.start_success(accepted_token, Some(&FIXED_STAMP))
.expect("Start success failed");
let fail_code = EcssEnumU16::new(2);
let params = FailParams::new(Some(&FIXED_STAMP), &fail_code, None);
reporter_with_sender_1
.completion_failure(started_token, params)
.expect("Completion success failed");
});
let verif_receiver = thread::spawn(move || {
let mut packet_counter = 0;
let mut tm_buf: [u8; 1024] = [0; 1024];
let mut verif_map = HashMap::new();
while packet_counter < PACKETS_SENT {
let verif_addr = rx
.recv_timeout(Duration::from_millis(50))
.expect("Packet reception timeout");
let tm_len;
let shared_tm_store = shared_tm_pool.clone_backing_pool();
{
let mut rg = shared_tm_store.write().expect("Error locking shared pool");
let store_guard = rg.read_with_guard(verif_addr);
tm_len = store_guard
.read(&mut tm_buf)
.expect("Error reading TM slice");
}
let (pus_tm, _) =
PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");
let req_id =
RequestId::from_bytes(&pus_tm.source_data()[0..RequestId::SIZE_AS_BYTES])
.expect("reading request ID from PUS TM source data failed");
if !verif_map.contains_key(&req_id) {
let content = vec![pus_tm.subservice()];
verif_map.insert(req_id, content);
} else {
let content = verif_map.get_mut(&req_id).unwrap();
content.push(pus_tm.subservice())
}
packet_counter += 1;
}
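// Expected PUS service 1 subservices: 1 = acceptance success, 3 = start success,
// 5 = step success, 7 = completion success, 8 = completion failure.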
for (req_id, content) in verif_map {
if req_id == req_id_1 {
assert_eq!(content[0], 1);
assert_eq!(content[1], 3);
assert_eq!(content[2], 8);
} else if req_id == req_id_0 {
assert_eq!(content[0], 1);
assert_eq!(content[1], 3);
assert_eq!(content[2], 5);
assert_eq!(content[3], 5);
assert_eq!(content[4], 7);
} else {
panic!("Unexpected request ID {:?}", req_id);
}
}
});
verif_sender_0.join().expect("Joining thread 0 failed");
verif_sender_1.join().expect("Joining thread 1 failed");
verif_receiver.join().expect("Joining thread 2 failed");
}
}

satrs/tests/tcp_servers.rs (new file)
@@ -0,0 +1,241 @@
//! This serves as both an integration test and an example application showcasing all major
//! features of the TCP COBS server by performing the following steps:
//!
//! 1. It defines both a TC receiver and a TM source which are [Sync].
//! 2. A telemetry packet is inserted into the TM source. The packet will be handled by the
//! TCP server after handling all TCs.
//! 3. It instantiates the TCP server on localhost with automatic port assignment and assigns
//! the TC receiver and TM source created previously.
//! 4. It moves the TCP server to a different thread and invokes
//! [TcpTmtcInCobsServer::handle_next_connection] inside that thread.
//! 5. The main thread connects to the server, sends a test telecommand and then reads back
//! the test telemetry inserted into the TM source previously.
use core::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use std::{
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::Mutex,
thread,
};
use hashbrown::HashSet;
use satrs::{
encoding::cobs::encode_packet_with_cobs,
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer},
tmtc::{ReceivesTcCore, TmPacketSourceCore},
};
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
PacketId, SpHeader,
};
use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
#[derive(Default, Clone)]
struct SyncTcCacher {
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTcCore for SyncTcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
println!("Received TC: {:x?}", tc_raw);
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Clone)]
struct SyncTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl SyncTmSource {
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
tm_queue.push_back(tm.to_vec());
}
}
impl TmPacketSourceCore for SyncTmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
if !tm_queue.is_empty() {
let next_vec = tm_queue.front().unwrap();
if buffer.len() < next_vec.len() {
panic!(
"provided buffer too small, must be at least {} bytes",
next_vec.len()
);
}
println!("Sending and encoding TM: {:x?}", next_vec);
let next_vec = tm_queue.pop_front().unwrap();
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
return Ok(next_vec.len());
}
Ok(0)
}
}
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1];
const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
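// Port 0 makes the OS assign a free port; the tests retrieve the actual address via
// local_addr() before connecting.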
#[test]
fn test_cobs_server() {
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
// Insert a telemetry packet which will be read back by the client at a later stage.
tm_source.add_tm(&INVERTED_PACKET);
let mut tcp_server = TcpTmtcInCobsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_receiver.clone(),
)
.expect("TCP server generation failed");
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in a separate thread; the call blocks.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1, "No TC received");
assert_eq!(conn_result.num_sent_tms, 1, "No TM received");
// Signal the main thread we are done.
set_if_done.store(true, Ordering::Relaxed);
});
// Send TC to server now.
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.write_all(&encoded_buf[..current_idx])
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 16] = [0; 16];
let read_len = stream.read(&mut read_buf).expect("read failed");
drop(stream);
// 5 data bytes + 1 byte COBS encoding overhead + 2 sentinel (frame delimiter) bytes = 8 bytes.
assert_eq!(read_len, 8);
assert_eq!(read_buf[0], 0);
assert_eq!(read_buf[read_len - 1], 0);
let decoded_len =
cobs::decode_in_place(&mut read_buf[1..read_len]).expect("COBS decoding failed");
assert_eq!(decoded_len, 5);
// Skip first sentinel byte.
assert_eq!(&read_buf[1..1 + INVERTED_PACKET.len()], &INVERTED_PACKET);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
}
const TEST_APID_0: u16 = 0x02;
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
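// The spacepackets server uses this packet ID lookup set to detect the start of valid
// CCSDS space packets in the incoming byte stream.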
#[test]
fn test_ccsds_server() {
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
let tm_0 = verif_tm.to_vec().expect("tm generation failed");
tm_source.add_tm(&tm_0);
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = TcpSpacepacketsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_receiver.clone(),
Box::new(packet_id_lookup),
)
.expect("TCP server generation failed");
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in a separate thread; the call blocks.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 1);
set_if_done.store(true, Ordering::Relaxed);
});
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
// Send ping telecommand.
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let tc_0 = ping_tc.to_vec().expect("packet creation failed");
stream
.write_all(&tc_0)
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
// Now read all the telemetry from the server.
let mut read_buf: [u8; 16] = [0; 16];
let mut read_len_total = 0;
// Timeout ensures this does not block forever.
while read_len_total < tm_0.len() {
let read_len = stream.read(&mut read_buf).expect("read failed");
read_len_total += read_len;
assert_eq!(read_buf[..read_len], tm_0);
}
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
}