Merge pull request 'Larger update' (#49) from this-is-complex into main

Reviewed-on: #49
This commit is contained in:
Robin Müller 2023-07-08 15:02:41 +02:00
commit d59718bb04
32 changed files with 2925 additions and 1936 deletions

View File

@ -13,12 +13,12 @@ categories = ["aerospace", "aerospace::space-protocols", "no-std", "hardware-sup
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
delegate = ">=0.8, <0.10"
delegate = ">=0.8, <0.11"
paste = "1"
embed-doc-image = "0.1"
[dependencies.num_enum]
version = "0.5"
version = "0.6"
default-features = false
[dependencies.dyn-clone]
@ -26,7 +26,7 @@ version = "1"
optional = true
[dependencies.hashbrown]
version = "0.13"
version = "0.14"
optional = true
[dependencies.heapless]
@ -51,16 +51,20 @@ version= "0.5"
default-features = false
optional = true
[dependencies.thiserror]
version = "1"
optional = true
[dependencies.serde]
version = "1"
default-features = false
optional = true
[dependencies.spacepackets]
# version = "0.5.4"
version = "0.6"
# path = "../spacepackets"
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
rev = "ef4244c8cb5c"
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
# rev = "4485ed26699d32"
default-features = false
[dev-dependencies]
@ -70,7 +74,7 @@ once_cell = "1.13"
serde_json = "1"
[dev-dependencies.postcard]
version = "1.0"
version = "1"
[features]
default = ["std"]
@ -82,7 +86,8 @@ std = [
"crossbeam-channel/std",
"serde/std",
"spacepackets/std",
"num_enum/std"
"num_enum/std",
"thiserror"
]
alloc = [
"serde/alloc",

View File

@ -51,7 +51,7 @@ doc = ::embed_doc_image::embed_image!("event_man_arch", "images/event_man_arch.p
//!
//! # Examples
//!
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-core/tests/pus_events.rs)
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/pus_events.rs)
//! for a concrete example using multi-threading where events are routed to
//! different threads.
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};

View File

@ -630,7 +630,6 @@ impl<Severity: HasSeverity> PartialEq<EventU16TypedSev<Severity>> for EventU16 {
mod tests {
use super::EventU32TypedSev;
use super::*;
use spacepackets::ecss::EcssEnumeration;
use spacepackets::ByteConversionError;
use std::mem::size_of;

View File

@ -18,6 +18,7 @@ use std::vec::Vec;
///
/// ```
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
/// use spacepackets::ecss::SerializablePusPacket;
/// use satrs_core::hal::host::udp_server::UdpTcServer;
/// use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore};
/// use spacepackets::SpHeader;
@ -141,6 +142,7 @@ impl<E: 'static> UdpTcServer<E> {
mod tests {
use crate::hal::host::udp_server::{ReceiveResult, UdpTcServer};
use crate::tmtc::ReceivesTcCore;
use spacepackets::ecss::SerializablePusPacket;
use spacepackets::tc::PusTc;
use spacepackets::SpHeader;
use std::boxed::Box;

View File

@ -51,6 +51,7 @@
//! assert_eq!(example_obj.id, obj_id);
//! assert_eq!(example_obj.dummy, 42);
//! ```
use crate::tmtc::TargetId;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "alloc")]
@ -64,7 +65,7 @@ use std::error::Error;
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
pub struct ObjectId {
pub id: u32,
pub id: TargetId,
pub name: &'static str,
}

View File

@ -138,6 +138,16 @@ pub struct StoreAddr {
pub(crate) packet_idx: NumBlocks,
}
impl Display for StoreAddr {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
write!(
f,
"StoreAddr(pool index: {}, packet index: {})",
self.pool_idx, self.packet_idx
)
}
}
impl StoreAddr {
pub const INVALID_ADDR: u32 = 0xFFFFFFFF;
@ -402,7 +412,8 @@ impl PoolProvider for LocalPool {
fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> {
let curr_size = self.addr_check(addr)?;
let raw_pos = self.raw_pos(addr).unwrap();
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size];
let block =
&mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size];
Ok(block)
}
@ -779,4 +790,25 @@ mod tests {
drop(rw_guard);
assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
}
#[test]
fn modify_pool_index_above_0() {
let mut local_pool = basic_small_pool();
let test_buf_0: [u8; 4] = [1; 4];
let test_buf_1: [u8; 4] = [2; 4];
let test_buf_2: [u8; 4] = [3; 4];
let test_buf_3: [u8; 4] = [4; 4];
let addr0 = local_pool.add(&test_buf_0).expect("Adding data failed");
let addr1 = local_pool.add(&test_buf_1).expect("Adding data failed");
let addr2 = local_pool.add(&test_buf_2).expect("Adding data failed");
let addr3 = local_pool.add(&test_buf_3).expect("Adding data failed");
let tm0_raw = local_pool.modify(&addr0).expect("Modifying data failed");
assert_eq!(tm0_raw, test_buf_0);
let tm1_raw = local_pool.modify(&addr1).expect("Modifying data failed");
assert_eq!(tm1_raw, test_buf_1);
let tm2_raw = local_pool.modify(&addr2).expect("Modifying data failed");
assert_eq!(tm2_raw, test_buf_2);
let tm3_raw = local_pool.modify(&addr3).expect("Modifying data failed");
assert_eq!(tm3_raw, test_buf_3);
}
}

View File

@ -110,7 +110,7 @@ impl EventReporterBase {
) -> Result<(), EcssTmtcErrorWithSend<E>> {
let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?;
sender
.send_tm(tm)
.send_tm(tm.into())
.map_err(|e| EcssTmtcErrorWithSend::SendError(e))?;
self.msg_count += 1;
Ok(())
@ -243,8 +243,10 @@ mod tests {
use super::*;
use crate::events::{EventU32, Severity};
use crate::pus::tests::CommonTmInfo;
use crate::pus::{EcssSender, PusTmWrapper};
use crate::SenderId;
use spacepackets::ByteConversionError;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::vec::Vec;
@ -263,30 +265,41 @@ mod tests {
#[derive(Default, Clone)]
struct TestSender {
pub service_queue: VecDeque<TmInfo>,
pub service_queue: RefCell<VecDeque<TmInfo>>,
}
impl EcssSender for TestSender {
fn id(&self) -> SenderId {
0
}
}
impl EcssTmSenderCore for TestSender {
type Error = ();
fn id(&self) -> SenderId {
0
}
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
assert!(tm.source_data().is_some());
let src_data = tm.source_data().unwrap();
assert!(src_data.len() >= 4);
let event = EventU32::from(u32::from_be_bytes(src_data[0..4].try_into().unwrap()));
let mut aux_data = Vec::new();
if src_data.len() > 4 {
aux_data.extend_from_slice(&src_data[4..]);
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> {
match tm {
PusTmWrapper::InStore(_) => {
panic!("TestSender: unexpected call with address");
}
PusTmWrapper::Direct(tm) => {
assert!(tm.source_data().is_some());
let src_data = tm.source_data().unwrap();
assert!(src_data.len() >= 4);
let event =
EventU32::from(u32::from_be_bytes(src_data[0..4].try_into().unwrap()));
let mut aux_data = Vec::new();
if src_data.len() > 4 {
aux_data.extend_from_slice(&src_data[4..]);
}
self.service_queue.borrow_mut().push_back(TmInfo {
common: CommonTmInfo::new_from_tm(&tm),
event,
aux_data,
});
Ok(())
}
}
self.service_queue.push_back(TmInfo {
common: CommonTmInfo::new_from_tm(&tm),
event,
aux_data,
});
Ok(())
}
}
@ -355,8 +368,9 @@ mod tests {
severity,
error_data,
);
assert_eq!(sender.service_queue.len(), 1);
let tm_info = sender.service_queue.pop_front().unwrap();
let mut service_queue = sender.service_queue.borrow_mut();
assert_eq!(service_queue.len(), 1);
let tm_info = service_queue.pop_front().unwrap();
assert_eq!(
tm_info.common.subservice,
severity_to_subservice(severity) as u8
@ -413,7 +427,7 @@ mod tests {
let err = reporter.event_info(sender, &time_stamp_empty, event, None);
assert!(err.is_err());
let err = err.unwrap_err();
if let EcssTmErrorWithSend::EcssTmError(EcssTmtcError::ByteConversionError(
if let EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversion(
ByteConversionError::ToSliceTooSmall(missmatch),
)) = err
{

View File

@ -10,7 +10,7 @@ use hashbrown::HashSet;
#[cfg(feature = "alloc")]
pub use crate::pus::event::EventReporter;
use crate::pus::verification::{TcStateStarted, VerificationToken};
use crate::pus::verification::TcStateToken;
#[cfg(feature = "alloc")]
use crate::pus::EcssTmSenderCore;
use crate::pus::EcssTmtcErrorWithSend;
@ -91,7 +91,7 @@ pub enum EventRequest<Event: GenericEvent = EventU32> {
#[derive(Debug)]
pub struct EventRequestWithToken<Event: GenericEvent = EventU32> {
pub request: EventRequest<Event>,
pub token: VerificationToken<TcStateStarted>,
pub token: TcStateToken,
}
#[derive(Debug)]

View File

@ -0,0 +1,135 @@
use crate::events::EventU32;
use crate::pool::{SharedPool, StoreAddr};
use crate::pus::event_man::{EventRequest, EventRequestWithToken};
use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken,
};
use crate::pus::{
AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError,
PusServiceBase, PusServiceHandler,
};
use crate::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::event::Subservice;
use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc;
use std::sync::mpsc::{Receiver, Sender};
pub struct PusService5EventHandler {
psb: PusServiceBase,
event_request_tx: Sender<EventRequestWithToken>,
}
impl PusService5EventHandler {
pub fn new(
receiver: Receiver<AcceptedTc>,
tc_pool: SharedPool,
tm_tx: Sender<StoreAddr>,
tm_store: SharedTmStore,
tm_apid: u16,
verification_handler: StdVerifReporterWithSender,
event_request_tx: Sender<EventRequestWithToken>,
) -> Self {
Self {
psb: PusServiceBase::new(
receiver,
tc_pool,
tm_tx,
tm_store,
tm_apid,
verification_handler,
),
event_request_tx,
}
}
}
impl PusServiceHandler for PusService5EventHandler {
fn psb_mut(&mut self) -> &mut PusServiceBase {
&mut self.psb
}
fn psb(&self) -> &PusServiceBase {
&self.psb
}
fn handle_one_tc(
&mut self,
addr: StoreAddr,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap();
let subservice = tc.subservice();
let srv = Subservice::try_from(subservice);
if srv.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(),
token,
));
}
let handle_enable_disable_request = |enable: bool, stamp: [u8; 7]| {
if tc.user_data().is_none() || tc.user_data().unwrap().len() < 4 {
return Err(PusPacketHandlingError::NotEnoughAppData(
"At least 4 bytes event ID expected".into(),
));
}
let user_data = tc.user_data().unwrap();
let event_u32 = EventU32::from(u32::from_be_bytes(user_data[0..4].try_into().unwrap()));
let start_token = self
.psb
.verification_handler
.borrow_mut()
.start_success(token, Some(&stamp))
.map_err(|_| PartialPusHandlingError::Verification);
let partial_error = start_token.clone().err();
let mut token: TcStateToken = token.into();
if let Ok(start_token) = start_token {
token = start_token.into();
}
let event_req_with_token = if enable {
EventRequestWithToken {
request: EventRequest::Enable(event_u32),
token,
}
} else {
EventRequestWithToken {
request: EventRequest::Disable(event_u32),
token,
}
};
self.event_request_tx
.send(event_req_with_token)
.map_err(|_| {
PusPacketHandlingError::SendError("Forwarding event request failed".into())
})?;
if let Some(partial_error) = partial_error {
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
}
Ok(PusPacketHandlerResult::RequestHandled)
};
let mut partial_error = None;
let time_stamp = self.psb().get_current_timestamp(&mut partial_error);
match srv.unwrap() {
Subservice::TmInfoReport
| Subservice::TmLowSeverityReport
| Subservice::TmMediumSeverityReport
| Subservice::TmHighSeverityReport => {
return Err(PusPacketHandlingError::InvalidSubservice(tc.subservice()))
}
Subservice::TcEnableEventGeneration => {
handle_enable_disable_request(true, time_stamp)?;
}
Subservice::TcDisableEventGeneration => {
handle_enable_disable_request(false, time_stamp)?;
}
Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => {
return Ok(PusPacketHandlerResult::SubserviceNotImplemented(
subservice, token,
));
}
}
Ok(PusPacketHandlerResult::RequestHandled)
}
}

View File

@ -1,4 +1,9 @@
//! # PUS support modules
//!
//! This module contains structures to make working with the PUS C standard easier.
//! The satrs-example application contains various usage examples of these components.
use crate::pus::verification::TcStateToken;
use crate::SenderId;
#[cfg(feature = "alloc")]
use downcast_rs::{impl_downcast, Downcast};
#[cfg(feature = "alloc")]
@ -11,19 +16,40 @@ use spacepackets::{ByteConversionError, SizeMissmatch};
pub mod event;
pub mod event_man;
pub mod event_srv;
pub mod hk;
pub mod mode;
pub mod scheduler;
pub mod scheduler_srv;
#[cfg(feature = "std")]
pub mod scheduling;
pub mod test;
pub mod verification;
#[cfg(feature = "alloc")]
pub use alloc_mod::*;
use crate::SenderId;
use crate::pool::StoreAddr;
#[cfg(feature = "std")]
pub use std_mod::*;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PusTmWrapper<'tm> {
InStore(StoreAddr),
Direct(PusTm<'tm>),
}
impl From<StoreAddr> for PusTmWrapper<'_> {
fn from(value: StoreAddr) -> Self {
Self::InStore(value)
}
}
impl<'tm> From<PusTm<'tm>> for PusTmWrapper<'tm> {
fn from(value: PusTm<'tm>) -> Self {
Self::Direct(value)
}
}
#[derive(Debug, Clone)]
pub enum EcssTmtcErrorWithSend<E> {
/// Errors related to sending the telemetry to a TMTC recipient
@ -41,22 +67,22 @@ impl<E> From<EcssTmtcError> for EcssTmtcErrorWithSend<E> {
#[derive(Debug, Clone)]
pub enum EcssTmtcError {
/// Errors related to the time stamp format of the telemetry
TimestampError(TimestampError),
Timestamp(TimestampError),
/// Errors related to byte conversion, for example insufficient buffer size for given data
ByteConversionError(ByteConversionError),
ByteConversion(ByteConversionError),
/// Errors related to PUS packet format
PusError(PusError),
Pus(PusError),
}
impl From<PusError> for EcssTmtcError {
fn from(e: PusError) -> Self {
EcssTmtcError::PusError(e)
EcssTmtcError::Pus(e)
}
}
impl From<ByteConversionError> for EcssTmtcError {
fn from(e: ByteConversionError) -> Self {
EcssTmtcError::ByteConversionError(e)
EcssTmtcError::ByteConversion(e)
}
}
@ -73,16 +99,17 @@ pub trait EcssSender: Send {
pub trait EcssTmSenderCore: EcssSender {
type Error;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error>;
}
/// Generic trait for a user supplied sender object.
///
/// This sender object is responsible for sending PUS telecommands to a TC recipient.
/// This sender object is responsible for sending PUS telecommands to a TC recipient. Each
/// telecommand can optionally have a token which contains its verification state.
pub trait EcssTcSenderCore: EcssSender {
type Error;
fn send_tc(&mut self, tc: PusTc) -> Result<(), Self::Error>;
fn send_tc(&self, tc: PusTc, token: Option<TcStateToken>) -> Result<(), Self::Error>;
}
#[cfg(feature = "alloc")]
@ -133,42 +160,38 @@ mod alloc_mod {
#[cfg(feature = "std")]
pub mod std_mod {
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError};
use crate::pus::{EcssSender, EcssTcSenderCore, EcssTmSenderCore};
use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
};
use crate::pus::{EcssSender, EcssTmSenderCore, PusTmWrapper};
use crate::tmtc::tm_helper::SharedTmStore;
use crate::SenderId;
use alloc::vec::Vec;
use spacepackets::ecss::{PusError, SerializablePusPacket};
use spacepackets::tc::PusTc;
use spacepackets::tm::PusTm;
use std::sync::mpsc::SendError;
use spacepackets::time::cds::TimeProvider;
use spacepackets::time::{StdTimestampError, TimeWriter};
use std::cell::RefCell;
use std::format;
use std::string::String;
use std::sync::{mpsc, RwLockWriteGuard};
use thiserror::Error;
#[derive(Debug, Clone)]
pub enum MpscPusInStoreSendError {
LockError,
PusError(PusError),
StoreError(StoreError),
SendError(SendError<StoreAddr>),
RxDisconnected(StoreAddr),
}
impl From<PusError> for MpscPusInStoreSendError {
fn from(value: PusError) -> Self {
MpscPusInStoreSendError::PusError(value)
}
}
impl From<SendError<StoreAddr>> for MpscPusInStoreSendError {
fn from(value: SendError<StoreAddr>) -> Self {
MpscPusInStoreSendError::SendError(value)
}
}
impl From<StoreError> for MpscPusInStoreSendError {
fn from(value: StoreError) -> Self {
MpscPusInStoreSendError::StoreError(value)
}
#[derive(Debug, Clone, Error)]
pub enum MpscTmInStoreSenderError {
#[error("RwGuard lock error")]
StoreLock,
#[error("Generic PUS error: {0}")]
Pus(#[from] PusError),
#[error("Generic store error: {0}")]
Store(#[from] StoreError),
#[error("MPSC channel send error: {0}")]
Send(#[from] mpsc::SendError<StoreAddr>),
#[error("RX handle has disconnected")]
RxDisconnected,
}
#[derive(Clone)]
pub struct MpscTmtcInStoreSender {
pub struct MpscTmInStoreSender {
id: SenderId,
name: &'static str,
store_helper: SharedPool,
@ -176,7 +199,7 @@ pub mod std_mod {
pub ignore_poison_errors: bool,
}
impl EcssSender for MpscTmtcInStoreSender {
impl EcssSender for MpscTmInStoreSender {
fn id(&self) -> SenderId {
self.id
}
@ -186,13 +209,14 @@ pub mod std_mod {
}
}
impl EcssTmSenderCore for MpscTmtcInStoreSender {
type Error = MpscPusInStoreSendError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
impl MpscTmInStoreSender {
pub fn send_direct_tm(
&self,
tmtc: impl SerializablePusPacket,
) -> Result<(), MpscTmInStoreSenderError> {
let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, slice) = store.free_element(tm.len_packed())?;
tm.write_to_bytes(slice)?;
let (addr, slice) = store.free_element(tmtc.len_packed())?;
tmtc.write_to_bytes(slice)?;
self.sender.send(addr)?;
Ok(())
};
@ -202,37 +226,28 @@ pub mod std_mod {
if self.ignore_poison_errors {
operation(e.into_inner())
} else {
Err(MpscPusInStoreSendError::LockError)
Err(MpscTmInStoreSenderError::StoreLock)
}
}
}
}
}
impl EcssTcSenderCore for MpscTmtcInStoreSender {
type Error = MpscPusInStoreSendError;
impl EcssTmSenderCore for MpscTmInStoreSender {
type Error = MpscTmInStoreSenderError;
fn send_tc(&mut self, tc: PusTc) -> Result<(), Self::Error> {
let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, slice) = store.free_element(tc.len_packed())?;
tc.write_to_bytes(slice)?;
self.sender.send(addr)?;
Ok(())
};
match self.store_helper.write() {
Ok(pool) => operation(pool),
Err(e) => {
if self.ignore_poison_errors {
operation(e.into_inner())
} else {
Err(MpscPusInStoreSendError::LockError)
}
}
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> {
match tm {
PusTmWrapper::InStore(addr) => self
.sender
.send(addr)
.map_err(MpscTmInStoreSenderError::Send),
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
}
}
}
impl MpscTmtcInStoreSender {
impl MpscTmInStoreSender {
pub fn new(
id: SenderId,
name: &'static str,
@ -249,13 +264,23 @@ pub mod std_mod {
}
}
#[derive(Debug, Clone)]
pub enum MpscAsVecSenderError {
PusError(PusError),
SendError(SendError<Vec<u8>>),
#[derive(Debug, Clone, Error)]
pub enum MpscTmAsVecSenderError {
#[error("Generic PUS error: {0}")]
Pus(#[from] PusError),
#[error("MPSC channel send error: {0}")]
Send(#[from] mpsc::SendError<Vec<u8>>),
#[error("can not handle store addresses")]
CantSendAddr(StoreAddr),
#[error("RX handle has disconnected")]
RxDisconnected,
}
#[derive(Debug, Clone)]
/// This class can be used if frequent heap allocations during run-time are not an issue.
/// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation
/// of this class can not deal with store addresses, so it is assumed that is is always
/// going to be called with direct packets.
#[derive(Clone)]
pub struct MpscTmAsVecSender {
id: SenderId,
sender: mpsc::Sender<Vec<u8>>,
@ -278,29 +303,176 @@ pub mod std_mod {
}
impl EcssTmSenderCore for MpscTmAsVecSender {
type Error = MpscAsVecSenderError;
type Error = MpscTmAsVecSenderError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let mut vec = Vec::new();
tm.append_to_vec(&mut vec)
.map_err(MpscAsVecSenderError::PusError)?;
self.sender
.send(vec)
.map_err(MpscAsVecSenderError::SendError)?;
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> {
match tm {
PusTmWrapper::InStore(addr) => Err(MpscTmAsVecSenderError::CantSendAddr(addr)),
PusTmWrapper::Direct(tm) => {
let mut vec = Vec::new();
tm.append_to_vec(&mut vec)
.map_err(MpscTmAsVecSenderError::Pus)?;
self.sender
.send(vec)
.map_err(MpscTmAsVecSenderError::Send)?;
Ok(())
}
}
}
}
#[derive(Debug, Clone, Error)]
pub enum PusPacketHandlingError {
#[error("Generic PUS error: {0}")]
PusError(#[from] PusError),
#[error("Wrong service number {0} for packet handler")]
WrongService(u8),
#[error("Invalid subservice {0}")]
InvalidSubservice(u8),
#[error("Not enough application data available: {0}")]
NotEnoughAppData(String),
#[error("Invalid application data")]
InvalidAppData(String),
#[error("Generic store error: {0}")]
StoreError(#[from] StoreError),
#[error("Error with the pool RwGuard: {0}")]
RwGuardError(String),
#[error("MQ send error: {0}")]
SendError(String),
#[error("TX message queue side has disconnected")]
QueueDisconnected,
#[error("Other error {0}")]
OtherError(String),
}
#[derive(Debug, Clone, Error)]
pub enum PartialPusHandlingError {
#[error("Generic timestamp generation error")]
Time(StdTimestampError),
#[error("Error sending telemetry: {0}")]
TmSend(String),
#[error("Error sending verification message")]
Verification,
}
/// Generic result type for handlers which can process PUS packets.
#[derive(Debug, Clone)]
pub enum PusPacketHandlerResult {
RequestHandled,
RequestHandledPartialSuccess(PartialPusHandlingError),
SubserviceNotImplemented(u8, VerificationToken<TcStateAccepted>),
CustomSubservice(u8, VerificationToken<TcStateAccepted>),
Empty,
}
impl From<PartialPusHandlingError> for PusPacketHandlerResult {
fn from(value: PartialPusHandlingError) -> Self {
Self::RequestHandledPartialSuccess(value)
}
}
/// Generic abstraction for a telecommand being sent around after is has been accepted.
/// The actual telecommand is stored inside a pre-allocated pool structure.
pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
/// Base class for handlers which can handle PUS TC packets. Right now, the message queue
/// backend is constrained to [mpsc::channel]s and the verification reporter
/// is constrained to the [StdVerifReporterWithSender].
pub struct PusServiceBase {
pub tc_rx: mpsc::Receiver<AcceptedTc>,
pub tc_store: SharedPool,
pub tm_tx: mpsc::Sender<StoreAddr>,
pub tm_store: SharedTmStore,
pub tm_apid: u16,
/// The verification handler is wrapped in a [RefCell] to allow the interior mutability
/// pattern. This makes writing methods which are not mutable a lot easier.
pub verification_handler: RefCell<StdVerifReporterWithSender>,
pub pus_buf: [u8; 2048],
pub pus_size: usize,
}
impl PusServiceBase {
pub fn new(
receiver: mpsc::Receiver<AcceptedTc>,
tc_pool: SharedPool,
tm_tx: mpsc::Sender<StoreAddr>,
tm_store: SharedTmStore,
tm_apid: u16,
verification_handler: StdVerifReporterWithSender,
) -> Self {
Self {
tc_rx: receiver,
tc_store: tc_pool,
tm_apid,
tm_tx,
tm_store,
verification_handler: RefCell::new(verification_handler),
pus_buf: [0; 2048],
pus_size: 0,
}
}
pub fn get_current_timestamp(
&self,
partial_error: &mut Option<PartialPusHandlingError>,
) -> [u8; 7] {
let mut time_stamp: [u8; 7] = [0; 7];
let time_provider =
TimeProvider::from_now_with_u16_days().map_err(PartialPusHandlingError::Time);
if let Ok(time_provider) = time_provider {
// Can't fail, we have a buffer with the exact required size.
time_provider.write_to_bytes(&mut time_stamp).unwrap();
} else {
*partial_error = Some(time_provider.unwrap_err());
}
time_stamp
}
pub fn get_current_timestamp_ignore_error(&self) -> [u8; 7] {
let mut dummy = None;
self.get_current_timestamp(&mut dummy)
}
}
pub trait PusServiceHandler {
fn psb_mut(&mut self) -> &mut PusServiceBase;
fn psb(&self) -> &PusServiceBase;
fn handle_one_tc(
&mut self,
addr: StoreAddr,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError>;
fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> {
// Keep locked section as short as possible.
let psb_mut = self.psb_mut();
let mut tc_pool = psb_mut
.tc_store
.write()
.map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?;
let tc_guard = tc_pool.read_with_guard(addr);
let tc_raw = tc_guard.read().unwrap();
psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
Ok(())
}
fn handle_next_packet(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
return match self.psb().tc_rx.try_recv() {
Ok((addr, token)) => self.handle_one_tc(addr, token),
Err(e) => match e {
mpsc::TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty),
mpsc::TryRecvError::Disconnected => {
Err(PusPacketHandlingError::QueueDisconnected)
}
},
};
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GenericTcCheckError {
NotEnoughAppData,
InvalidSubservice,
}
pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmtcError> {
if len > cap {
return Err(EcssTmtcError::ByteConversionError(
return Err(EcssTmtcError::ByteConversion(
ByteConversionError::ToSliceTooSmall(SizeMissmatch {
found: cap,
expected: len,

View File

@ -2,25 +2,24 @@
//!
//! The core data structure of this module is the [PusScheduler]. This structure can be used
//! to perform the scheduling of telecommands like specified in the ECSS standard.
use crate::pool::{PoolProvider, StoreAddr, StoreError};
use alloc::collections::btree_map::{Entry, Range};
use alloc::vec;
use alloc::vec::Vec;
use crate::pool::{StoreAddr, StoreError};
use core::fmt::{Debug, Display, Formatter};
use core::time::Duration;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use spacepackets::ecss::scheduling::TimeWindowType;
use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::ecss::PusError;
use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc};
use spacepackets::time::cds::DaysLen24Bits;
use spacepackets::time::{cds, CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp};
use spacepackets::time::{CcsdsTimeProvider, TimestampError, UnixTimestamp};
use spacepackets::CcsdsPacket;
use std::collections::BTreeMap;
#[cfg(feature = "std")]
use std::error::Error;
#[cfg(feature = "std")]
use std::time::SystemTimeError;
//#[cfg(feature = "std")]
//pub use std_mod::*;
#[cfg(feature = "alloc")]
pub use alloc_mod::*;
/// This is the request ID as specified in ECSS-E-ST-70-41C 5.4.11.2 of the standard.
///
@ -171,35 +170,6 @@ impl TcInfo {
}
}
/// This is the core data structure for scheduling PUS telecommands with [alloc] support.
///
/// It is assumed that the actual telecommand data is stored in a separate TC pool offering
/// a [crate::pool::PoolProvider] API. This data structure just tracks the store addresses and their
/// release times and offers a convenient API to insert and release telecommands and perform
/// other functionality specified by the ECSS standard in section 6.11. The time is tracked
/// as a [spacepackets::time::UnixTimestamp] but the only requirement to the timekeeping of
/// the user is that it is convertible to that timestamp.
///
/// The standard also specifies that the PUS scheduler can be enabled and disabled.
/// A disabled scheduler should still delete commands where the execution time has been reached
/// but should not release them to be executed.
///
/// The implementation uses an ordered map internally with the release timestamp being the key.
/// This allows efficient time based insertions and extractions which should be the primary use-case
/// for a time-based command scheduler.
/// There is no way to avoid duplicate [RequestId]s during insertion, which can occur even if the
/// user always correctly increment for sequence counter due to overflows. To avoid this issue,
/// it can make sense to split up telecommand groups by the APID to avoid overflows.
///
/// Currently, sub-schedules and groups are not supported.
#[derive(Debug)]
pub struct PusScheduler {
tc_map: BTreeMap<UnixTimestamp, Vec<TcInfo>>,
current_time: UnixTimestamp,
time_margin: Duration,
enabled: bool,
}
enum DeletionResult {
WithoutStoreDeletion(Option<StoreAddr>),
WithStoreDeletion(Result<bool, StoreError>),
@ -259,360 +229,410 @@ impl<TimeProvider: CcsdsTimeProvider + Clone> TimeWindow<TimeProvider> {
}
}
impl PusScheduler {
/// Create a new PUS scheduler.
///
/// # Arguments
///
/// * `init_current_time` - The time to initialize the scheduler with.
/// * `time_margin` - This time margin is used when inserting new telecommands into the
/// schedule. If the release time of a new telecommand is earlier than the time margin
/// added to the current time, it will not be inserted into the schedule.
pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self {
PusScheduler {
tc_map: Default::default(),
current_time: init_current_time,
time_margin,
enabled: true,
}
}
#[cfg(feature = "alloc")]
pub mod alloc_mod {
use crate::pool::{PoolProvider, StoreAddr, StoreError};
use crate::pus::scheduler::{DeletionResult, RequestId, ScheduleError, TcInfo, TimeWindow};
use alloc::collections::btree_map::{Entry, Range};
use alloc::collections::BTreeMap;
use alloc::vec;
use alloc::vec::Vec;
use core::time::Duration;
use spacepackets::ecss::scheduling::TimeWindowType;
use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc;
use spacepackets::time::cds::DaysLen24Bits;
use spacepackets::time::{cds, CcsdsTimeProvider, TimeReader, UnixTimestamp};
/// Like [Self::new], but sets the `init_current_time` parameter to the current system time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_with_current_init_time(time_margin: Duration) -> Result<Self, SystemTimeError> {
Ok(Self::new(UnixTimestamp::from_now()?, time_margin))
}
pub fn num_scheduled_telecommands(&self) -> u64 {
let mut num_entries = 0;
for entries in &self.tc_map {
num_entries += entries.1.len() as u64;
}
num_entries
}
pub fn is_enabled(&self) -> bool {
self.enabled
}
pub fn enable(&mut self) {
self.enabled = true;
}
use std::time::SystemTimeError;
/// This is the core data structure for scheduling PUS telecommands with [alloc] support.
///
/// It is assumed that the actual telecommand data is stored in a separate TC pool offering
/// a [crate::pool::PoolProvider] API. This data structure just tracks the store addresses and their
/// release times and offers a convenient API to insert and release telecommands and perform
/// other functionality specified by the ECSS standard in section 6.11. The time is tracked
/// as a [spacepackets::time::UnixTimestamp] but the only requirement to the timekeeping of
/// the user is that it is convertible to that timestamp.
///
/// The standard also specifies that the PUS scheduler can be enabled and disabled.
/// A disabled scheduler should still delete commands where the execution time has been reached
/// but should not release them to be executed.
pub fn disable(&mut self) {
self.enabled = false;
}
/// This will disable the scheduler and clear the schedule as specified in 6.11.4.4.
/// Be careful with this command as it will delete all the commands in the schedule.
///
/// The holding store for the telecommands needs to be passed so all the stored telecommands
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
/// will be returned but the method will still try to delete all the commands in the schedule.
pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> {
self.enabled = false;
let mut deletion_ok = Ok(());
for tc_lists in &mut self.tc_map {
for tc in tc_lists.1 {
let res = store.delete(tc.addr);
if res.is_err() {
deletion_ok = res;
}
}
}
self.tc_map.clear();
deletion_ok
}
pub fn update_time(&mut self, current_time: UnixTimestamp) {
self.current_time = current_time;
}
pub fn current_time(&self) -> &UnixTimestamp {
&self.current_time
}
/// Insert a telecommand which was already unwrapped from the outer Service 11 packet and stored
/// inside the telecommand packet pool.
pub fn insert_unwrapped_and_stored_tc(
&mut self,
time_stamp: UnixTimestamp,
info: TcInfo,
) -> Result<(), ScheduleError> {
if time_stamp < self.current_time + self.time_margin {
return Err(ScheduleError::ReleaseTimeInTimeMargin(
self.current_time,
self.time_margin,
time_stamp,
));
}
match self.tc_map.entry(time_stamp) {
Entry::Vacant(e) => {
e.insert(vec![info]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(info);
}
}
Ok(())
}
/// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still
/// needs to be stored inside the telecommand pool.
pub fn insert_unwrapped_tc(
&mut self,
time_stamp: UnixTimestamp,
tc: &[u8],
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
let check_tc = PusTc::from_bytes(tc)?;
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
return Err(ScheduleError::NestedScheduledTc);
}
let req_id = RequestId::from_tc(&check_tc.0);
match pool.add(tc) {
Ok(addr) => {
let info = TcInfo::new(addr, req_id);
self.insert_unwrapped_and_stored_tc(time_stamp, info)?;
Ok(info)
}
Err(err) => Err(err.into()),
}
}
/// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp
/// provider needs to be supplied via a generic.
pub fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
if PusPacket::service(pus_tc) != 11 {
return Err(ScheduleError::WrongService);
}
if PusPacket::subservice(pus_tc) != 4 {
return Err(ScheduleError::WrongSubservice);
}
return if let Some(user_data) = pus_tc.user_data() {
let stamp: TimeStamp = TimeReader::from_bytes(user_data)?;
let unix_stamp = stamp.unix_stamp();
let stamp_len = stamp.len_as_bytes();
self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool)
} else {
Err(ScheduleError::TcDataEmpty)
};
}
/// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS
/// short timestamp with 16-bit length of days field.
pub fn insert_wrapped_tc_cds_short(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
self.insert_wrapped_tc::<cds::TimeProvider>(pus_tc, pool)
}
/// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS
/// long timestamp with a 24-bit length of days field.
pub fn insert_wrapped_tc_cds_long(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
self.insert_wrapped_tc::<cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
}
/// This function uses [Self::retrieve_by_time_filter] to extract all scheduled commands inside
/// the time range and then deletes them from the provided store.
/// The implementation uses an ordered map internally with the release timestamp being the key.
/// This allows efficient time based insertions and extractions which should be the primary use-case
/// for a time-based command scheduler.
/// There is no way to avoid duplicate [RequestId]s during insertion, which can occur even if the
/// user always correctly increment for sequence counter due to overflows. To avoid this issue,
/// it can make sense to split up telecommand groups by the APID to avoid overflows.
///
/// Like specified in the documentation of [Self::retrieve_by_time_filter], the range extraction
/// for deletion is always inclusive.
///
/// This function returns the number of deleted commands on success. In case any deletion fails,
/// the last deletion will be supplied in addition to the number of deleted commands.
pub fn delete_by_time_filter<TimeProvider: CcsdsTimeProvider + Clone>(
&mut self,
time_window: TimeWindow<TimeProvider>,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let range = self.retrieve_by_time_filter(time_window);
let mut del_packets = 0;
let mut res_if_fails = None;
let mut keys_to_delete = Vec::new();
for time_bucket in range {
for tc in time_bucket.1 {
match pool.delete(tc.addr) {
Ok(_) => del_packets += 1,
Err(e) => res_if_fails = Some(e),
}
}
keys_to_delete.push(*time_bucket.0);
}
for key in keys_to_delete {
self.tc_map.remove(&key);
}
if let Some(err) = res_if_fails {
return Err((del_packets, err));
}
Ok(del_packets)
/// Currently, sub-schedules and groups are not supported.
#[derive(Debug)]
pub struct PusScheduler {
tc_map: BTreeMap<UnixTimestamp, Vec<TcInfo>>,
pub(crate) current_time: UnixTimestamp,
time_margin: Duration,
enabled: bool,
}
/// Deletes all the scheduled commands. This also deletes the packets from the passed TC pool.
///
/// This function returns the number of deleted commands on success. In case any deletion fails,
/// the last deletion will be supplied in addition to the number of deleted commands.
pub fn delete_all(
&mut self,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
self.delete_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all(), pool)
}
/// Retrieve a range over all scheduled commands.
pub fn retrieve_all(&mut self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
self.tc_map.range(..)
}
/// This retrieves scheduled telecommands which are inside the provided time window.
///
/// It should be noted that the ranged extraction is always inclusive. For example, a range
/// from 50 to 100 unix seconds would also include command scheduled at 100 unix seconds.
pub fn retrieve_by_time_filter<TimeProvider: CcsdsTimeProvider>(
&mut self,
time_window: TimeWindow<TimeProvider>,
) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
match time_window.time_window_type() {
TimeWindowType::SelectAll => self.tc_map.range(..),
TimeWindowType::TimeTagToTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let start_time = time_window.start_time().unwrap().unix_stamp();
let end_time = time_window.end_time().unwrap().unix_stamp();
self.tc_map.range(start_time..=end_time)
}
TimeWindowType::FromTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let start_time = time_window.start_time().unwrap().unix_stamp();
self.tc_map.range(start_time..)
}
TimeWindowType::ToTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let end_time = time_window.end_time().unwrap().unix_stamp();
self.tc_map.range(..=end_time)
impl PusScheduler {
/// Create a new PUS scheduler.
///
/// # Arguments
///
/// * `init_current_time` - The time to initialize the scheduler with.
/// * `time_margin` - This time margin is used when inserting new telecommands into the
/// schedule. If the release time of a new telecommand is earlier than the time margin
/// added to the current time, it will not be inserted into the schedule.
pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self {
PusScheduler {
tc_map: Default::default(),
current_time: init_current_time,
time_margin,
enabled: true,
}
}
}
/// Deletes a scheduled command with the given request ID. Returns the store address if a
/// scheduled command was found in the map and deleted, and None otherwise.
///
/// Please note that this function will stop on the first telecommand with a request ID match.
/// In case of duplicate IDs (which should generally not happen), this function needs to be
/// called repeatedly.
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<StoreAddr> {
if let DeletionResult::WithoutStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>)
{
return v;
/// Like [Self::new], but sets the `init_current_time` parameter to the current system time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_with_current_init_time(time_margin: Duration) -> Result<Self, SystemTimeError> {
Ok(Self::new(UnixTimestamp::from_now()?, time_margin))
}
panic!("unexpected deletion result");
}
/// This behaves like [Self::delete_by_request_id] but deletes the packet from the pool as well.
pub fn delete_by_request_id_and_from_pool(
&mut self,
req_id: &RequestId,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<bool, StoreError> {
if let DeletionResult::WithStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, Some(pool))
{
return v;
}
panic!("unexpected deletion result");
}
fn delete_by_request_id_internal(
&mut self,
req_id: &RequestId,
pool: Option<&mut (impl PoolProvider + ?Sized)>,
) -> DeletionResult {
let mut idx_found = None;
for time_bucket in &mut self.tc_map {
for (idx, tc_info) in time_bucket.1.iter().enumerate() {
if &tc_info.request_id == req_id {
idx_found = Some(idx);
}
}
if let Some(idx) = idx_found {
let addr = time_bucket.1.remove(idx).addr;
if let Some(pool) = pool {
return match pool.delete(addr) {
Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)),
Err(e) => DeletionResult::WithStoreDeletion(Err(e)),
};
}
return DeletionResult::WithoutStoreDeletion(Some(addr));
pub fn num_scheduled_telecommands(&self) -> u64 {
let mut num_entries = 0;
for entries in &self.tc_map {
num_entries += entries.1.len() as u64;
}
num_entries
}
if pool.is_none() {
DeletionResult::WithoutStoreDeletion(None)
} else {
DeletionResult::WithStoreDeletion(Ok(false))
pub fn is_enabled(&self) -> bool {
self.enabled
}
}
/// Retrieve all telecommands which should be release based on the current time.
pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
self.tc_map.range(..=self.current_time)
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> {
self.current_time = UnixTimestamp::from_now()?;
Ok(())
}
pub fn enable(&mut self) {
self.enabled = true;
}
/// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
/// closure for each telecommand which should be released. This function will also delete
/// the telecommands from the holding store after calling the release closure, if the scheduler
/// is disabled.
///
/// # Arguments
///
/// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
/// the second argument is the telecommand information also containing the store address.
/// This closure should return whether the command should be deleted if the scheduler is
/// disabled to prevent memory leaks.
/// * `store` - The holding store of the telecommands.
pub fn release_telecommands<R: FnMut(bool, &TcInfo) -> bool>(
&mut self,
mut releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = 0;
let mut store_error = Ok(());
for tc in tcs_to_release {
for info in tc.1 {
let should_delete = releaser(self.enabled, info);
released_tcs += 1;
if should_delete && !self.is_enabled() {
let res = tc_store.delete(info.addr);
/// A disabled scheduler should still delete commands where the execution time has been reached
/// but should not release them to be executed.
pub fn disable(&mut self) {
self.enabled = false;
}
/// This will disable the scheduler and clear the schedule as specified in 6.11.4.4.
/// Be careful with this command as it will delete all the commands in the schedule.
///
/// The holding store for the telecommands needs to be passed so all the stored telecommands
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
/// will be returned but the method will still try to delete all the commands in the schedule.
pub fn reset(
&mut self,
store: &mut (impl PoolProvider + ?Sized),
) -> Result<(), StoreError> {
self.enabled = false;
let mut deletion_ok = Ok(());
for tc_lists in &mut self.tc_map {
for tc in tc_lists.1 {
let res = store.delete(tc.addr);
if res.is_err() {
store_error = res;
deletion_ok = res;
}
}
}
self.tc_map.clear();
deletion_ok
}
pub fn update_time(&mut self, current_time: UnixTimestamp) {
self.current_time = current_time;
}
pub fn current_time(&self) -> &UnixTimestamp {
&self.current_time
}
/// Insert a telecommand which was already unwrapped from the outer Service 11 packet and stored
/// inside the telecommand packet pool.
pub fn insert_unwrapped_and_stored_tc(
&mut self,
time_stamp: UnixTimestamp,
info: TcInfo,
) -> Result<(), ScheduleError> {
if time_stamp < self.current_time + self.time_margin {
return Err(ScheduleError::ReleaseTimeInTimeMargin(
self.current_time,
self.time_margin,
time_stamp,
));
}
match self.tc_map.entry(time_stamp) {
Entry::Vacant(e) => {
e.insert(vec![info]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(info);
}
}
Ok(())
}
/// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still
/// needs to be stored inside the telecommand pool.
pub fn insert_unwrapped_tc(
&mut self,
time_stamp: UnixTimestamp,
tc: &[u8],
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
let check_tc = PusTc::from_bytes(tc)?;
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
return Err(ScheduleError::NestedScheduledTc);
}
let req_id = RequestId::from_tc(&check_tc.0);
match pool.add(tc) {
Ok(addr) => {
let info = TcInfo::new(addr, req_id);
self.insert_unwrapped_and_stored_tc(time_stamp, info)?;
Ok(info)
}
Err(err) => Err(err.into()),
}
}
/// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp
/// provider needs to be supplied via a generic.
pub fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
if PusPacket::service(pus_tc) != 11 {
return Err(ScheduleError::WrongService);
}
if PusPacket::subservice(pus_tc) != 4 {
return Err(ScheduleError::WrongSubservice);
}
return if let Some(user_data) = pus_tc.user_data() {
let stamp: TimeStamp = TimeReader::from_bytes(user_data)?;
let unix_stamp = stamp.unix_stamp();
let stamp_len = stamp.len_as_bytes();
self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool)
} else {
Err(ScheduleError::TcDataEmpty)
};
}
/// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS
/// short timestamp with 16-bit length of days field.
pub fn insert_wrapped_tc_cds_short(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
self.insert_wrapped_tc::<cds::TimeProvider>(pus_tc, pool)
}
/// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS
/// long timestamp with a 24-bit length of days field.
pub fn insert_wrapped_tc_cds_long(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<TcInfo, ScheduleError> {
self.insert_wrapped_tc::<cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
}
/// This function uses [Self::retrieve_by_time_filter] to extract all scheduled commands inside
/// the time range and then deletes them from the provided store.
///
/// Like specified in the documentation of [Self::retrieve_by_time_filter], the range extraction
/// for deletion is always inclusive.
///
/// This function returns the number of deleted commands on success. In case any deletion fails,
/// the last deletion will be supplied in addition to the number of deleted commands.
pub fn delete_by_time_filter<TimeProvider: CcsdsTimeProvider + Clone>(
&mut self,
time_window: TimeWindow<TimeProvider>,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let range = self.retrieve_by_time_filter(time_window);
let mut del_packets = 0;
let mut res_if_fails = None;
let mut keys_to_delete = Vec::new();
for time_bucket in range {
for tc in time_bucket.1 {
match pool.delete(tc.addr) {
Ok(_) => del_packets += 1,
Err(e) => res_if_fails = Some(e),
}
}
keys_to_delete.push(*time_bucket.0);
}
for key in keys_to_delete {
self.tc_map.remove(&key);
}
if let Some(err) = res_if_fails {
return Err((del_packets, err));
}
Ok(del_packets)
}
/// Deletes all the scheduled commands. This also deletes the packets from the passed TC pool.
///
/// This function returns the number of deleted commands on success. In case any deletion fails,
/// the last deletion will be supplied in addition to the number of deleted commands.
pub fn delete_all(
&mut self,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
self.delete_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all(), pool)
}
/// Retrieve a range over all scheduled commands.
pub fn retrieve_all(&mut self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
self.tc_map.range(..)
}
/// This retrieves scheduled telecommands which are inside the provided time window.
///
/// It should be noted that the ranged extraction is always inclusive. For example, a range
/// from 50 to 100 unix seconds would also include command scheduled at 100 unix seconds.
pub fn retrieve_by_time_filter<TimeProvider: CcsdsTimeProvider>(
&mut self,
time_window: TimeWindow<TimeProvider>,
) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
match time_window.time_window_type() {
TimeWindowType::SelectAll => self.tc_map.range(..),
TimeWindowType::TimeTagToTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let start_time = time_window.start_time().unwrap().unix_stamp();
let end_time = time_window.end_time().unwrap().unix_stamp();
self.tc_map.range(start_time..=end_time)
}
TimeWindowType::FromTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let start_time = time_window.start_time().unwrap().unix_stamp();
self.tc_map.range(start_time..)
}
TimeWindowType::ToTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let end_time = time_window.end_time().unwrap().unix_stamp();
self.tc_map.range(..=end_time)
}
}
}
/// Deletes a scheduled command with the given request ID. Returns the store address if a
/// scheduled command was found in the map and deleted, and None otherwise.
///
/// Please note that this function will stop on the first telecommand with a request ID match.
/// In case of duplicate IDs (which should generally not happen), this function needs to be
/// called repeatedly.
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<StoreAddr> {
if let DeletionResult::WithoutStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>)
{
return v;
}
panic!("unexpected deletion result");
}
/// This behaves like [Self::delete_by_request_id] but deletes the packet from the pool as well.
pub fn delete_by_request_id_and_from_pool(
&mut self,
req_id: &RequestId,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<bool, StoreError> {
if let DeletionResult::WithStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, Some(pool))
{
return v;
}
panic!("unexpected deletion result");
}
fn delete_by_request_id_internal(
&mut self,
req_id: &RequestId,
pool: Option<&mut (impl PoolProvider + ?Sized)>,
) -> DeletionResult {
let mut idx_found = None;
for time_bucket in &mut self.tc_map {
for (idx, tc_info) in time_bucket.1.iter().enumerate() {
if &tc_info.request_id == req_id {
idx_found = Some(idx);
}
}
if let Some(idx) = idx_found {
let addr = time_bucket.1.remove(idx).addr;
if let Some(pool) = pool {
return match pool.delete(addr) {
Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)),
Err(e) => DeletionResult::WithStoreDeletion(Err(e)),
};
}
return DeletionResult::WithoutStoreDeletion(Some(addr));
}
}
if pool.is_none() {
DeletionResult::WithoutStoreDeletion(None)
} else {
DeletionResult::WithStoreDeletion(Ok(false))
}
}
/// Retrieve all telecommands which should be release based on the current time.
pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
self.tc_map.range(..=self.current_time)
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> {
self.current_time = UnixTimestamp::from_now()?;
Ok(())
}
/// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
/// closure for each telecommand which should be released. This function will also delete
/// the telecommands from the holding store after calling the release closure, if the scheduler
/// is disabled.
///
/// # Arguments
///
/// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
/// the second argument is the telecommand information also containing the store address.
/// This closure should return whether the command should be deleted if the scheduler is
/// disabled to prevent memory leaks.
/// * `store` - The holding store of the telecommands.
pub fn release_telecommands<R: FnMut(bool, &TcInfo) -> bool>(
&mut self,
mut releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = 0;
let mut store_error = Ok(());
for tc in tcs_to_release {
for info in tc.1 {
let should_delete = releaser(self.enabled, info);
released_tcs += 1;
if should_delete && !self.is_enabled() {
let res = tc_store.delete(info.addr);
if res.is_err() {
store_error = res;
}
}
}
}
self.tc_map.retain(|k, _| k > &self.current_time);
store_error
.map(|_| released_tcs)
.map_err(|e| (released_tcs, e))
}
self.tc_map.retain(|k, _| k > &self.current_time);
store_error
.map(|_| released_tcs)
.map_err(|e| (released_tcs, e))
}
}
@ -620,6 +640,8 @@ impl PusScheduler {
mod tests {
use super::*;
use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError};
use alloc::collections::btree_map::Range;
use spacepackets::ecss::SerializablePusPacket;
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::time::{cds, TimeWriter, UnixTimestamp};
use spacepackets::SpHeader;

View File

@ -0,0 +1,177 @@
use crate::pool::{SharedPool, StoreAddr};
use crate::pus::scheduler::PusScheduler;
use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken};
use crate::pus::{
AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler,
};
use crate::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::{scheduling, PusPacket};
use spacepackets::tc::PusTc;
use spacepackets::time::cds::TimeProvider;
use std::sync::mpsc::{Receiver, Sender};
/// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service)
/// packets. This handler is constrained to using the [PusScheduler], but is able to process
/// the most important PUS requests for a scheduling service.
///
/// Please note that this class does not do the regular periodic handling like releasing any
/// telecommands inside the scheduler. The user can retrieve the wrapped scheduler via the
/// [Self::scheduler] and [Self::scheduler_mut] function and then use the scheduler API to release
/// telecommands when applicable.
pub struct PusService11SchedHandler {
psb: PusServiceBase,
scheduler: PusScheduler,
}
impl PusService11SchedHandler {
pub fn new(
receiver: Receiver<AcceptedTc>,
tc_pool: SharedPool,
tm_tx: Sender<StoreAddr>,
tm_store: SharedTmStore,
tm_apid: u16,
verification_handler: StdVerifReporterWithSender,
scheduler: PusScheduler,
) -> Self {
Self {
psb: PusServiceBase::new(
receiver,
tc_pool,
tm_tx,
tm_store,
tm_apid,
verification_handler,
),
scheduler,
}
}
pub fn scheduler_mut(&mut self) -> &mut PusScheduler {
&mut self.scheduler
}
pub fn scheduler(&self) -> &PusScheduler {
&self.scheduler
}
}
impl PusServiceHandler for PusService11SchedHandler {
fn psb_mut(&mut self) -> &mut PusServiceBase {
&mut self.psb
}
fn psb(&self) -> &PusServiceBase {
&self.psb
}
fn handle_one_tc(
&mut self,
addr: StoreAddr,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap();
let std_service = scheduling::Subservice::try_from(tc.subservice());
if std_service.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(),
token,
));
}
let mut partial_error = None;
let time_stamp = self.psb().get_current_timestamp(&mut partial_error);
match std_service.unwrap() {
scheduling::Subservice::TcEnableScheduling => {
let start_token = self
.psb
.verification_handler
.get_mut()
.start_success(token, Some(&time_stamp))
.expect("Error sending start success");
self.scheduler.enable();
if self.scheduler.is_enabled() {
self.psb
.verification_handler
.get_mut()
.completion_success(start_token, Some(&time_stamp))
.expect("Error sending completion success");
} else {
panic!("Failed to enable scheduler");
}
}
scheduling::Subservice::TcDisableScheduling => {
let start_token = self
.psb
.verification_handler
.get_mut()
.start_success(token, Some(&time_stamp))
.expect("Error sending start success");
self.scheduler.disable();
if !self.scheduler.is_enabled() {
self.psb
.verification_handler
.get_mut()
.completion_success(start_token, Some(&time_stamp))
.expect("Error sending completion success");
} else {
panic!("Failed to disable scheduler");
}
}
scheduling::Subservice::TcResetScheduling => {
let start_token = self
.psb
.verification_handler
.get_mut()
.start_success(token, Some(&time_stamp))
.expect("Error sending start success");
let mut pool = self.psb.tc_store.write().expect("Locking pool failed");
self.scheduler
.reset(pool.as_mut())
.expect("Error resetting TC Pool");
self.psb
.verification_handler
.get_mut()
.completion_success(start_token, Some(&time_stamp))
.expect("Error sending completion success");
}
scheduling::Subservice::TcInsertActivity => {
let start_token = self
.psb
.verification_handler
.get_mut()
.start_success(token, Some(&time_stamp))
.expect("error sending start success");
let mut pool = self.psb.tc_store.write().expect("locking pool failed");
self.scheduler
.insert_wrapped_tc::<TimeProvider>(&tc, pool.as_mut())
.expect("insertion of activity into pool failed");
self.psb
.verification_handler
.get_mut()
.completion_success(start_token, Some(&time_stamp))
.expect("sending completion success failed");
}
_ => {
return Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(),
token,
));
}
}
if let Some(partial_error) = partial_error {
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
}
Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(),
token,
))
}
}

225
satrs-core/src/pus/test.rs Normal file
View File

@ -0,0 +1,225 @@
use crate::pool::{SharedPool, StoreAddr};
use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken};
use crate::pus::{
AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError,
PusServiceBase, PusServiceHandler,
};
use crate::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc;
use spacepackets::tm::{PusTm, PusTmSecondaryHeader};
use spacepackets::SpHeader;
use std::format;
use std::sync::mpsc::{Receiver, Sender};
/// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets.
/// This handler only processes ping requests and generates a ping reply for them accordingly.
pub struct PusService17TestHandler {
psb: PusServiceBase,
}
impl PusService17TestHandler {
pub fn new(
receiver: Receiver<AcceptedTc>,
tc_pool: SharedPool,
tm_tx: Sender<StoreAddr>,
tm_store: SharedTmStore,
tm_apid: u16,
verification_handler: StdVerifReporterWithSender,
) -> Self {
Self {
psb: PusServiceBase::new(
receiver,
tc_pool,
tm_tx,
tm_store,
tm_apid,
verification_handler,
),
}
}
}
impl PusServiceHandler for PusService17TestHandler {
fn psb_mut(&mut self) -> &mut PusServiceBase {
&mut self.psb
}
fn psb(&self) -> &PusServiceBase {
&self.psb
}
fn handle_one_tc(
&mut self,
addr: StoreAddr,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?;
if tc.service() != 17 {
return Err(PusPacketHandlingError::WrongService(tc.service()));
}
if tc.subservice() == 1 {
let mut partial_error = None;
let time_stamp = self.psb().get_current_timestamp(&mut partial_error);
let result = self
.psb
.verification_handler
.get_mut()
.start_success(token, Some(&time_stamp))
.map_err(|_| PartialPusHandlingError::Verification);
let start_token = if let Ok(result) = result {
Some(result)
} else {
partial_error = Some(result.unwrap_err());
None
};
// Sequence count will be handled centrally in TM funnel.
let mut reply_header = SpHeader::tm_unseg(self.psb.tm_apid, 0, 0).unwrap();
let tc_header = PusTmSecondaryHeader::new_simple(17, 2, &time_stamp);
let ping_reply = PusTm::new(&mut reply_header, tc_header, None, true);
let addr = self.psb.tm_store.add_pus_tm(&ping_reply);
if let Err(e) = self
.psb
.tm_tx
.send(addr)
.map_err(|e| PartialPusHandlingError::TmSend(format!("{e}")))
{
partial_error = Some(e);
}
if let Some(start_token) = start_token {
if self
.psb
.verification_handler
.get_mut()
.completion_success(start_token, Some(&time_stamp))
.is_err()
{
partial_error = Some(PartialPusHandlingError::Verification)
}
}
if let Some(partial_error) = partial_error {
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
};
return Ok(PusPacketHandlerResult::RequestHandled);
}
Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(),
token,
))
}
}
#[cfg(test)]
mod tests {
use crate::pool::{LocalPool, PoolCfg, SharedPool};
use crate::pus::test::PusService17TestHandler;
use crate::pus::verification::{
RequestId, StdVerifReporterWithSender, VerificationReporterCfg,
};
use crate::pus::{MpscTmInStoreSender, PusServiceHandler};
use crate::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::{PusPacket, SerializablePusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::tm::PusTm;
use spacepackets::{SequenceFlags, SpHeader};
use std::boxed::Box;
use std::sync::{mpsc, RwLock};
use std::vec;
const TEST_APID: u16 = 0x101;
#[test]
fn test_basic_ping_processing() {
let mut pus_buf: [u8; 64] = [0; 64];
let pool_cfg = PoolCfg::new(vec![(16, 16), (8, 32), (4, 64)]);
let tc_pool = LocalPool::new(pool_cfg.clone());
let tm_pool = LocalPool::new(pool_cfg);
let tc_pool_shared = SharedPool::new(RwLock::new(Box::new(tc_pool)));
let tm_pool_shared = SharedPool::new(RwLock::new(Box::new(tm_pool)));
let shared_tm_store = SharedTmStore::new(tm_pool_shared.clone());
let (test_srv_tx, test_srv_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel();
let verif_sender = MpscTmInStoreSender::new(
0,
"verif_sender",
shared_tm_store.backing_pool(),
tm_tx.clone(),
);
let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let mut verification_handler =
StdVerifReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
let mut pus_17_handler = PusService17TestHandler::new(
test_srv_rx,
tc_pool_shared.clone(),
tm_tx,
shared_tm_store,
TEST_APID,
verification_handler.clone(),
);
// Create a ping TC, verify acceptance.
let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap();
let sec_header = PusTcSecondaryHeader::new_simple(17, 1);
let ping_tc = PusTc::new(&mut sp_header, sec_header, None, true);
let token = verification_handler.add_tc(&ping_tc);
let token = verification_handler
.acceptance_success(token, None)
.unwrap();
let tc_size = ping_tc.write_to_bytes(&mut pus_buf).unwrap();
let mut tc_pool = tc_pool_shared.write().unwrap();
let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap();
drop(tc_pool);
// Send accepted TC to test service handler.
test_srv_tx.send((addr, token)).unwrap();
let result = pus_17_handler.handle_next_packet();
assert!(result.is_ok());
// We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and
// Completion TM
let mut next_msg = tm_rx.try_recv();
assert!(next_msg.is_ok());
let mut tm_addr = next_msg.unwrap();
let tm_pool = tm_pool_shared.read().unwrap();
let tm_raw = tm_pool.read(&tm_addr).unwrap();
let (tm, _) = PusTm::from_bytes(&tm_raw, 0).unwrap();
assert_eq!(tm.service(), 1);
assert_eq!(tm.subservice(), 1);
let req_id = RequestId::from_bytes(tm.user_data().unwrap()).unwrap();
assert_eq!(req_id, token.req_id());
// Acceptance TM
next_msg = tm_rx.try_recv();
assert!(next_msg.is_ok());
tm_addr = next_msg.unwrap();
let tm_raw = tm_pool.read(&tm_addr).unwrap();
// Is generated with CDS short timestamp.
let (tm, _) = PusTm::from_bytes(&tm_raw, 7).unwrap();
assert_eq!(tm.service(), 1);
assert_eq!(tm.subservice(), 3);
let req_id = RequestId::from_bytes(tm.user_data().unwrap()).unwrap();
assert_eq!(req_id, token.req_id());
// Ping reply
next_msg = tm_rx.try_recv();
assert!(next_msg.is_ok());
tm_addr = next_msg.unwrap();
let tm_raw = tm_pool.read(&tm_addr).unwrap();
// Is generated with CDS short timestamp.
let (tm, _) = PusTm::from_bytes(&tm_raw, 7).unwrap();
assert_eq!(tm.service(), 17);
assert_eq!(tm.subservice(), 2);
assert!(tm.user_data().is_none());
// TM completion
next_msg = tm_rx.try_recv();
assert!(next_msg.is_ok());
tm_addr = next_msg.unwrap();
let tm_raw = tm_pool.read(&tm_addr).unwrap();
// Is generated with CDS short timestamp.
let (tm, _) = PusTm::from_bytes(&tm_raw, 7).unwrap();
assert_eq!(tm.service(), 1);
assert_eq!(tm.subservice(), 7);
let req_id = RequestId::from_bytes(tm.user_data().unwrap()).unwrap();
assert_eq!(req_id, token.req_id());
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,8 @@
use core::cell::Cell;
use core::sync::atomic::{AtomicU16, Ordering};
#[cfg(feature = "alloc")]
use dyn_clone::DynClone;
use paste::paste;
use spacepackets::MAX_SEQ_COUNT;
#[cfg(feature = "std")]
pub use stdmod::*;
@ -15,21 +16,11 @@ pub trait SequenceCountProviderCore<Raw> {
fn increment(&self);
// TODO: Maybe remove this?
fn increment_mut(&mut self) {
self.increment();
}
fn get_and_increment(&self) -> Raw {
let val = self.get();
self.increment();
val
}
// TODO: Maybe remove this?
fn get_and_increment_mut(&mut self) -> Raw {
self.get_and_increment()
}
}
/// Extension trait which allows cloning a sequence count provider after it was turned into
@ -42,80 +33,218 @@ dyn_clone::clone_trait_object!(SequenceCountProvider<u16>);
impl<T, Raw> SequenceCountProvider<Raw> for T where T: SequenceCountProviderCore<Raw> + Clone {}
#[derive(Default, Clone)]
pub struct SeqCountProviderSimple {
seq_count: Cell<u16>,
pub struct SeqCountProviderSimple<T: Copy> {
seq_count: Cell<T>,
max_val: T,
}
impl SequenceCountProviderCore<u16> for SeqCountProviderSimple {
fn get(&self) -> u16 {
self.seq_count.get()
}
macro_rules! impl_for_primitives {
($($ty: ident,)+) => {
$(
paste! {
impl SeqCountProviderSimple<$ty> {
pub fn [<new_ $ty _max_val>](max_val: $ty) -> Self {
Self {
seq_count: Cell::new(0),
max_val,
}
}
fn increment(&self) {
self.get_and_increment();
}
pub fn [<new_ $ty>]() -> Self {
Self {
seq_count: Cell::new(0),
max_val: $ty::MAX
}
}
}
fn get_and_increment(&self) -> u16 {
let curr_count = self.seq_count.get();
impl SequenceCountProviderCore<$ty> for SeqCountProviderSimple<$ty> {
fn get(&self) -> $ty {
self.seq_count.get()
}
if curr_count == u16::MAX {
self.seq_count.set(0);
} else {
self.seq_count.set(curr_count + 1);
}
curr_count
fn increment(&self) {
self.get_and_increment();
}
fn get_and_increment(&self) -> $ty {
let curr_count = self.seq_count.get();
if curr_count == self.max_val {
self.seq_count.set(0);
} else {
self.seq_count.set(curr_count + 1);
}
curr_count
}
}
}
)+
}
}
pub struct SeqCountProviderAtomicRef {
atomic: AtomicU16,
ordering: Ordering,
impl_for_primitives!(u8, u16, u32, u64,);
/// This is a sequence count provider which wraps around at [MAX_SEQ_COUNT].
pub struct CcsdsSimpleSeqCountProvider {
provider: SeqCountProviderSimple<u16>,
}
impl SeqCountProviderAtomicRef {
pub const fn new(ordering: Ordering) -> Self {
impl CcsdsSimpleSeqCountProvider {
pub fn new() -> Self {
Self {
atomic: AtomicU16::new(0),
ordering,
provider: SeqCountProviderSimple::new_u16_max_val(MAX_SEQ_COUNT),
}
}
}
impl SequenceCountProviderCore<u16> for SeqCountProviderAtomicRef {
fn get(&self) -> u16 {
self.atomic.load(self.ordering)
impl Default for CcsdsSimpleSeqCountProvider {
fn default() -> Self {
Self::new()
}
}
fn increment(&self) {
self.atomic.fetch_add(1, self.ordering);
}
fn get_and_increment(&self) -> u16 {
self.atomic.fetch_add(1, self.ordering)
impl SequenceCountProviderCore<u16> for CcsdsSimpleSeqCountProvider {
delegate::delegate! {
to self.provider {
fn get(&self) -> u16;
fn increment(&self);
fn get_and_increment(&self) -> u16;
}
}
}
#[cfg(feature = "std")]
pub mod stdmod {
use super::*;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
#[derive(Clone, Default)]
pub struct SeqCountProviderSyncClonable {
seq_count: Arc<AtomicU16>,
macro_rules! sync_clonable_seq_counter_impl {
($($ty: ident,)+) => {
$(paste! {
/// These sequence counters can be shared between threads and can also be
/// configured to wrap around at specified maximum values. Please note that
/// that the API provided by this class will not panic und [Mutex] lock errors,
/// but it will yield 0 for the getter functions.
#[derive(Clone, Default)]
pub struct [<SeqCountProviderSync $ty:upper>] {
seq_count: Arc<Mutex<$ty>>,
max_val: $ty
}
impl [<SeqCountProviderSync $ty:upper>] {
pub fn new() -> Self {
Self::new_with_max_val($ty::MAX)
}
pub fn new_with_max_val(max_val: $ty) -> Self {
Self {
seq_count: Arc::default(),
max_val
}
}
}
impl SequenceCountProviderCore<$ty> for [<SeqCountProviderSync $ty:upper>] {
fn get(&self) -> $ty {
match self.seq_count.lock() {
Ok(counter) => *counter,
Err(_) => 0
}
}
fn increment(&self) {
self.get_and_increment();
}
fn get_and_increment(&self) -> $ty {
match self.seq_count.lock() {
Ok(mut counter) => {
let val = *counter;
if val == self.max_val {
*counter = 0;
} else {
*counter += 1;
}
val
}
Err(_) => 0,
}
}
}
})+
}
}
sync_clonable_seq_counter_impl!(u8, u16, u32, u64,);
}
#[cfg(test)]
mod tests {
use crate::seq_count::{
CcsdsSimpleSeqCountProvider, SeqCountProviderSimple, SeqCountProviderSyncU8,
SequenceCountProviderCore,
};
use spacepackets::MAX_SEQ_COUNT;
#[test]
fn test_u8_counter() {
let u8_counter = SeqCountProviderSimple::new_u8();
assert_eq!(u8_counter.get(), 0);
assert_eq!(u8_counter.get_and_increment(), 0);
assert_eq!(u8_counter.get_and_increment(), 1);
assert_eq!(u8_counter.get(), 2);
}
impl SequenceCountProviderCore<u16> for SeqCountProviderSyncClonable {
fn get(&self) -> u16 {
self.seq_count.load(Ordering::SeqCst)
#[test]
fn test_u8_counter_overflow() {
let u8_counter = SeqCountProviderSimple::new_u8();
for _ in 0..256 {
u8_counter.increment();
}
assert_eq!(u8_counter.get(), 0);
}
fn increment(&self) {
self.seq_count.fetch_add(1, Ordering::SeqCst);
}
#[test]
fn test_ccsds_counter() {
let ccsds_counter = CcsdsSimpleSeqCountProvider::default();
assert_eq!(ccsds_counter.get(), 0);
assert_eq!(ccsds_counter.get_and_increment(), 0);
assert_eq!(ccsds_counter.get_and_increment(), 1);
assert_eq!(ccsds_counter.get(), 2);
}
fn get_and_increment(&self) -> u16 {
self.seq_count.fetch_add(1, Ordering::SeqCst)
#[test]
fn test_ccsds_counter_overflow() {
let ccsds_counter = CcsdsSimpleSeqCountProvider::default();
for _ in 0..MAX_SEQ_COUNT + 1 {
ccsds_counter.increment();
}
assert_eq!(ccsds_counter.get(), 0);
}
#[test]
fn test_atomic_ref_counters() {
let sync_u8_counter = SeqCountProviderSyncU8::new();
assert_eq!(sync_u8_counter.get(), 0);
assert_eq!(sync_u8_counter.get_and_increment(), 0);
assert_eq!(sync_u8_counter.get_and_increment(), 1);
assert_eq!(sync_u8_counter.get(), 2);
}
#[test]
fn test_atomic_ref_counters_overflow() {
let sync_u8_counter = SeqCountProviderSyncU8::new();
for _ in 0..u8::MAX as u16 + 1 {
sync_u8_counter.increment();
}
assert_eq!(sync_u8_counter.get(), 0);
}
#[test]
fn test_atomic_ref_counters_overflow_custom_max_val() {
let sync_u8_counter = SeqCountProviderSyncU8::new_with_max_val(128);
for _ in 0..129 {
sync_u8_counter.increment();
}
assert_eq!(sync_u8_counter.get(), 0);
}
}

View File

@ -21,6 +21,7 @@
//! use satrs_core::tmtc::ccsds_distrib::{CcsdsPacketHandler, CcsdsDistributor};
//! use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore};
//! use spacepackets::{CcsdsPacket, SpHeader};
//! use spacepackets::ecss::SerializablePusPacket;
//! use spacepackets::tc::PusTc;
//!
//! #[derive (Default)]
@ -224,6 +225,7 @@ impl<E: 'static> CcsdsDistributor<E> {
pub(crate) mod tests {
use super::*;
use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler};
use spacepackets::ecss::SerializablePusPacket;
use spacepackets::tc::PusTc;
use spacepackets::CcsdsPacket;
use std::collections::VecDeque;

View File

@ -17,6 +17,7 @@
//! # Example
//!
//! ```rust
//! use spacepackets::ecss::SerializablePusPacket;
//! use satrs_core::tmtc::pus_distrib::{PusDistributor, PusServiceProvider};
//! use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore};
//! use spacepackets::SpHeader;

View File

@ -3,6 +3,40 @@ use spacepackets::time::TimeWriter;
use spacepackets::tm::{PusTm, PusTmSecondaryHeader};
use spacepackets::SpHeader;
#[cfg(feature = "std")]
pub use std_mod::*;
#[cfg(feature = "std")]
pub mod std_mod {
use crate::pool::{SharedPool, StoreAddr};
use spacepackets::ecss::SerializablePusPacket;
use spacepackets::tm::PusTm;
#[derive(Clone)]
pub struct SharedTmStore {
pool: SharedPool,
}
impl SharedTmStore {
pub fn new(backing_pool: SharedPool) -> Self {
Self { pool: backing_pool }
}
pub fn backing_pool(&self) -> SharedPool {
self.pool.clone()
}
pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr {
let mut pg = self.pool.write().expect("error locking TM store");
let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error");
pus_tm
.write_to_bytes(buf)
.expect("writing PUS TM to store failed");
addr
}
}
}
pub struct PusTmWithCdsShortHelper {
apid: u16,
cds_short_buf: [u8; 7],
@ -29,7 +63,7 @@ impl PusTmWithCdsShortHelper {
self.create_pus_tm_common(service, subservice, source_data, seq_count)
}
pub fn create_pus_tm_with_stamp<'a>(
pub fn create_pus_tm_with_stamper<'a>(
&'a mut self,
service: u8,
subservice: u8,

View File

@ -1,18 +1,18 @@
// TODO: Refactor this to also test the STD impl using mpsc
// TODO: Change back to cross-beam as soon as STD impl was added back for TM.
#[cfg(feature = "crossbeam")]
pub mod crossbeam_test {
use hashbrown::HashMap;
use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
use satrs_core::pus::verification::{
CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg,
VerificationReporterWithSender,
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
};
use satrs_core::seq_count::SeqCountProviderSyncClonable;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket};
use satrs_core::pus::MpscTmInStoreSender;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::tm::PusTm;
use spacepackets::SpHeader;
use std::sync::{Arc, RwLock};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;
@ -32,24 +32,16 @@ pub mod crossbeam_test {
// We use a synced sequence count provider here because both verification reporters have the
// the same APID. If they had distinct APIDs, the more correct approach would be to have
// each reporter have an own sequence count provider.
let cfg = VerificationReporterCfg::new(
TEST_APID,
Box::new(SeqCountProviderSyncClonable::default()),
Box::new(SeqCountProviderSyncClonable::default()),
1,
2,
8,
)
.unwrap();
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
// Shared pool object to store the verification PUS telemetry
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
let shared_tm_pool: SharedPool =
Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = crossbeam_channel::bounded(5);
let (tx, rx) = mpsc::channel();
let sender =
CrossbeamVerifSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone());
MpscTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone());
let mut reporter_with_sender_0 =
VerificationReporterWithSender::new(&cfg, Box::new(sender));
let mut reporter_with_sender_1 = reporter_with_sender_0.clone();

View File

@ -14,6 +14,7 @@ delegate = "0.10"
zerocopy = "0.6"
csv = "1"
num_enum = "0.6"
thiserror = "1"
[dependencies.satrs-core]
path = "../satrs-core"

View File

@ -1,5 +1,6 @@
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs_core::events::{EventU32TypedSev, SeverityInfo};
use satrs_core::objects::ObjectId;
use std::net::Ipv4Addr;
use satrs_mib::res_code::{ResultU16, ResultU16Info};
@ -17,6 +18,11 @@ pub enum RequestTargetId {
AcsSubsystem = 1,
}
pub const ACS_OBJECT_ID: ObjectId = ObjectId {
id: RequestTargetId::AcsSubsystem as u32,
name: "ACS_SUBSYSTEM",
};
#[derive(Debug)]
pub enum GroupId {
Tmtc = 0,
@ -38,6 +44,8 @@ pub mod tmtc_err {
pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 1);
#[resultcode]
pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2);
#[resultcode]
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 3);
#[resultcode(
info = "Not enough data inside the TC application data field. Optionally includes: \

View File

@ -9,10 +9,14 @@ use log::{info, warn};
use crate::hk::AcsHkIds;
use crate::logging::setup_logger;
use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler};
use crate::pus::event::Pus5Wrapper;
use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler};
use crate::pus::scheduler::Pus11Wrapper;
use crate::pus::test::Service17CustomWrapper;
use crate::pus::PusTcMpscRouter;
use crate::requests::{Request, RequestWithToken};
use crate::tmtc::{
core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmStore, PUS_APID,
};
use crate::tmtc::{core_tmtc_task, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, PUS_APID};
use satrs_core::event_man::{
EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider,
};
@ -23,19 +27,23 @@ use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher,
};
use satrs_core::pus::event_srv::PusService5EventHandler;
use satrs_core::pus::hk::Subservice as HkSubservice;
use satrs_core::pus::verification::{
MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender,
};
use satrs_core::pus::MpscTmtcInStoreSender;
use satrs_core::seq_count::{SeqCountProviderSimple, SeqCountProviderSyncClonable};
use satrs_core::pus::scheduler::PusScheduler;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::test::PusService17TestHandler;
use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
use satrs_core::pus::MpscTmInStoreSender;
use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore};
use satrs_core::spacepackets::tm::PusTmZeroCopyWriter;
use satrs_core::spacepackets::{
time::cds::TimeProvider,
time::TimeWriter,
tm::{PusTm, PusTmSecondaryHeader},
SequenceFlags, SpHeader,
};
use satrs_core::tmtc::AddressableId;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{AddressableId, TargetId};
use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT};
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
@ -55,9 +63,8 @@ fn main() {
(15, 1024),
(15, 2048),
]));
let tm_store = TmStore {
pool: Arc::new(RwLock::new(Box::new(tm_pool))),
};
let tm_store = SharedTmStore::new(Arc::new(RwLock::new(Box::new(tm_pool))));
let tm_store_event = tm_store.clone();
let tc_pool = LocalPool::new(PoolCfg::new(vec![
(30, 32),
(15, 64),
@ -70,29 +77,19 @@ fn main() {
pool: Arc::new(RwLock::new(Box::new(tc_pool))),
};
let seq_count_provider = SeqCountProviderSyncClonable::default();
let seq_count_provider_verif = seq_count_provider.clone();
let seq_count_provider_tmtc = seq_count_provider;
let seq_count_provider = CcsdsSimpleSeqCountProvider::new();
let mut msg_counter_map: HashMap<u8, u16> = HashMap::new();
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let (tc_source_tx, tc_source_rx) = channel();
let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel();
let verif_sender = MpscVerifSender::new(
let verif_sender = MpscTmInStoreSender::new(
0,
"verif_sender",
tm_store.pool.clone(),
tm_store.backing_pool(),
tm_funnel_tx.clone(),
);
let verif_cfg = VerificationReporterCfg::new(
PUS_APID,
Box::new(seq_count_provider_verif),
#[allow(clippy::box_default)]
Box::new(SeqCountProviderSimple::default()),
1,
2,
8,
)
.unwrap();
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
@ -107,6 +104,7 @@ fn main() {
// The event manager will receive the RX handle to receive all the events.
let (event_sender, event_man_rx) = channel();
let event_recv = MpscEventReceiver::<EventU32>::new(event_man_rx);
let test_srv_event_sender = event_sender;
let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv));
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
@ -123,24 +121,16 @@ fn main() {
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = HashMap::new();
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx);
request_map.insert(RequestTargetId::AcsSubsystem as TargetId, acs_thread_tx);
let tc_source = PusTcSource {
tc_store,
let tc_source_wrapper = PusTcSource {
tc_store: tc_store.clone(),
tc_source: tc_source_tx,
};
// Create clones here to allow moving the values
let core_args = OtherArgs {
sock_addr,
verif_reporter,
event_sender,
event_request_tx,
request_map,
seq_count_provider: seq_count_provider_tmtc,
};
let tc_args = TcArgs {
tc_source,
tc_source: tc_source_wrapper.clone(),
tc_receiver: tc_source_rx,
};
let tm_args = TmArgs {
@ -149,14 +139,86 @@ fn main() {
tm_server_rx,
};
let aocs_to_funnel = tm_funnel_tx.clone();
let aocs_tm_funnel = tm_funnel_tx.clone();
let mut aocs_tm_store = tm_store.clone();
let (pus_test_tx, pus_test_rx) = channel();
let (pus_event_tx, pus_event_rx) = channel();
let (pus_sched_tx, pus_sched_rx) = channel();
let (pus_hk_tx, pus_hk_rx) = channel();
let (pus_action_tx, pus_action_rx) = channel();
let pus_router = PusTcMpscRouter {
test_service_receiver: pus_test_tx,
event_service_receiver: pus_event_tx,
sched_service_receiver: pus_sched_tx,
hk_service_receiver: pus_hk_tx,
action_service_receiver: pus_action_tx,
};
let pus17_handler = PusService17TestHandler::new(
pus_test_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
verif_reporter.clone(),
);
let mut pus_17_wrapper = Service17CustomWrapper {
pus17_handler,
test_srv_event_sender,
};
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed");
let pus_11_handler = PusService11SchedHandler::new(
pus_sched_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
verif_reporter.clone(),
scheduler,
);
let mut pus_11_wrapper = Pus11Wrapper {
pus_11_handler,
tc_source_wrapper,
};
let pus_5_handler = PusService5EventHandler::new(
pus_event_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
verif_reporter.clone(),
event_request_tx,
);
let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler };
let pus_8_handler = PusService8ActionHandler::new(
pus_action_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
verif_reporter.clone(),
request_map.clone(),
);
let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler };
let pus_3_handler = PusService3HkHandler::new(
pus_hk_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
verif_reporter.clone(),
request_map,
);
let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler };
info!("Starting TMTC task");
let jh0 = thread::Builder::new()
.name("TMTC".to_string())
.spawn(move || {
core_tmtc_task(core_args, tc_args, tm_args);
core_tmtc_task(sock_addr, tc_args, tm_args, verif_reporter, pus_router);
})
.unwrap();
@ -170,6 +232,29 @@ fn main() {
};
loop {
if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let shared_pool = tm_store.backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard
.modify(&addr)
.expect("Reading TM from pool failed");
let mut zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw)
.expect("Creating TM zero copy writer failed");
zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(seq_count_provider.get_and_increment());
let entry = msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
// This operation has to come last!
zero_copy_writer.finish();
tm_funnel
.tm_server_tx
.send(addr)
@ -184,12 +269,16 @@ fn main() {
.name("Event".to_string())
.spawn(move || {
let mut timestamp: [u8; 7] = [0; 7];
let mut sender =
MpscTmtcInStoreSender::new(1, "event_sender", tm_store.pool, tm_funnel_tx);
let mut sender = MpscTmInStoreSender::new(
1,
"event_sender",
tm_store_event.backing_pool(),
tm_funnel_tx,
);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
reporter_event_handler
.completion_success(event_req.token, Some(timestamp))
.completion_success(event_req.token.try_into().unwrap(), Some(timestamp))
.expect("Sending completion success failed");
};
loop {
@ -245,7 +334,7 @@ fn main() {
);
update_time(&mut time_provider, &mut timestamp);
match request.targeted_request.request {
Request::HkRequest(hk_req) => match hk_req {
Request::Hk(hk_req) => match hk_req {
HkRequest::OneShot(unique_id) => {
let target = request.targeted_request.target_id;
assert_eq!(target, RequestTargetId::AcsSubsystem as u32);
@ -277,16 +366,19 @@ fn main() {
true,
);
let addr = aocs_tm_store.add_pus_tm(&pus_tm);
aocs_to_funnel.send(addr).expect("Sending HK TM failed");
aocs_tm_funnel.send(addr).expect("Sending HK TM failed");
}
}
HkRequest::Enable(_) => {}
HkRequest::Disable(_) => {}
HkRequest::ModifyCollectionInterval(_, _) => {}
},
Request::ModeRequest(_mode_req) => {
Request::Mode(_mode_req) => {
warn!("mode request handling not implemented yet")
}
Request::Action(_action_req) => {
warn!("action request handling not implemented yet")
}
}
let started_token = reporter_aocs
.start_success(request.token, Some(&timestamp))
@ -307,10 +399,35 @@ fn main() {
})
.unwrap();
info!("Starting PUS handler thread");
let jh4 = thread::Builder::new()
.name("PUS".to_string())
.spawn(move || loop {
pus_11_wrapper.release_tcs();
loop {
let mut all_queues_empty = true;
let mut is_srv_finished = |srv_handler_finished: bool| {
if !srv_handler_finished {
all_queues_empty = false;
}
};
is_srv_finished(pus_17_wrapper.handle_next_packet());
is_srv_finished(pus_11_wrapper.handle_next_packet());
is_srv_finished(pus_5_wrapper.handle_next_packet());
is_srv_finished(pus_8_wrapper.handle_next_packet());
is_srv_finished(pus_3_wrapper.handle_next_packet());
if all_queues_empty {
break;
}
}
thread::sleep(Duration::from_millis(200));
})
.unwrap();
jh0.join().expect("Joining UDP TMTC server thread failed");
jh1.join().expect("Joining TM Funnel thread failed");
jh2.join().expect("Joining Event Manager thread failed");
jh3.join().expect("Joining AOCS thread failed");
jh4.join().expect("Joining PUS handler thread failed");
}
pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) {

View File

@ -1,689 +0,0 @@
use crate::requests::{Request, RequestWithToken};
use crate::tmtc::{PusTcSource, TmStore};
use log::{info, warn};
use satrs_core::events::EventU32;
use satrs_core::hk::{CollectionIntervalFactor, HkRequest};
use satrs_core::mode::{ModeAndSubmode, ModeRequest};
use satrs_core::params::Params;
use satrs_core::pool::{PoolProvider, StoreAddr};
use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken};
use satrs_core::pus::mode;
use satrs_core::pus::mode::Subservice;
use satrs_core::pus::scheduling::PusScheduler;
use satrs_core::pus::verification::{
pus_11_generic_tc_check, FailParams, StdVerifReporterWithSender, TcStateAccepted,
VerificationToken,
};
use satrs_core::pus::{event, GenericTcCheckError};
use satrs_core::pus::{hk, EcssTmSender, EcssTmSenderCore};
use satrs_core::res_code::ResultU16;
use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore};
use satrs_core::spacepackets::ecss::{scheduling, PusServiceId};
use satrs_core::spacepackets::time::CcsdsTimeProvider;
use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper;
use satrs_core::tmtc::{AddressableId, PusServiceProvider, TargetId};
use satrs_core::{
spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, spacepackets::time::cds::TimeProvider,
spacepackets::time::TimeWriter, spacepackets::SpHeader,
};
use satrs_example::{hk_err, tmtc_err, CustomPusServiceId, TEST_EVENT};
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::rc::Rc;
use std::sync::mpsc::{Receiver, Sender};
pub trait PusTcMultiplexer {
fn route_pus_tc(tc: &PusTc, apid: u16, service: u8, subservice: u8);
}
pub struct PusReceiver {
pub tm_helper: PusTmWithCdsShortHelper,
pub tm_args: PusTmArgs,
pub tc_args: PusTcArgs,
stamp_helper: TimeStampHelper,
}
pub struct PusTmArgs {
/// All telemetry is sent with this sender handle.
pub tm_tx: Sender<StoreAddr>,
/// All TM to be sent is stored here
pub tm_store: TmStore,
/// All verification reporting is done with this reporter.
pub verif_reporter: StdVerifReporterWithSender,
/// Sequence count provider for TMs sent from within pus demultiplexer
pub seq_count_provider: SeqCountProviderSyncClonable,
}
impl PusTmArgs {
fn vr(&mut self) -> &mut StdVerifReporterWithSender {
&mut self.verif_reporter
}
}
#[allow(dead_code)]
pub struct PusTcHandlerBase {
pub tc_store: Box<dyn PoolProvider>,
pub receiver: Receiver<(StoreAddr, VerificationToken<TcStateAccepted>)>,
pub verif_reporter: StdVerifReporterWithSender,
pub time_provider: Box<dyn CcsdsTimeProvider>,
}
pub trait TestHandlerNoPing {
fn handle_no_ping_tc(&mut self, tc: PusTc);
}
#[allow(dead_code)]
pub struct PusTestTcHandler {
pub base: PusTcHandlerBase,
handler: Option<Box<dyn TestHandlerNoPing>>,
}
#[allow(dead_code)]
pub struct PusScheduleTcHandler {
pub base: PusTestTcHandler,
}
impl PusTestTcHandler {
#[allow(dead_code)]
pub fn operation(&mut self) {
let (addr, token) = self.base.receiver.recv().unwrap();
let data = self.base.tc_store.read(&addr).unwrap();
let (pus_tc, _len) = PusTc::from_bytes(data).unwrap();
let stamp: [u8; 7] = [0; 7];
if pus_tc.subservice() == 1 {
self.base
.verif_reporter
.completion_success(token, Some(&stamp))
.unwrap();
} else if let Some(handler) = &mut self.handler {
handler.handle_no_ping_tc(pus_tc);
}
}
}
pub struct PusTcArgs {
pub event_request_tx: Sender<EventRequestWithToken>,
/// Request routing helper. Maps targeted requests to their recipient.
pub request_map: HashMap<TargetId, Sender<RequestWithToken>>,
/// Required for scheduling of telecommands.
pub tc_source: PusTcSource,
pub event_sender: Sender<(EventU32, Option<Params>)>,
pub scheduler: Rc<RefCell<PusScheduler>>,
}
struct TimeStampHelper {
stamper: TimeProvider,
time_stamp: [u8; 7],
}
impl TimeStampHelper {
pub fn new() -> Self {
Self {
stamper: TimeProvider::new_with_u16_days(0, 0),
time_stamp: [0; 7],
}
}
pub fn stamp(&self) -> &[u8] {
&self.time_stamp
}
pub fn update_from_now(&mut self) {
self.stamper
.update_from_now()
.expect("Updating timestamp failed");
self.stamper
.write_to_bytes(&mut self.time_stamp)
.expect("Writing timestamp failed");
}
}
impl PusReceiver {
pub fn new(apid: u16, tm_arguments: PusTmArgs, tc_arguments: PusTcArgs) -> Self {
Self {
tm_helper: PusTmWithCdsShortHelper::new(apid),
tm_args: tm_arguments,
tc_args: tc_arguments,
stamp_helper: TimeStampHelper::new(),
}
}
}
impl PusServiceProvider for PusReceiver {
type Error = ();
fn handle_pus_tc_packet(
&mut self,
service: u8,
_header: &SpHeader,
pus_tc: &PusTc,
) -> Result<(), Self::Error> {
let init_token = self.tm_args.verif_reporter.add_tc(pus_tc);
self.stamp_helper.update_from_now();
let accepted_token = self
.tm_args
.vr()
.acceptance_success(init_token, Some(self.stamp_helper.stamp()))
.expect("Acceptance success failure");
let service = PusServiceId::try_from(service);
match service {
Ok(standard_service) => match standard_service {
PusServiceId::Test => self.handle_test_service(pus_tc, accepted_token),
PusServiceId::Housekeeping => self.handle_hk_request(pus_tc, accepted_token),
PusServiceId::Event => self.handle_event_request(pus_tc, accepted_token),
PusServiceId::Scheduling => self.handle_scheduled_tc(pus_tc, accepted_token),
_ => self
.tm_args
.verif_reporter
.start_failure(
accepted_token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::PUS_SERVICE_NOT_IMPLEMENTED,
Some(&[standard_service as u8]),
),
)
.expect("Start failure verification failed"),
},
Err(e) => {
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
match custom_service {
CustomPusServiceId::Mode => {
self.handle_mode_service(pus_tc, accepted_token)
}
CustomPusServiceId::Health => {}
}
} else {
self.tm_args
.verif_reporter
.start_failure(
accepted_token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::INVALID_PUS_SUBSERVICE,
Some(&[e.number]),
),
)
.expect("Start failure verification failed")
}
}
}
Ok(())
}
}
impl PusReceiver {
fn handle_test_service(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
match PusPacket::subservice(pus_tc) {
1 => {
info!("Received PUS ping command TC[17,1]");
info!("Sending ping reply PUS TM[17,2]");
let start_token = self
.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("Error sending start success");
let ping_reply = self.tm_helper.create_pus_tm_timestamp_now(
17,
2,
None,
self.tm_args.seq_count_provider.get(),
);
let addr = self.tm_args.tm_store.add_pus_tm(&ping_reply);
self.tm_args
.tm_tx
.send(addr)
.expect("Sending TM to TM funnel failed");
self.tm_args.seq_count_provider.increment();
self.tm_args
.verif_reporter
.completion_success(start_token, Some(self.stamp_helper.stamp()))
.expect("Error sending completion success");
}
128 => {
info!("Generating test event");
self.tc_args
.event_sender
.send((TEST_EVENT.into(), None))
.expect("Sending test event failed");
let start_token = self
.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("Error sending start success");
self.tm_args
.verif_reporter
.completion_success(start_token, Some(self.stamp_helper.stamp()))
.expect("Error sending completion success");
}
_ => {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::INVALID_PUS_SUBSERVICE,
None,
),
)
.expect("Sending start failure TM failed");
}
}
}
fn handle_hk_request(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
if pus_tc.user_data().is_none() {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::NOT_ENOUGH_APP_DATA,
None,
),
)
.expect("Sending start failure TM failed");
return;
}
let user_data = pus_tc.user_data().unwrap();
if user_data.len() < 8 {
let err = if user_data.len() < 4 {
&hk_err::TARGET_ID_MISSING
} else {
&hk_err::UNIQUE_ID_MISSING
};
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(Some(self.stamp_helper.stamp()), err, None),
)
.expect("Sending start failure TM failed");
return;
}
let addressable_id = AddressableId::from_raw_be(user_data).unwrap();
if !self
.tc_args
.request_map
.contains_key(&addressable_id.target_id)
{
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&hk_err::UNKNOWN_TARGET_ID,
None,
),
)
.expect("Sending start failure TM failed");
return;
}
let send_request = |target: TargetId, request: HkRequest| {
let sender = self
.tc_args
.request_map
.get(&addressable_id.target_id)
.unwrap();
sender
.send(RequestWithToken::new(
target,
Request::HkRequest(request),
token,
))
.unwrap_or_else(|_| panic!("Sending HK request {request:?} failed"));
};
if PusPacket::subservice(pus_tc) == hk::Subservice::TcEnableHkGeneration as u8 {
send_request(
addressable_id.target_id,
HkRequest::Enable(addressable_id.unique_id),
);
} else if PusPacket::subservice(pus_tc) == hk::Subservice::TcDisableHkGeneration as u8 {
send_request(
addressable_id.target_id,
HkRequest::Disable(addressable_id.unique_id),
);
} else if PusPacket::subservice(pus_tc) == hk::Subservice::TcGenerateOneShotHk as u8 {
send_request(
addressable_id.target_id,
HkRequest::OneShot(addressable_id.unique_id),
);
} else if PusPacket::subservice(pus_tc)
== hk::Subservice::TcModifyHkCollectionInterval as u8
{
if user_data.len() < 12 {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&hk_err::COLLECTION_INTERVAL_MISSING,
None,
),
)
.expect("Sending start failure TM failed");
return;
}
send_request(
addressable_id.target_id,
HkRequest::ModifyCollectionInterval(
addressable_id.unique_id,
CollectionIntervalFactor::from_be_bytes(user_data[8..12].try_into().unwrap()),
),
);
}
}
fn handle_event_request(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
let send_start_failure = |vr: &mut StdVerifReporterWithSender,
timestamp: &[u8],
failure_code: &ResultU16,
failure_data: Option<&[u8]>| {
vr.start_failure(
token,
FailParams::new(Some(timestamp), failure_code, failure_data),
)
.expect("Sending start failure TM failed");
};
let send_start_acceptance = |vr: &mut StdVerifReporterWithSender, timestamp: &[u8]| {
vr.start_success(token, Some(timestamp))
.expect("Sending start success TM failed")
};
if pus_tc.user_data().is_none() {
send_start_failure(
&mut self.tm_args.verif_reporter,
self.stamp_helper.stamp(),
&tmtc_err::NOT_ENOUGH_APP_DATA,
None,
);
return;
}
let app_data = pus_tc.user_data().unwrap();
if app_data.len() < 4 {
send_start_failure(
&mut self.tm_args.verif_reporter,
self.stamp_helper.stamp(),
&tmtc_err::NOT_ENOUGH_APP_DATA,
None,
);
return;
}
let event_id = EventU32::from(u32::from_be_bytes(app_data.try_into().unwrap()));
match PusPacket::subservice(pus_tc).try_into() {
Ok(event::Subservice::TcEnableEventGeneration) => {
let start_token = send_start_acceptance(
&mut self.tm_args.verif_reporter,
self.stamp_helper.stamp(),
);
self.tc_args
.event_request_tx
.send(EventRequestWithToken {
request: EventRequest::Enable(event_id),
token: start_token,
})
.expect("Sending event request failed");
}
Ok(event::Subservice::TcDisableEventGeneration) => {
let start_token = send_start_acceptance(
&mut self.tm_args.verif_reporter,
self.stamp_helper.stamp(),
);
self.tc_args
.event_request_tx
.send(EventRequestWithToken {
request: EventRequest::Disable(event_id),
token: start_token,
})
.expect("Sending event request failed");
}
_ => {
send_start_failure(
&mut self.tm_args.verif_reporter,
self.stamp_helper.stamp(),
&tmtc_err::INVALID_PUS_SUBSERVICE,
None,
);
}
}
}
fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
let subservice = match pus_11_generic_tc_check(pus_tc) {
Ok(subservice) => subservice,
Err(e) => match e {
GenericTcCheckError::NotEnoughAppData => {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::NOT_ENOUGH_APP_DATA,
None,
),
)
.expect("could not sent verification error");
return;
}
GenericTcCheckError::InvalidSubservice => {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::INVALID_PUS_SUBSERVICE,
None,
),
)
.expect("could not sent verification error");
return;
}
},
};
match subservice {
scheduling::Subservice::TcEnableScheduling => {
let start_token = self
.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("Error sending start success");
let mut scheduler = self.tc_args.scheduler.borrow_mut();
scheduler.enable();
if scheduler.is_enabled() {
self.tm_args
.verif_reporter
.completion_success(start_token, Some(self.stamp_helper.stamp()))
.expect("Error sending completion success");
} else {
panic!("Failed to enable scheduler");
}
}
scheduling::Subservice::TcDisableScheduling => {
let start_token = self
.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("Error sending start success");
let mut scheduler = self.tc_args.scheduler.borrow_mut();
scheduler.disable();
if !scheduler.is_enabled() {
self.tm_args
.verif_reporter
.completion_success(start_token, Some(self.stamp_helper.stamp()))
.expect("Error sending completion success");
} else {
panic!("Failed to disable scheduler");
}
}
scheduling::Subservice::TcResetScheduling => {
let start_token = self
.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("Error sending start success");
let mut pool = self
.tc_args
.tc_source
.tc_store
.pool
.write()
.expect("Locking pool failed");
let mut scheduler = self.tc_args.scheduler.borrow_mut();
scheduler
.reset(pool.as_mut())
.expect("Error resetting TC Pool");
drop(scheduler);
self.tm_args
.verif_reporter
.completion_success(start_token, Some(self.stamp_helper.stamp()))
.expect("Error sending completion success");
}
scheduling::Subservice::TcInsertActivity => {
let start_token = self
.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("error sending start success");
let mut pool = self
.tc_args
.tc_source
.tc_store
.pool
.write()
.expect("locking pool failed");
let mut scheduler = self.tc_args.scheduler.borrow_mut();
scheduler
.insert_wrapped_tc::<TimeProvider>(pus_tc, pool.as_mut())
.expect("insertion of activity into pool failed");
drop(scheduler);
self.tm_args
.verif_reporter
.completion_success(start_token, Some(self.stamp_helper.stamp()))
.expect("sending completion success failed");
}
_ => {}
}
}
fn handle_mode_service(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
let mut app_data_len = 0;
let app_data = pus_tc.user_data();
if app_data.is_some() {
app_data_len = pus_tc.user_data().unwrap().len();
}
if app_data_len < 4 {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::NOT_ENOUGH_APP_DATA,
Some(format!("expected {} bytes, found {}", 4, app_data_len).as_bytes()),
),
)
.expect("Sending start failure TM failed");
}
let app_data = app_data.unwrap();
let mut invalid_subservice_handler = || {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::INVALID_PUS_SUBSERVICE,
Some(&[PusPacket::subservice(pus_tc)]),
),
)
.expect("Sending start failure TM failed");
};
let subservice = mode::Subservice::try_from(PusPacket::subservice(pus_tc));
if let Ok(subservice) = subservice {
let forward_mode_request = |target_id, mode_request: ModeRequest| match self
.tc_args
.request_map
.get(&target_id)
{
None => warn!("not mode request recipient for target ID {target_id} found"),
Some(sender_to_recipient) => {
sender_to_recipient
.send(RequestWithToken::new(
target_id,
Request::ModeRequest(mode_request),
token,
))
.expect("sending mode request failed");
}
};
let mut valid_subservice = true;
match subservice {
Subservice::TcSetMode => {
let target_id = u32::from_be_bytes(app_data[0..4].try_into().unwrap());
let min_len = ModeAndSubmode::raw_len() + 4;
if app_data_len < min_len {
self.tm_args
.verif_reporter
.start_failure(
token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::NOT_ENOUGH_APP_DATA,
Some(
format!("expected {min_len} bytes, found {app_data_len}")
.as_bytes(),
),
),
)
.expect("Sending start failure TM failed");
}
// Should never fail after size check
let mode_submode = ModeAndSubmode::from_be_bytes(
app_data[4..4 + ModeAndSubmode::raw_len()]
.try_into()
.unwrap(),
)
.unwrap();
forward_mode_request(target_id, ModeRequest::SetMode(mode_submode));
}
Subservice::TcReadMode => {
let target_id = u32::from_be_bytes(app_data[0..4].try_into().unwrap());
forward_mode_request(target_id, ModeRequest::ReadMode);
}
Subservice::TcAnnounceMode => {
let target_id = u32::from_be_bytes(app_data[0..4].try_into().unwrap());
forward_mode_request(target_id, ModeRequest::AnnounceMode);
}
Subservice::TcAnnounceModeRecursive => {
let target_id = u32::from_be_bytes(app_data[0..4].try_into().unwrap());
forward_mode_request(target_id, ModeRequest::AnnounceModeRecursive);
}
_ => {
warn!("Can not process mode request with subservice {subservice:?}");
invalid_subservice_handler();
valid_subservice = false;
}
}
if valid_subservice {
self.tm_args
.verif_reporter
.start_success(token, Some(self.stamp_helper.stamp()))
.expect("sending start success TM failed");
}
} else {
invalid_subservice_handler();
}
}
}

View File

@ -0,0 +1,181 @@
use crate::requests::{ActionRequest, Request, RequestWithToken};
use log::{error, warn};
use satrs_core::pool::{SharedPool, StoreAddr};
use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
};
use satrs_core::pus::{
AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler,
};
use satrs_core::spacepackets::ecss::PusPacket;
use satrs_core::spacepackets::tc::PusTc;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::TargetId;
use satrs_example::tmtc_err;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
pub struct PusService8ActionHandler {
psb: PusServiceBase,
request_handlers: HashMap<TargetId, Sender<RequestWithToken>>,
}
impl PusService8ActionHandler {
pub fn new(
receiver: Receiver<AcceptedTc>,
tc_pool: SharedPool,
tm_tx: Sender<StoreAddr>,
tm_store: SharedTmStore,
tm_apid: u16,
verification_handler: StdVerifReporterWithSender,
request_handlers: HashMap<TargetId, Sender<RequestWithToken>>,
) -> Self {
Self {
psb: PusServiceBase::new(
receiver,
tc_pool,
tm_tx,
tm_store,
tm_apid,
verification_handler,
),
request_handlers,
}
}
}
impl PusService8ActionHandler {
fn handle_action_request_with_id(
&self,
token: VerificationToken<TcStateAccepted>,
tc: &PusTc,
time_stamp: &[u8],
) -> Result<(), PusPacketHandlingError> {
let user_data = tc.user_data();
if user_data.is_none() || user_data.unwrap().len() < 8 {
self.psb()
.verification_handler
.borrow_mut()
.start_failure(
token,
FailParams::new(Some(time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None),
)
.expect("Sending start failure failed");
return Err(PusPacketHandlingError::NotEnoughAppData(
"Expected at least 4 bytes".into(),
));
}
let user_data = user_data.unwrap();
let target_id = u32::from_be_bytes(user_data[0..4].try_into().unwrap());
let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap());
if let Some(sender) = self.request_handlers.get(&target_id) {
sender
.send(RequestWithToken::new(
target_id,
Request::Action(ActionRequest::CmdWithU32Id((
action_id,
Vec::from(&user_data[8..]),
))),
token,
))
.expect("Forwarding action request failed");
} else {
let mut fail_data: [u8; 4] = [0; 4];
fail_data.copy_from_slice(&target_id.to_be_bytes());
self.psb()
.verification_handler
.borrow_mut()
.start_failure(
token,
FailParams::new(
Some(time_stamp),
&tmtc_err::UNKNOWN_TARGET_ID,
Some(&fail_data),
),
)
.expect("Sending start failure failed");
return Err(PusPacketHandlingError::OtherError(format!(
"Unknown target ID {target_id}"
)));
}
Ok(())
}
}
impl PusServiceHandler for PusService8ActionHandler {
fn psb_mut(&mut self) -> &mut PusServiceBase {
&mut self.psb
}
fn psb(&self) -> &PusServiceBase {
&self.psb
}
fn handle_one_tc(
&mut self,
addr: StoreAddr,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb().pus_buf).unwrap();
let subservice = tc.subservice();
let mut partial_error = None;
let time_stamp = self.psb().get_current_timestamp(&mut partial_error);
match subservice {
128 => {
self.handle_action_request_with_id(token, &tc, &time_stamp)?;
}
_ => {
let fail_data = [subservice];
self.psb_mut()
.verification_handler
.get_mut()
.start_failure(
token,
FailParams::new(
Some(&time_stamp),
&tmtc_err::INVALID_PUS_SUBSERVICE,
Some(&fail_data),
),
)
.expect("Sending start failure failed");
return Err(PusPacketHandlingError::InvalidSubservice(subservice));
}
}
if let Some(partial_error) = partial_error {
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
}
Ok(PusPacketHandlerResult::RequestHandled)
}
}
pub struct Pus8Wrapper {
pub(crate) pus_8_handler: PusService8ActionHandler,
}
impl Pus8Wrapper {
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_8_handler.handle_next_packet() {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 8 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 8 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 8 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
}

View File

@ -0,0 +1,33 @@
use log::{error, warn};
use satrs_core::pus::event_srv::PusService5EventHandler;
use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler};
pub struct Pus5Wrapper {
pub pus_5_handler: PusService5EventHandler,
}
impl Pus5Wrapper {
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_5_handler.handle_next_packet() {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 5 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 5 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 5 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
}

195
satrs-example/src/pus/hk.rs Normal file
View File

@ -0,0 +1,195 @@
use crate::requests::{Request, RequestWithToken};
use log::{error, warn};
use satrs_core::hk::{CollectionIntervalFactor, HkRequest};
use satrs_core::pool::{SharedPool, StoreAddr};
use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
};
use satrs_core::pus::{
AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler,
};
use satrs_core::spacepackets::ecss::{hk, PusPacket};
use satrs_core::spacepackets::tc::PusTc;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{AddressableId, TargetId};
use satrs_example::{hk_err, tmtc_err};
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
pub struct PusService3HkHandler {
psb: PusServiceBase,
request_handlers: HashMap<TargetId, Sender<RequestWithToken>>,
}
impl PusService3HkHandler {
pub fn new(
receiver: Receiver<AcceptedTc>,
tc_pool: SharedPool,
tm_tx: Sender<StoreAddr>,
tm_store: SharedTmStore,
tm_apid: u16,
verification_handler: StdVerifReporterWithSender,
request_handlers: HashMap<TargetId, Sender<RequestWithToken>>,
) -> Self {
Self {
psb: PusServiceBase::new(
receiver,
tc_pool,
tm_tx,
tm_store,
tm_apid,
verification_handler,
),
request_handlers,
}
}
}
impl PusServiceHandler for PusService3HkHandler {
fn psb_mut(&mut self) -> &mut PusServiceBase {
&mut self.psb
}
fn psb(&self) -> &PusServiceBase {
&self.psb
}
fn handle_one_tc(
&mut self,
addr: StoreAddr,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb().pus_buf).unwrap();
let subservice = tc.subservice();
let mut partial_error = None;
let time_stamp = self.psb().get_current_timestamp(&mut partial_error);
let user_data = tc.user_data().unwrap();
if tc.user_data().is_none() {
self.psb
.verification_handler
.borrow_mut()
.start_failure(
token,
FailParams::new(Some(&time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None),
)
.expect("Sending start failure TM failed");
return Err(PusPacketHandlingError::NotEnoughAppData(
"Expected at least 8 bytes of app data".into(),
));
}
if user_data.len() < 8 {
let err = if user_data.len() < 4 {
&hk_err::TARGET_ID_MISSING
} else {
&hk_err::UNIQUE_ID_MISSING
};
self.psb
.verification_handler
.borrow_mut()
.start_failure(token, FailParams::new(Some(&time_stamp), err, None))
.expect("Sending start failure TM failed");
return Err(PusPacketHandlingError::NotEnoughAppData(
"Expected at least 8 bytes of app data".into(),
));
}
let addressable_id = AddressableId::from_raw_be(user_data).unwrap();
if !self
.request_handlers
.contains_key(&addressable_id.target_id)
{
self.psb
.verification_handler
.borrow_mut()
.start_failure(
token,
FailParams::new(Some(&time_stamp), &hk_err::UNKNOWN_TARGET_ID, None),
)
.expect("Sending start failure TM failed");
let tgt_id = addressable_id.target_id;
return Err(PusPacketHandlingError::NotEnoughAppData(format!(
"Unknown target ID {tgt_id}"
)));
}
let send_request = |target: TargetId, request: HkRequest| {
let sender = self
.request_handlers
.get(&addressable_id.target_id)
.unwrap();
sender
.send(RequestWithToken::new(target, Request::Hk(request), token))
.unwrap_or_else(|_| panic!("Sending HK request {request:?} failed"));
};
if subservice == hk::Subservice::TcEnableHkGeneration as u8 {
send_request(
addressable_id.target_id,
HkRequest::Enable(addressable_id.unique_id),
);
} else if subservice == hk::Subservice::TcDisableHkGeneration as u8 {
send_request(
addressable_id.target_id,
HkRequest::Disable(addressable_id.unique_id),
);
} else if subservice == hk::Subservice::TcGenerateOneShotHk as u8 {
send_request(
addressable_id.target_id,
HkRequest::OneShot(addressable_id.unique_id),
);
} else if subservice == hk::Subservice::TcModifyHkCollectionInterval as u8 {
if user_data.len() < 12 {
self.psb
.verification_handler
.borrow_mut()
.start_failure(
token,
FailParams::new(
Some(&time_stamp),
&hk_err::COLLECTION_INTERVAL_MISSING,
None,
),
)
.expect("Sending start failure TM failed");
return Err(PusPacketHandlingError::NotEnoughAppData(
"Collection interval missing".into(),
));
}
send_request(
addressable_id.target_id,
HkRequest::ModifyCollectionInterval(
addressable_id.unique_id,
CollectionIntervalFactor::from_be_bytes(user_data[8..12].try_into().unwrap()),
),
);
}
Ok(PusPacketHandlerResult::RequestHandled)
}
}
pub struct Pus3Wrapper {
pub(crate) pus_3_handler: PusService3HkHandler,
}
impl Pus3Wrapper {
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_3_handler.handle_next_packet() {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 3 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 3 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 3 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
}

View File

@ -0,0 +1,141 @@
use crate::tmtc::MpscStoreAndSendError;
use log::warn;
use satrs_core::pool::StoreAddr;
use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender};
use satrs_core::pus::{AcceptedTc, PusPacketHandlerResult};
use satrs_core::spacepackets::ecss::PusServiceId;
use satrs_core::spacepackets::tc::PusTc;
use satrs_core::spacepackets::time::cds::TimeProvider;
use satrs_core::spacepackets::time::TimeWriter;
use satrs_example::{tmtc_err, CustomPusServiceId};
use std::sync::mpsc::Sender;
pub mod action;
pub mod event;
pub mod hk;
pub mod scheduler;
pub mod test;
pub struct PusTcMpscRouter {
pub test_service_receiver: Sender<AcceptedTc>,
pub event_service_receiver: Sender<AcceptedTc>,
pub sched_service_receiver: Sender<AcceptedTc>,
pub hk_service_receiver: Sender<AcceptedTc>,
pub action_service_receiver: Sender<AcceptedTc>,
}
pub struct PusReceiver {
pub verif_reporter: StdVerifReporterWithSender,
pub pus_router: PusTcMpscRouter,
stamp_helper: TimeStampHelper,
}
struct TimeStampHelper {
stamper: TimeProvider,
time_stamp: [u8; 7],
}
impl TimeStampHelper {
pub fn new() -> Self {
Self {
stamper: TimeProvider::new_with_u16_days(0, 0),
time_stamp: [0; 7],
}
}
pub fn stamp(&self) -> &[u8] {
&self.time_stamp
}
pub fn update_from_now(&mut self) {
self.stamper
.update_from_now()
.expect("Updating timestamp failed");
self.stamper
.write_to_bytes(&mut self.time_stamp)
.expect("Writing timestamp failed");
}
}
impl PusReceiver {
pub fn new(verif_reporter: StdVerifReporterWithSender, pus_router: PusTcMpscRouter) -> Self {
Self {
verif_reporter,
pus_router,
stamp_helper: TimeStampHelper::new(),
}
}
}
impl PusReceiver {
pub fn handle_tc_packet(
&mut self,
store_addr: StoreAddr,
service: u8,
pus_tc: &PusTc,
) -> Result<PusPacketHandlerResult, MpscStoreAndSendError> {
let init_token = self.verif_reporter.add_tc(pus_tc);
self.stamp_helper.update_from_now();
let accepted_token = self
.verif_reporter
.acceptance_success(init_token, Some(self.stamp_helper.stamp()))
.expect("Acceptance success failure");
let service = PusServiceId::try_from(service);
match service {
Ok(standard_service) => match standard_service {
PusServiceId::Test => {
self.pus_router
.test_service_receiver
.send((store_addr, accepted_token))?;
}
PusServiceId::Housekeeping => self
.pus_router
.hk_service_receiver
.send((store_addr, accepted_token))?,
PusServiceId::Event => self
.pus_router
.event_service_receiver
.send((store_addr, accepted_token))?,
PusServiceId::Scheduling => self
.pus_router
.sched_service_receiver
.send((store_addr, accepted_token))?,
_ => {
let result = self.verif_reporter.start_failure(
accepted_token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::PUS_SERVICE_NOT_IMPLEMENTED,
Some(&[standard_service as u8]),
),
);
if result.is_err() {
warn!("Sending verification failure failed");
}
}
},
Err(e) => {
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
match custom_service {
CustomPusServiceId::Mode => {
//self.handle_mode_service(pus_tc, accepted_token)
}
CustomPusServiceId::Health => {}
}
} else {
self.verif_reporter
.start_failure(
accepted_token,
FailParams::new(
Some(self.stamp_helper.stamp()),
&tmtc_err::INVALID_PUS_SUBSERVICE,
Some(&[e.number]),
),
)
.expect("Start failure verification failed")
}
}
}
Ok(PusPacketHandlerResult::RequestHandled)
}
}

View File

@ -0,0 +1,69 @@
use crate::tmtc::PusTcSource;
use log::{error, info, warn};
use satrs_core::pus::scheduler::TcInfo;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler};
pub struct Pus11Wrapper {
pub pus_11_handler: PusService11SchedHandler,
pub tc_source_wrapper: PusTcSource,
}
impl Pus11Wrapper {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, info: &TcInfo| -> bool {
if enabled {
self.tc_source_wrapper
.tc_source
.send(info.addr())
.expect("sending TC to TC source failed");
}
true
};
let mut pool = self
.tc_source_wrapper
.tc_store
.pool
.write()
.expect("error locking pool");
self.pus_11_handler
.scheduler_mut()
.update_time_from_now()
.unwrap();
if let Ok(released_tcs) = self
.pus_11_handler
.scheduler_mut()
.release_telecommands(releaser, pool.as_mut())
{
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
}
}
}
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_11_handler.handle_next_packet() {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
}

View File

@ -0,0 +1,85 @@
use log::{info, warn};
use satrs_core::events::EventU32;
use satrs_core::params::Params;
use satrs_core::pus::test::PusService17TestHandler;
use satrs_core::pus::verification::FailParams;
use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler};
use satrs_core::spacepackets::ecss::PusPacket;
use satrs_core::spacepackets::tc::PusTc;
use satrs_core::spacepackets::time::cds::TimeProvider;
use satrs_core::spacepackets::time::TimeWriter;
use satrs_example::{tmtc_err, TEST_EVENT};
use std::sync::mpsc::Sender;
pub struct Service17CustomWrapper {
pub pus17_handler: PusService17TestHandler,
pub test_srv_event_sender: Sender<(EventU32, Option<Params>)>,
}
impl Service17CustomWrapper {
pub fn handle_next_packet(&mut self) -> bool {
let res = self.pus17_handler.handle_next_packet();
if res.is_err() {
warn!("PUS17 handler failed with error {:?}", res.unwrap_err());
return true;
}
match res.unwrap() {
PusPacketHandlerResult::RequestHandled => {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
}
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => {
warn!(
"Handled PUS ping command with partial success: {:?}",
partial_err
);
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS17: Subservice {subservice} not implemented")
}
PusPacketHandlerResult::CustomSubservice(subservice, token) => {
let psb_mut = self.pus17_handler.psb_mut();
let buf = psb_mut.pus_buf;
let (tc, _) = PusTc::from_bytes(&buf).unwrap();
let time_stamper = TimeProvider::from_now_with_u16_days().unwrap();
let mut stamp_buf: [u8; 7] = [0; 7];
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
if subservice == 128 {
info!("Generating test event");
self.test_srv_event_sender
.send((TEST_EVENT.into(), None))
.expect("Sending test event failed");
let start_token = psb_mut
.verification_handler
.get_mut()
.start_success(token, Some(&stamp_buf))
.expect("Error sending start success");
psb_mut
.verification_handler
.get_mut()
.completion_success(start_token, Some(&stamp_buf))
.expect("Error sending completion success");
} else {
let fail_data = [tc.subservice()];
self.pus17_handler
.psb_mut()
.verification_handler
.get_mut()
.start_failure(
token,
FailParams::new(
Some(&stamp_buf),
&tmtc_err::INVALID_PUS_SUBSERVICE,
Some(&fail_data),
),
)
.expect("Sending start failure verification failed");
}
}
PusPacketHandlerResult::Empty => {
return true;
}
}
false
}
}

View File

@ -3,14 +3,23 @@ use satrs_core::mode::ModeRequest;
use satrs_core::pus::verification::{TcStateAccepted, VerificationToken};
use satrs_core::tmtc::TargetId;
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[non_exhaustive]
pub enum Request {
HkRequest(HkRequest),
ModeRequest(ModeRequest),
#[allow(dead_code)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ActionRequest {
CmdWithU32Id((u32, Vec<u8>)),
CmdWithStringId((String, Vec<u8>)),
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[allow(dead_code)]
#[derive(Clone, Eq, PartialEq, Debug)]
#[non_exhaustive]
pub enum Request {
Hk(HkRequest),
Mode(ModeRequest),
Action(ActionRequest),
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct TargetedRequest {
pub(crate) target_id: TargetId,
pub(crate) request: Request,
@ -22,7 +31,7 @@ impl TargetedRequest {
}
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct RequestWithToken {
pub(crate) targeted_request: TargetedRequest,
pub(crate) token: VerificationToken<TcStateAccepted>,

View File

@ -1,44 +1,26 @@
use log::info;
use satrs_core::events::EventU32;
use log::{info, warn};
use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer};
use satrs_core::params::Params;
use std::cell::RefCell;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
use std::thread;
use std::time::Duration;
use thiserror::Error;
use crate::ccsds::CcsdsReceiver;
use crate::pus::{PusReceiver, PusTcArgs, PusTmArgs};
use crate::requests::RequestWithToken;
use crate::pus::{PusReceiver, PusTcMpscRouter};
use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
use satrs_core::pus::event_man::EventRequestWithToken;
use satrs_core::pus::scheduling::{PusScheduler, TcInfo};
use satrs_core::pus::verification::StdVerifReporterWithSender;
use satrs_core::seq_count::SeqCountProviderSyncClonable;
use satrs_core::spacepackets::ecss::SerializablePusPacket;
use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader};
use satrs_core::tmtc::{
CcsdsDistributor, CcsdsError, PusServiceProvider, ReceivesCcsdsTc, ReceivesEcssPusTc,
};
use satrs_core::pus::AcceptedTc;
use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket};
use satrs_core::spacepackets::tc::PusTc;
use satrs_core::spacepackets::SpHeader;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, ReceivesEcssPusTc};
pub const PUS_APID: u16 = 0x02;
pub struct OtherArgs {
pub sock_addr: SocketAddr,
pub verif_reporter: StdVerifReporterWithSender,
pub event_sender: Sender<(EventU32, Option<Params>)>,
pub event_request_tx: Sender<EventRequestWithToken>,
pub request_map: HashMap<u32, Sender<RequestWithToken>>,
pub seq_count_provider: SeqCountProviderSyncClonable,
}
pub struct TmArgs {
pub tm_store: TmStore,
pub tm_store: SharedTmStore,
pub tm_sink_sender: Sender<StoreAddr>,
pub tm_server_rx: Receiver<StoreAddr>,
}
@ -55,49 +37,14 @@ impl TcArgs {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum MpscStoreAndSendError {
StoreError(StoreError),
SendError(SendError<StoreAddr>),
}
impl Display for MpscStoreAndSendError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
MpscStoreAndSendError::StoreError(s) => {
write!(f, "store error {s}")
}
MpscStoreAndSendError::SendError(s) => {
write!(f, "send error {s}")
}
}
}
}
impl Error for MpscStoreAndSendError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
MpscStoreAndSendError::StoreError(s) => Some(s),
MpscStoreAndSendError::SendError(s) => Some(s),
}
}
}
impl From<StoreError> for MpscStoreAndSendError {
fn from(value: StoreError) -> Self {
Self::StoreError(value)
}
}
impl From<SendError<StoreAddr>> for MpscStoreAndSendError {
fn from(value: SendError<StoreAddr>) -> Self {
Self::SendError(value)
}
}
#[derive(Clone)]
pub struct TmStore {
pub pool: SharedPool,
#[error("Store error: {0}")]
Store(#[from] StoreError),
#[error("TC send error: {0}")]
TcSend(#[from] SendError<AcceptedTc>),
#[error("TMTC send error: {0}")]
TmTcSend(#[from] SendError<StoreAddr>),
}
#[derive(Clone)]
@ -105,17 +52,6 @@ pub struct TcStore {
pub pool: SharedPool,
}
impl TmStore {
pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr {
let mut pg = self.pool.write().expect("error locking TM store");
let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error");
pus_tm
.write_to_bytes(buf)
.expect("writing PUS TM to store failed");
addr
}
}
impl TcStore {
pub fn add_pus_tc(&mut self, pus_tc: &PusTc) -> Result<StoreAddr, StoreError> {
let mut pg = self.pool.write().expect("error locking TC store");
@ -166,26 +102,14 @@ impl ReceivesCcsdsTc for PusTcSource {
}
}
pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
let scheduler = Rc::new(RefCell::new(
PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(),
));
let sched_clone = scheduler.clone();
let pus_tm_args = PusTmArgs {
tm_tx: tm_args.tm_sink_sender,
tm_store: tm_args.tm_store.clone(),
verif_reporter: args.verif_reporter,
seq_count_provider: args.seq_count_provider.clone(),
};
let pus_tc_args = PusTcArgs {
event_request_tx: args.event_request_tx,
request_map: args.request_map,
tc_source: tc_args.tc_source.clone(),
event_sender: args.event_sender,
scheduler: sched_clone,
};
let mut pus_receiver = PusReceiver::new(PUS_APID, pus_tm_args, pus_tc_args);
pub fn core_tmtc_task(
socket_addr: SocketAddr,
mut tc_args: TcArgs,
tm_args: TmArgs,
verif_reporter: StdVerifReporterWithSender,
pus_router: PusTcMpscRouter,
) {
let mut pus_receiver = PusReceiver::new(verif_reporter, pus_router);
let ccsds_receiver = CcsdsReceiver {
tc_source: tc_args.tc_source.clone(),
@ -193,24 +117,22 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor))
.expect("Creating UDP TMTC server failed");
let udp_tc_server = UdpTcServer::new(socket_addr, 2048, Box::new(ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_rx: tm_args.tm_server_rx,
tm_store: tm_args.tm_store.pool.clone(),
tm_store: tm_args.tm_store.backing_pool(),
};
let mut tc_buf: [u8; 4096] = [0; 4096];
loop {
let tmtc_sched = scheduler.clone();
core_tmtc_loop(
&mut udp_tmtc_server,
&mut tc_args,
&mut tc_buf,
&mut pus_receiver,
tmtc_sched,
);
thread::sleep(Duration::from_millis(400));
}
@ -221,36 +143,7 @@ fn core_tmtc_loop(
tc_args: &mut TcArgs,
tc_buf: &mut [u8],
pus_receiver: &mut PusReceiver,
scheduler: Rc<RefCell<PusScheduler>>,
) {
let releaser = |enabled: bool, info: &TcInfo| -> bool {
if enabled {
tc_args
.tc_source
.tc_source
.send(info.addr())
.expect("sending TC to TC source failed");
}
true
};
let mut pool = tc_args
.tc_source
.tc_store
.pool
.write()
.expect("error locking pool");
let mut scheduler = scheduler.borrow_mut();
scheduler.update_time_from_now().unwrap();
if let Ok(released_tcs) = scheduler.release_telecommands(releaser, pool.as_mut()) {
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
}
}
drop(pool);
drop(scheduler);
while poll_tc_server(udp_tmtc_server) {}
match tc_args.tc_receiver.try_recv() {
Ok(addr) => {
@ -266,18 +159,18 @@ fn core_tmtc_loop(
match PusTc::from_bytes(tc_buf) {
Ok((pus_tc, _)) => {
pus_receiver
.handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc)
.handle_tc_packet(addr, pus_tc.service(), &pus_tc)
.ok();
}
Err(e) => {
println!("error creating PUS TC from raw data: {e}");
println!("raw data: {tc_buf:x?}");
warn!("error creating PUS TC from raw data: {e}");
warn!("raw data: {tc_buf:x?}");
}
}
}
Err(e) => {
if let TryRecvError::Disconnected = e {
println!("tmtc thread: sender disconnected")
warn!("tmtc thread: sender disconnected")
}
}
}
@ -292,16 +185,16 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool {
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
println!("Got packet error: {e:?}");
warn!("packet error: {e:?}");
true
}
CcsdsError::CustomError(_) => {
println!("Unknown receiver error");
CcsdsError::CustomError(e) => {
warn!("mpsc store and send error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
println!("IO error {e}");
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
@ -311,12 +204,19 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool {
fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) {
while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() {
let mut store_lock = udp_tmtc_server
.tm_store
.write()
.expect("locking TM store failed");
let store_lock = udp_tmtc_server.tm_store.write();
if store_lock.is_err() {
warn!("Locking TM store failed");
continue;
}
let mut store_lock = store_lock.unwrap();
let pg = store_lock.read_with_guard(addr);
let buf = pg.read().expect("error reading TM pool data");
let read_res = pg.read();
if read_res.is_err() {
warn!("Error reading TM pool data");
continue;
}
let buf = read_res.unwrap();
if buf.len() > 9 {
let service = buf[7];
let subservice = buf[8];
@ -324,10 +224,9 @@ fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr)
} else {
info!("Sending PUS TM");
}
udp_tmtc_server
.udp_tc_server
.socket
.send_to(buf, recv_addr)
.expect("sending TM failed");
let result = udp_tmtc_server.udp_tc_server.socket.send_to(buf, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
}

View File

@ -20,7 +20,7 @@ pub fn resultcode(
let item = parse_macro_input!(item as ItemConst);
// Generate additional generated info struct used for introspection.
let result_code_name = item.ident.clone();
let result_code_name = &item.ident;
let name_as_str = result_code_name.to_string();
let gen_struct_name = format_ident!("{}_EXT", result_code_name);
let info_str = info_str.map_or(String::from(""), |v| v.value());