that was insane

This commit is contained in:
Robin Müller 2023-07-10 00:29:31 +02:00
parent 92147a00b3
commit fffd1af81f
Signed by: muellerr
GPG Key ID: A649FB78196E3849
18 changed files with 324 additions and 255 deletions

View File

@ -64,7 +64,7 @@ optional = true
# version = "0.6" # version = "0.6"
# path = "../spacepackets" # path = "../spacepackets"
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
rev = "e3d2d885385" rev = "784564a20ed"
default-features = false default-features = false
[dev-dependencies] [dev-dependencies]

View File

@ -66,7 +66,7 @@ use core::slice::Iter;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
use hashbrown::HashMap; use hashbrown::HashMap;
use crate::SenderId; use crate::ChannelId;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use stdmod::*; pub use stdmod::*;
@ -88,7 +88,7 @@ pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> { pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> {
type Error; type Error;
fn id(&self) -> SenderId; fn id(&self) -> ChannelId;
fn send_no_data(&self, event: Provider) -> Result<(), Self::Error> { fn send_no_data(&self, event: Provider) -> Result<(), Self::Error> {
self.send(event, None) self.send(event, None)
} }
@ -109,16 +109,16 @@ pub trait EventReceiver<Event: GenericEvent, AuxDataProvider = Params> {
pub trait ListenerTable { pub trait ListenerTable {
fn get_listeners(&self) -> Vec<ListenerKey>; fn get_listeners(&self) -> Vec<ListenerKey>;
fn contains_listener(&self, key: &ListenerKey) -> bool; fn contains_listener(&self, key: &ListenerKey) -> bool;
fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<SenderId>>; fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<ChannelId>>;
fn add_listener(&mut self, key: ListenerKey, sender_id: SenderId) -> bool; fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool;
fn remove_duplicates(&mut self, key: &ListenerKey); fn remove_duplicates(&mut self, key: &ListenerKey);
} }
pub trait SenderTable<SendProviderError, Event: GenericEvent = EventU32, AuxDataProvider = Params> { pub trait SenderTable<SendProviderError, Event: GenericEvent = EventU32, AuxDataProvider = Params> {
fn contains_send_event_provider(&self, id: &SenderId) -> bool; fn contains_send_event_provider(&self, id: &ChannelId) -> bool;
fn get_send_event_provider( fn get_send_event_provider(
&mut self, &mut self,
id: &SenderId, id: &ChannelId,
) -> Option<&mut Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>>; ) -> Option<&mut Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>>;
fn add_send_event_provider( fn add_send_event_provider(
&mut self, &mut self,
@ -171,7 +171,7 @@ pub enum EventRoutingResult<Event: GenericEvent, AuxDataProvider> {
pub enum EventRoutingError<E> { pub enum EventRoutingError<E> {
SendError(E), SendError(E),
NoSendersForKey(ListenerKey), NoSendersForKey(ListenerKey),
NoSenderForId(SenderId), NoSenderForId(ChannelId),
} }
#[derive(Debug)] #[derive(Debug)]
@ -186,12 +186,12 @@ impl<E, Event: GenericEvent + Copy> EventManager<E, Event> {
} }
/// Subscribe for a unique event. /// Subscribe for a unique event.
pub fn subscribe_single(&mut self, event: &Event, sender_id: SenderId) { pub fn subscribe_single(&mut self, event: &Event, sender_id: ChannelId) {
self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id); self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id);
} }
/// Subscribe for an event group. /// Subscribe for an event group.
pub fn subscribe_group(&mut self, group_id: LargestGroupIdRaw, sender_id: SenderId) { pub fn subscribe_group(&mut self, group_id: LargestGroupIdRaw, sender_id: ChannelId) {
self.update_listeners(ListenerKey::Group(group_id), sender_id); self.update_listeners(ListenerKey::Group(group_id), sender_id);
} }
@ -199,7 +199,7 @@ impl<E, Event: GenericEvent + Copy> EventManager<E, Event> {
/// ///
/// For example, this can be useful for a handler component which sends every event as /// For example, this can be useful for a handler component which sends every event as
/// a telemetry packet. /// a telemetry packet.
pub fn subscribe_all(&mut self, sender_id: SenderId) { pub fn subscribe_all(&mut self, sender_id: ChannelId) {
self.update_listeners(ListenerKey::All, sender_id); self.update_listeners(ListenerKey::All, sender_id);
} }
} }
@ -245,7 +245,7 @@ impl<E, Event: GenericEvent + Copy, AuxDataProvider: Clone>
} }
} }
fn update_listeners(&mut self, key: ListenerKey, sender_id: SenderId) { fn update_listeners(&mut self, key: ListenerKey, sender_id: ChannelId) {
self.listener_table.add_listener(key, sender_id); self.listener_table.add_listener(key, sender_id);
} }
@ -311,7 +311,7 @@ impl<E, Event: GenericEvent + Copy, AuxDataProvider: Clone>
#[derive(Default)] #[derive(Default)]
pub struct DefaultListenerTableProvider { pub struct DefaultListenerTableProvider {
listeners: HashMap<ListenerKey, Vec<SenderId>>, listeners: HashMap<ListenerKey, Vec<ChannelId>>,
} }
pub struct DefaultSenderTableProvider< pub struct DefaultSenderTableProvider<
@ -320,7 +320,7 @@ pub struct DefaultSenderTableProvider<
AuxDataProvider = Params, AuxDataProvider = Params,
> { > {
senders: HashMap< senders: HashMap<
SenderId, ChannelId,
Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>, Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>,
>, >,
} }
@ -348,11 +348,11 @@ impl ListenerTable for DefaultListenerTableProvider {
self.listeners.contains_key(key) self.listeners.contains_key(key)
} }
fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<SenderId>> { fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<ChannelId>> {
self.listeners.get(key).map(|vec| vec.iter()) self.listeners.get(key).map(|vec| vec.iter())
} }
fn add_listener(&mut self, key: ListenerKey, sender_id: SenderId) -> bool { fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool {
if let Some(existing_list) = self.listeners.get_mut(&key) { if let Some(existing_list) = self.listeners.get_mut(&key) {
existing_list.push(sender_id); existing_list.push(sender_id);
} else { } else {
@ -374,13 +374,13 @@ impl<SendProviderError, Event: GenericEvent, AuxDataProvider>
SenderTable<SendProviderError, Event, AuxDataProvider> SenderTable<SendProviderError, Event, AuxDataProvider>
for DefaultSenderTableProvider<SendProviderError, Event, AuxDataProvider> for DefaultSenderTableProvider<SendProviderError, Event, AuxDataProvider>
{ {
fn contains_send_event_provider(&self, id: &SenderId) -> bool { fn contains_send_event_provider(&self, id: &ChannelId) -> bool {
self.senders.contains_key(id) self.senders.contains_key(id)
} }
fn get_send_event_provider( fn get_send_event_provider(
&mut self, &mut self,
id: &SenderId, id: &ChannelId,
) -> Option<&mut Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>> ) -> Option<&mut Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>>
{ {
self.senders.get_mut(id).filter(|sender| sender.id() == *id) self.senders.get_mut(id).filter(|sender| sender.id() == *id)
@ -486,7 +486,7 @@ mod tests {
fn id(&self) -> u32 { fn id(&self) -> u32 {
self.id self.id
} }
fn send(&mut self, event: EventU32, aux_data: Option<Params>) -> Result<(), Self::Error> { fn send(&self, event: EventU32, aux_data: Option<Params>) -> Result<(), Self::Error> {
self.mpsc_sender.send((event, aux_data)) self.mpsc_sender.send((event, aux_data))
} }
} }

View File

@ -45,5 +45,5 @@ pub mod tmtc;
pub use spacepackets; pub use spacepackets;
// Generic sender ID type. // Generic channel ID type.
pub type SenderId = u32; pub type ChannelId = u32;

View File

@ -244,7 +244,7 @@ mod tests {
use crate::events::{EventU32, Severity}; use crate::events::{EventU32, Severity};
use crate::pus::tests::CommonTmInfo; use crate::pus::tests::CommonTmInfo;
use crate::pus::{EcssChannel, PusTmWrapper}; use crate::pus::{EcssChannel, PusTmWrapper};
use crate::SenderId; use crate::ChannelId;
use spacepackets::ByteConversionError; use spacepackets::ByteConversionError;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -269,7 +269,7 @@ mod tests {
} }
impl EcssChannel for TestSender { impl EcssChannel for TestSender {
fn id(&self) -> SenderId { fn id(&self) -> ChannelId {
0 0
} }
} }
@ -425,9 +425,9 @@ mod tests {
let err = reporter.event_info(sender, &time_stamp_empty, event, None); let err = reporter.event_info(sender, &time_stamp_empty, event, None);
assert!(err.is_err()); assert!(err.is_err());
let err = err.unwrap_err(); let err = err.unwrap_err();
if let EcssTmtcError::EcssTmtcError(EcssTmtcError::ByteConversion( if let EcssTmtcError::Pus(PusError::ByteConversion(ByteConversionError::ToSliceTooSmall(
ByteConversionError::ToSliceTooSmall(missmatch), missmatch,
)) = err ))) = err
{ {
assert_eq!(missmatch.expected, 4); assert_eq!(missmatch.expected, 4);
assert_eq!(missmatch.found, expected_found_len); assert_eq!(missmatch.found, expected_found_len);

View File

@ -223,7 +223,7 @@ pub mod alloc_mod {
self.backend.disable_event_reporting(event.as_ref()) self.backend.disable_event_reporting(event.as_ref())
} }
pub fn generate_pus_event_tm<E, Severity: HasSeverity>( pub fn generate_pus_event_tm<Severity: HasSeverity>(
&mut self, &mut self,
sender: &mut (impl EcssTmSenderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized),
time_stamp: &[u8], time_stamp: &[u8],

View File

@ -1,18 +1,18 @@
use crate::events::EventU32; use crate::events::EventU32;
use crate::pool::{PoolGuard, SharedPool, StoreAddr}; use crate::pool::{SharedPool, StoreAddr};
use crate::pus::event_man::{EventRequest, EventRequestWithToken}; use crate::pus::event_man::{EventRequest, EventRequestWithToken};
use crate::pus::verification::{ use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken,
}; };
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult,
PusPacketHandlingError, PusServiceBase, PusServiceHandler, ReceivedTcWrapper, PusPacketHandlingError, PusServiceBase, PusServiceHandler,
}; };
use spacepackets::ecss::event::Subservice; use spacepackets::ecss::event::Subservice;
use spacepackets::ecss::PusPacket; use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc; use spacepackets::tc::PusTc;
use std::boxed::Box; use std::boxed::Box;
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::Sender;
pub struct PusService5EventHandler { pub struct PusService5EventHandler {
psb: PusServiceBase, psb: PusServiceBase,
@ -22,13 +22,20 @@ pub struct PusService5EventHandler {
impl PusService5EventHandler { impl PusService5EventHandler {
pub fn new( pub fn new(
tc_receiver: Box<dyn EcssTcReceiver>, tc_receiver: Box<dyn EcssTcReceiver>,
shared_tc_store: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
event_request_tx: Sender<EventRequestWithToken>, event_request_tx: Sender<EventRequestWithToken>,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(
tc_receiver,
shared_tc_store,
tm_sender,
tm_apid,
verification_handler,
),
event_request_tx, event_request_tx,
} }
} }
@ -44,16 +51,17 @@ impl PusServiceHandler for PusService5EventHandler {
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
tc: PusTc, addr: StoreAddr,
_tc_guard: PoolGuard,
token: VerificationToken<TcStateAccepted>, token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?;
let subservice = tc.subservice(); let subservice = tc.subservice();
let srv = Subservice::try_from(subservice); let srv = Subservice::try_from(subservice);
if srv.is_err() { if srv.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
token.into(), token,
)); ));
} }
let handle_enable_disable_request = |enable: bool, stamp: [u8; 7]| { let handle_enable_disable_request = |enable: bool, stamp: [u8; 7]| {
@ -115,8 +123,7 @@ impl PusServiceHandler for PusService5EventHandler {
} }
Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => { Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => {
return Ok(PusPacketHandlerResult::SubserviceNotImplemented( return Ok(PusPacketHandlerResult::SubserviceNotImplemented(
subservice, subservice, token,
token.into(),
)); ));
} }
} }

View File

@ -2,7 +2,7 @@
//! //!
//! This module contains structures to make working with the PUS C standard easier. //! This module contains structures to make working with the PUS C standard easier.
//! The satrs-example application contains various usage examples of these components. //! The satrs-example application contains various usage examples of these components.
use crate::SenderId; use crate::ChannelId;
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
use downcast_rs::{impl_downcast, Downcast}; use downcast_rs::{impl_downcast, Downcast};
@ -28,7 +28,7 @@ pub mod verification;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
use crate::pool::{PoolGuard, PoolRwGuard, StoreAddr, StoreError}; use crate::pool::{StoreAddr, StoreError};
use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken}; use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken};
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use std_mod::*; pub use std_mod::*;
@ -175,7 +175,7 @@ impl Error for EcssTmtcError {
} }
pub trait EcssChannel: Send { pub trait EcssChannel: Send {
/// Each sender can have an ID associated with it /// Each sender can have an ID associated with it
fn id(&self) -> SenderId; fn id(&self) -> ChannelId;
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
"unset" "unset"
} }
@ -196,15 +196,38 @@ pub trait EcssTcSenderCore: EcssChannel {
fn send_tc(&self, tc: PusTc, token: Option<TcStateToken>) -> Result<(), EcssTmtcError>; fn send_tc(&self, tc: PusTc, token: Option<TcStateToken>) -> Result<(), EcssTmtcError>;
} }
pub struct ReceivedTcWrapper<'raw_tc> { pub struct ReceivedTcWrapper {
pub pool_guard: PoolGuard<'raw_tc>, pub store_addr: StoreAddr,
pub tc: PusTc<'raw_tc>,
pub token: Option<TcStateToken>, pub token: Option<TcStateToken>,
} }
#[derive(Debug, Clone)]
pub enum TryRecvTmtcError {
Error(EcssTmtcError),
Empty,
}
impl From<EcssTmtcError> for TryRecvTmtcError {
fn from(value: EcssTmtcError) -> Self {
Self::Error(value)
}
}
impl From<PusError> for TryRecvTmtcError {
fn from(value: PusError) -> Self {
Self::Error(value.into())
}
}
impl From<StoreError> for TryRecvTmtcError {
fn from(value: StoreError) -> Self {
Self::Error(value.into())
}
}
/// Generic trait for a user supplied receiver object. /// Generic trait for a user supplied receiver object.
pub trait EcssTcReceiverCore: EcssChannel { pub trait EcssTcReceiverCore: EcssChannel {
fn recv_tc<'buf>(&self, buf: &'buf mut [u8]) -> Result<ReceivedTcWrapper<'buf>, EcssTmtcError>; fn recv_tc(&self) -> Result<ReceivedTcWrapper, TryRecvTmtcError>;
} }
/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is /// Generic trait for objects which can receive ECSS PUS telecommands. This trait is
@ -271,37 +294,34 @@ mod alloc_mod {
/// [Clone]. /// [Clone].
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast + DynClone {} pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast {}
/// Blanket implementation for all types which implement [EcssTcReceiverCore] and are clonable. /// Blanket implementation for all types which implement [EcssTcReceiverCore] and are clonable.
impl<T> EcssTcReceiver for T where T: EcssTcReceiverCore + Clone + 'static {} impl<T> EcssTcReceiver for T where T: EcssTcReceiverCore + 'static {}
dyn_clone::clone_trait_object!(EcssTcReceiver);
impl_downcast!(EcssTcReceiver); impl_downcast!(EcssTcReceiver);
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod std_mod { pub mod std_mod {
use crate::pool::{PoolGuard, SharedPool, StoreAddr}; use crate::pool::{SharedPool, StoreAddr};
use crate::pus::verification::{ use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
}; };
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, EcssTmSenderCore,
EcssTmSenderCore, EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, ReceivedTcWrapper,
ReceivedTcWrapper, TcAddrWithToken, TcAddrWithToken, TryRecvTmtcError,
}; };
use crate::tmtc::tm_helper::SharedTmStore; use crate::tmtc::tm_helper::SharedTmStore;
use crate::SenderId; use crate::ChannelId;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::vec::Vec; use alloc::vec::Vec;
use spacepackets::ecss::PusError; use spacepackets::ecss::PusError;
use spacepackets::tc::PusTc;
use spacepackets::time::cds::TimeProvider; use spacepackets::time::cds::TimeProvider;
use spacepackets::time::std_mod::StdTimestampError; use spacepackets::time::StdTimestampError;
use spacepackets::time::TimeWriter; use spacepackets::time::TimeWriter;
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use spacepackets::{ByteConversionError, SizeMissmatch};
use std::cell::RefCell; use std::cell::RefCell;
use std::string::String; use std::string::String;
use std::sync::mpsc; use std::sync::mpsc;
@ -310,7 +330,7 @@ pub mod std_mod {
#[derive(Clone)] #[derive(Clone)]
pub struct MpscTmInStoreSender { pub struct MpscTmInStoreSender {
id: SenderId, id: ChannelId,
name: &'static str, name: &'static str,
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
sender: mpsc::Sender<StoreAddr>, sender: mpsc::Sender<StoreAddr>,
@ -318,7 +338,7 @@ pub mod std_mod {
} }
impl EcssChannel for MpscTmInStoreSender { impl EcssChannel for MpscTmInStoreSender {
fn id(&self) -> SenderId { fn id(&self) -> ChannelId {
self.id self.id
} }
@ -352,7 +372,7 @@ pub mod std_mod {
impl MpscTmInStoreSender { impl MpscTmInStoreSender {
pub fn new( pub fn new(
id: SenderId, id: ChannelId,
name: &'static str, name: &'static str,
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
sender: mpsc::Sender<StoreAddr>, sender: mpsc::Sender<StoreAddr>,
@ -368,15 +388,14 @@ pub mod std_mod {
} }
pub struct MpscTcInStoreReceiver { pub struct MpscTcInStoreReceiver {
id: SenderId, id: ChannelId,
name: &'static str, name: &'static str,
shared_tc_pool: SharedPool,
receiver: mpsc::Receiver<TcAddrWithToken>, receiver: mpsc::Receiver<TcAddrWithToken>,
pub ignore_poison_errors: bool, pub ignore_poison_errors: bool,
} }
impl EcssChannel for MpscTcInStoreReceiver { impl EcssChannel for MpscTcInStoreReceiver {
fn id(&self) -> SenderId { fn id(&self) -> ChannelId {
self.id self.id
} }
@ -386,36 +405,32 @@ pub mod std_mod {
} }
impl EcssTcReceiverCore for MpscTcInStoreReceiver { impl EcssTcReceiverCore for MpscTcInStoreReceiver {
fn recv_tc<'buf>( fn recv_tc(&self) -> Result<ReceivedTcWrapper, TryRecvTmtcError> {
&self, let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e {
buf: &'buf mut [u8], TryRecvError::Empty => TryRecvTmtcError::Empty,
) -> Result<ReceivedTcWrapper<'buf>, EcssTmtcError> { TryRecvError::Disconnected => {
let (addr, token) = self.receiver.try_recv().map_err(|e| match e { TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected))
TryRecvError::Empty => GenericRecvError::Empty, }
TryRecvError::Disconnected => GenericRecvError::TxDisconnected,
})?; })?;
let mut shared_tc_pool = self Ok(ReceivedTcWrapper {
.shared_tc_pool store_addr,
.read()
.map_err(|_| EcssTmtcError::StoreLock)?;
let pool_guard = shared_tc_pool.read_with_guard(addr);
let tc_raw = pool_guard.read()?;
if buf.len() < tc_raw.len() {
return Err(
PusError::ByteConversion(ByteConversionError::ToSliceTooSmall(SizeMissmatch {
found: buf.len(),
expected: tc_raw.len(),
}))
.into(),
);
}
buf[..tc_raw.len()].copy_from_slice(tc_raw);
let (tc, _) = PusTc::from_bytes(buf)?;
Ok((ReceivedTcWrapper {
tc,
pool_guard,
token: Some(token), token: Some(token),
})) })
}
}
impl MpscTcInStoreReceiver {
pub fn new(
id: ChannelId,
name: &'static str,
receiver: mpsc::Receiver<TcAddrWithToken>,
) -> Self {
Self {
id,
name,
receiver,
ignore_poison_errors: false,
}
} }
} }
@ -425,7 +440,7 @@ pub mod std_mod {
/// going to be called with direct packets. /// going to be called with direct packets.
#[derive(Clone)] #[derive(Clone)]
pub struct MpscTmAsVecSender { pub struct MpscTmAsVecSender {
id: SenderId, id: ChannelId,
sender: mpsc::Sender<Vec<u8>>, sender: mpsc::Sender<Vec<u8>>,
name: &'static str, name: &'static str,
} }
@ -443,7 +458,7 @@ pub mod std_mod {
} }
impl EcssChannel for MpscTmAsVecSender { impl EcssChannel for MpscTmAsVecSender {
fn id(&self) -> SenderId { fn id(&self) -> ChannelId {
self.id self.id
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
@ -502,8 +517,8 @@ pub mod std_mod {
pub enum PusPacketHandlerResult { pub enum PusPacketHandlerResult {
RequestHandled, RequestHandled,
RequestHandledPartialSuccess(PartialPusHandlingError), RequestHandledPartialSuccess(PartialPusHandlingError),
SubserviceNotImplemented(u8, TcStateToken), SubserviceNotImplemented(u8, VerificationToken<TcStateAccepted>),
CustomSubservice(u8, TcStateToken), CustomSubservice(u8, VerificationToken<TcStateAccepted>),
Empty, Empty,
} }
@ -518,6 +533,7 @@ pub mod std_mod {
/// is constrained to the [StdVerifReporterWithSender]. /// is constrained to the [StdVerifReporterWithSender].
pub struct PusServiceBase { pub struct PusServiceBase {
pub tc_receiver: Box<dyn EcssTcReceiver>, pub tc_receiver: Box<dyn EcssTcReceiver>,
pub shared_tc_store: SharedPool,
pub tm_sender: Box<dyn EcssTmSender>, pub tm_sender: Box<dyn EcssTmSender>,
pub tm_apid: u16, pub tm_apid: u16,
/// The verification handler is wrapped in a [RefCell] to allow the interior mutability /// The verification handler is wrapped in a [RefCell] to allow the interior mutability
@ -530,12 +546,14 @@ pub mod std_mod {
impl PusServiceBase { impl PusServiceBase {
pub fn new( pub fn new(
tc_receiver: Box<dyn EcssTcReceiver>, tc_receiver: Box<dyn EcssTcReceiver>,
shared_tc_store: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
) -> Self { ) -> Self {
Self { Self {
tc_receiver, tc_receiver,
shared_tc_store,
tm_apid, tm_apid,
tm_sender, tm_sender,
verification_handler: RefCell::new(verification_handler), verification_handler: RefCell::new(verification_handler),
@ -571,31 +589,37 @@ pub mod std_mod {
fn psb(&self) -> &PusServiceBase; fn psb(&self) -> &PusServiceBase;
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
tc: PusTc, addr: StoreAddr,
tc_guard: PoolGuard,
token: VerificationToken<TcStateAccepted>, token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError>; ) -> Result<PusPacketHandlerResult, PusPacketHandlingError>;
fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> {
// Keep locked section as short as possible.
let psb_mut = self.psb_mut();
let mut tc_pool = psb_mut
.shared_tc_store
.write()
.map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?;
let tc_guard = tc_pool.read_with_guard(addr);
let tc_raw = tc_guard.read().unwrap();
psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
Ok(())
}
fn handle_next_packet(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { fn handle_next_packet(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
match self.psb().tc_receiver.recv_tc(&mut self.psb_mut().pus_buf) { match self.psb().tc_receiver.recv_tc() {
Ok(ReceivedTcWrapper { Ok(ReceivedTcWrapper { store_addr, token }) => {
tc,
pool_guard,
token,
}) => {
if token.is_none() { if token.is_none() {
return Err(PusPacketHandlingError::InvalidVerificationToken); return Err(PusPacketHandlingError::InvalidVerificationToken);
} }
let token = token.unwrap(); let token = token.unwrap();
self.handle_one_tc(tc, pool_guard, token.try_into().unwrap()) let accepted_token = VerificationToken::<TcStateAccepted>::try_from(token)
.map_err(|_| PusPacketHandlingError::InvalidVerificationToken)?;
self.handle_one_tc(store_addr, accepted_token)
} }
Err(e) => match e { Err(e) => match e {
EcssTmtcError::StoreLock => {} TryRecvTmtcError::Error(e) => Err(PusPacketHandlingError::EcssTmtc(e)),
EcssTmtcError::Store(_) => {} TryRecvTmtcError::Empty => Ok(PusPacketHandlerResult::Empty),
EcssTmtcError::Pus(_) => {}
EcssTmtcError::CantSendAddr(_) => {}
EcssTmtcError::Send(_) => {}
EcssTmtcError::Recv(_) => {}
}, },
} }
} }

View File

@ -1,15 +1,14 @@
use crate::pool::{PoolGuard, SharedPool, StoreAddr}; use crate::pool::{SharedPool, StoreAddr};
use crate::pus::scheduler::PusScheduler; use crate::pus::scheduler::PusScheduler;
use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken};
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase,
PusServiceBase, PusServiceHandler, ReceivedTcWrapper, PusServiceHandler,
}; };
use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::ecss::{scheduling, PusPacket};
use spacepackets::tc::PusTc; use spacepackets::tc::PusTc;
use spacepackets::time::cds::TimeProvider; use spacepackets::time::cds::TimeProvider;
use std::boxed::Box; use std::boxed::Box;
use std::sync::mpsc::Receiver;
/// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service) /// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service)
/// packets. This handler is constrained to using the [PusScheduler], but is able to process /// packets. This handler is constrained to using the [PusScheduler], but is able to process
@ -21,22 +20,26 @@ use std::sync::mpsc::Receiver;
/// telecommands when applicable. /// telecommands when applicable.
pub struct PusService11SchedHandler { pub struct PusService11SchedHandler {
psb: PusServiceBase, psb: PusServiceBase,
shared_tc_store: SharedPool,
scheduler: PusScheduler, scheduler: PusScheduler,
} }
impl PusService11SchedHandler { impl PusService11SchedHandler {
pub fn new( pub fn new(
tc_receiver: Box<dyn EcssTcReceiver>, tc_receiver: Box<dyn EcssTcReceiver>,
shared_tc_store: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
shared_tc_store: SharedPool,
scheduler: PusScheduler, scheduler: PusScheduler,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(
shared_tc_store, tc_receiver,
shared_tc_store,
tm_sender,
tm_apid,
verification_handler,
),
scheduler, scheduler,
} }
} }
@ -60,15 +63,17 @@ impl PusServiceHandler for PusService11SchedHandler {
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
tc: PusTc, addr: StoreAddr,
tc_guard: PoolGuard,
token: VerificationToken<TcStateAccepted>, token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
let std_service = scheduling::Subservice::try_from(tc.subservice()); self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?;
let subservice = tc.subservice();
let std_service = scheduling::Subservice::try_from(subservice);
if std_service.is_err() { if std_service.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
token.into(), token,
)); ));
} }
let mut partial_error = None; let mut partial_error = None;
@ -120,7 +125,11 @@ impl PusServiceHandler for PusService11SchedHandler {
.start_success(token, Some(&time_stamp)) .start_success(token, Some(&time_stamp))
.expect("Error sending start success"); .expect("Error sending start success");
let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); let mut pool = self
.psb
.shared_tc_store
.write()
.expect("Locking pool failed");
self.scheduler self.scheduler
.reset(pool.as_mut()) .reset(pool.as_mut())
@ -140,7 +149,11 @@ impl PusServiceHandler for PusService11SchedHandler {
.start_success(token, Some(&time_stamp)) .start_success(token, Some(&time_stamp))
.expect("error sending start success"); .expect("error sending start success");
let mut pool = self.shared_tc_store.write().expect("locking pool failed"); let mut pool = self
.psb
.shared_tc_store
.write()
.expect("locking pool failed");
self.scheduler self.scheduler
.insert_wrapped_tc::<TimeProvider>(&tc, pool.as_mut()) .insert_wrapped_tc::<TimeProvider>(&tc, pool.as_mut())
.expect("insertion of activity into pool failed"); .expect("insertion of activity into pool failed");
@ -154,7 +167,7 @@ impl PusServiceHandler for PusService11SchedHandler {
_ => { _ => {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
token.into(), token,
)); ));
} }
} }
@ -165,7 +178,7 @@ impl PusServiceHandler for PusService11SchedHandler {
} }
Ok(PusPacketHandlerResult::CustomSubservice( Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
token.into(), token,
)) ))
} }
} }

View File

@ -1,15 +1,14 @@
use crate::pool::{PoolGuard, SharedPool, StoreAddr}; use crate::pool::{SharedPool, StoreAddr};
use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken};
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult,
PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, ReceivedTcWrapper, PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper,
}; };
use spacepackets::ecss::PusPacket; use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc; use spacepackets::tc::PusTc;
use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::tm::{PusTm, PusTmSecondaryHeader};
use spacepackets::SpHeader; use spacepackets::SpHeader;
use std::boxed::Box; use std::boxed::Box;
use std::sync::mpsc::Receiver;
/// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets.
/// This handler only processes ping requests and generates a ping reply for them accordingly. /// This handler only processes ping requests and generates a ping reply for them accordingly.
@ -20,12 +19,19 @@ pub struct PusService17TestHandler {
impl PusService17TestHandler { impl PusService17TestHandler {
pub fn new( pub fn new(
tc_receiver: Box<dyn EcssTcReceiver>, tc_receiver: Box<dyn EcssTcReceiver>,
shared_tc_store: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(
tc_receiver,
shared_tc_store,
tm_sender,
tm_apid,
verification_handler,
),
} }
} }
} }
@ -40,10 +46,11 @@ impl PusServiceHandler for PusService17TestHandler {
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
tc: PusTc, addr: StoreAddr,
_tc_guard: PoolGuard,
token: VerificationToken<TcStateAccepted>, token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?;
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?;
if tc.service() != 17 { if tc.service() != 17 {
return Err(PusPacketHandlingError::WrongService(tc.service())); return Err(PusPacketHandlingError::WrongService(tc.service()));
} }
@ -95,7 +102,7 @@ impl PusServiceHandler for PusService17TestHandler {
} }
Ok(PusPacketHandlerResult::CustomSubservice( Ok(PusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
token.into(), token,
)) ))
} }
} }
@ -107,7 +114,7 @@ mod tests {
use crate::pus::verification::{ use crate::pus::verification::{
RequestId, StdVerifReporterWithSender, VerificationReporterCfg, RequestId, StdVerifReporterWithSender, VerificationReporterCfg,
}; };
use crate::pus::{MpscTmInStoreSender, PusServiceHandler}; use crate::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender, PusServiceHandler};
use crate::tmtc::tm_helper::SharedTmStore; use crate::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::{PusPacket, SerializablePusPacket}; use spacepackets::ecss::{PusPacket, SerializablePusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
@ -126,24 +133,21 @@ mod tests {
let tc_pool = LocalPool::new(pool_cfg.clone()); let tc_pool = LocalPool::new(pool_cfg.clone());
let tm_pool = LocalPool::new(pool_cfg); let tm_pool = LocalPool::new(pool_cfg);
let tc_pool_shared = SharedPool::new(RwLock::new(Box::new(tc_pool))); let tc_pool_shared = SharedPool::new(RwLock::new(Box::new(tc_pool)));
let tm_pool_shared = SharedPool::new(RwLock::new(Box::new(tm_pool))); let shared_tm_store = SharedTmStore::new(Box::new(tm_pool));
let shared_tm_store = SharedTmStore::new(tm_pool_shared.clone()); let tm_pool_shared = shared_tm_store.clone_backing_pool();
let (test_srv_tx, test_srv_rx) = mpsc::channel(); let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel();
let verif_sender = MpscTmInStoreSender::new( let verif_sender =
0, MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tm_tx.clone());
"verif_sender",
shared_tm_store.backing_pool(),
tm_tx.clone(),
);
let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let mut verification_handler = let mut verification_handler =
StdVerifReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); StdVerifReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
let test_srv_tm_sender = MpscTmInStoreSender::new(0, "TEST_SENDER", shared_tm_store, tm_tx);
let test_srv_tc_receiver = MpscTcInStoreReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx);
let mut pus_17_handler = PusService17TestHandler::new( let mut pus_17_handler = PusService17TestHandler::new(
test_srv_rx, Box::new(test_srv_tc_receiver),
tc_pool_shared.clone(), tc_pool_shared.clone(),
tm_tx, Box::new(test_srv_tm_sender),
shared_tm_store,
TEST_APID, TEST_APID,
verification_handler.clone(), verification_handler.clone(),
); );
@ -160,7 +164,7 @@ mod tests {
let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap(); let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap();
drop(tc_pool); drop(tc_pool);
// Send accepted TC to test service handler. // Send accepted TC to test service handler.
test_srv_tx.send((addr, token)).unwrap(); test_srv_tc_tx.send((addr, token.into())).unwrap();
let result = pus_17_handler.handle_next_packet(); let result = pus_17_handler.handle_next_packet();
assert!(result.is_ok()); assert!(result.is_ok());
// We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and // We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and

View File

@ -19,6 +19,7 @@
//! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; //! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
//! use satrs_core::seq_count::SeqCountProviderSimple; //! use satrs_core::seq_count::SeqCountProviderSimple;
//! use satrs_core::pus::MpscTmInStoreSender; //! use satrs_core::pus::MpscTmInStoreSender;
//! use satrs_core::tmtc::tm_helper::SharedTmStore;
//! use spacepackets::ecss::PusPacket; //! use spacepackets::ecss::PusPacket;
//! use spacepackets::SpHeader; //! use spacepackets::SpHeader;
//! use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; //! use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
@ -28,9 +29,11 @@
//! const TEST_APID: u16 = 0x02; //! const TEST_APID: u16 = 0x02;
//! //!
//! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let tm_pool = LocalPool::new(pool_cfg.clone());
//! let shared_tm_store = SharedTmStore::new(Box::new(tm_pool));
//! let tm_store = shared_tm_store.clone_backing_pool();
//! let (verif_tx, verif_rx) = mpsc::channel(); //! let (verif_tx, verif_rx) = mpsc::channel();
//! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_pool.clone(), verif_tx); //! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_store, verif_tx);
//! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
//! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender));
//! //!
@ -51,7 +54,7 @@
//! let addr = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap(); //! let addr = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap();
//! let tm_len; //! let tm_len;
//! { //! {
//! let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); //! let mut rg = tm_store.write().expect("Error locking shared pool");
//! let store_guard = rg.read_with_guard(addr); //! let store_guard = rg.read_with_guard(addr);
//! let slice = store_guard.read().expect("Error reading TM slice"); //! let slice = store_guard.read().expect("Error reading TM slice");
//! tm_len = slice.len(); //! tm_len = slice.len();
@ -1491,25 +1494,26 @@ mod std_mod {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::pool::{LocalPool, PoolCfg, SharedPool}; use crate::pool::{LocalPool, PoolCfg};
use crate::pus::tests::CommonTmInfo; use crate::pus::tests::CommonTmInfo;
use crate::pus::verification::{ use crate::pus::verification::{
EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone,
VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender,
VerificationToken, VerificationToken,
}; };
use crate::pus::{EcssChannel, EcssTmtcError, MpscTmInStoreSender, PusTmWrapper}; use crate::pus::{EcssChannel, MpscTmInStoreSender, PusTmWrapper};
use crate::SenderId; use crate::tmtc::tm_helper::SharedTmStore;
use crate::ChannelId;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::format; use alloc::format;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, PusPacket}; use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, PusError, PusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use spacepackets::util::UnsignedEnum; use spacepackets::util::UnsignedEnum;
use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::{mpsc, Arc, RwLock}; use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
use std::vec; use std::vec;
use std::vec::Vec; use std::vec::Vec;
@ -1534,7 +1538,7 @@ mod tests {
} }
impl EcssChannel for TestSender { impl EcssChannel for TestSender {
fn id(&self) -> SenderId { fn id(&self) -> ChannelId {
0 0
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
@ -1543,9 +1547,7 @@ mod tests {
} }
impl EcssTmSenderCore for TestSender { impl EcssTmSenderCore for TestSender {
type Error = (); fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> {
match tm { match tm {
PusTmWrapper::InStore(_) => { PusTmWrapper::InStore(_) => {
panic!("TestSender: Can not deal with addresses"); panic!("TestSender: Can not deal with addresses");
@ -1576,23 +1578,6 @@ mod tests {
} }
} }
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct DummyError {}
#[derive(Default, Clone)]
struct FallibleSender {}
impl EcssChannel for FallibleSender {
fn id(&self) -> SenderId {
0
}
}
impl EcssTmSenderCore for FallibleSender {
type Error = DummyError;
fn send_tm(&self, _: PusTmWrapper) -> Result<(), Self::Error> {
Err(DummyError {})
}
}
struct TestBase<'a> { struct TestBase<'a> {
vr: VerificationReporter, vr: VerificationReporter,
#[allow(dead_code)] #[allow(dead_code)]
@ -1604,13 +1589,13 @@ mod tests {
&mut self.vr &mut self.vr
} }
} }
struct TestBaseWithHelper<'a, E> { struct TestBaseWithHelper<'a> {
helper: VerificationReporterWithSender<E>, helper: VerificationReporterWithSender,
#[allow(dead_code)] #[allow(dead_code)]
tc: PusTc<'a>, tc: PusTc<'a>,
} }
impl<'a, E> TestBaseWithHelper<'a, E> { impl<'a> TestBaseWithHelper<'a> {
fn rep(&mut self) -> &mut VerificationReporter { fn rep(&mut self) -> &mut VerificationReporter {
&mut self.helper.reporter &mut self.helper.reporter
} }
@ -1641,10 +1626,7 @@ mod tests {
(TestBase { vr: reporter, tc }, init_tok) (TestBase { vr: reporter, tc }, init_tok)
} }
fn base_with_helper_init() -> ( fn base_with_helper_init() -> (TestBaseWithHelper<'static>, VerificationToken<TcStateNone>) {
TestBaseWithHelper<'static, ()>,
VerificationToken<TcStateNone>,
) {
let mut reporter = base_reporter(); let mut reporter = base_reporter();
let (tc, _) = base_tc_init(None); let (tc, _) = base_tc_init(None);
let init_tok = reporter.add_tc(&tc); let init_tok = reporter.add_tc(&tc);
@ -1674,9 +1656,10 @@ mod tests {
#[test] #[test]
fn test_mpsc_verif_send_sync() { fn test_mpsc_verif_send_sync() {
let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)]));
let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool))); let tm_store = Box::new(pool);
let shared_tm_store = SharedTmStore::new(tm_store);
let (tx, _) = mpsc::channel(); let (tx, _) = mpsc::channel();
let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_pool, tx); let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store, tx);
is_send(&mpsc_verif_sender); is_send(&mpsc_verif_sender);
} }
@ -1707,23 +1690,6 @@ mod tests {
acceptance_check(sender, &tok.req_id); acceptance_check(sender, &tok.req_id);
} }
#[test]
fn test_acceptance_send_fails() {
let (mut b, tok) = base_init(false);
let mut faulty_sender = FallibleSender::default();
let res =
b.vr.acceptance_success(tok, &mut faulty_sender, Some(&EMPTY_STAMP));
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err.1, tok);
match err.0 {
EcssTmtcError::SendError(e) => {
assert_eq!(e, DummyError {})
}
_ => panic!("{}", format!("Unexpected error {:?}", err.0)),
}
}
fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) { fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) {
let cmp_info = TmInfo { let cmp_info = TmInfo {
common: CommonTmInfo { common: CommonTmInfo {
@ -1788,7 +1754,7 @@ mod tests {
let err_with_token = res.unwrap_err(); let err_with_token = res.unwrap_err();
assert_eq!(err_with_token.1, tok); assert_eq!(err_with_token.1, tok);
match err_with_token.0 { match err_with_token.0 {
EcssTmtcError::EcssTmtcError(EcssTmtcError::ByteConversion(e)) => match e { EcssTmtcError::Pus(PusError::ByteConversion(e)) => match e {
ByteConversionError::ToSliceTooSmall(missmatch) => { ByteConversionError::ToSliceTooSmall(missmatch) => {
assert_eq!( assert_eq!(
missmatch.expected, missmatch.expected,
@ -2357,11 +2323,11 @@ mod tests {
// TODO: maybe a bit more extensive testing, all I have time for right now // TODO: maybe a bit more extensive testing, all I have time for right now
fn test_seq_count_increment() { fn test_seq_count_increment() {
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
let shared_tm_pool: SharedPool = let tm_pool = Box::new(LocalPool::new(pool_cfg.clone()));
Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); let shared_tm_store = SharedTmStore::new(tm_pool);
let shared_tm_pool = shared_tm_store.clone_backing_pool();
let (verif_tx, verif_rx) = mpsc::channel(); let (verif_tx, verif_rx) = mpsc::channel();
let sender = let sender = MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_store, verif_tx);
MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_pool.clone(), verif_tx);
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender));

View File

@ -26,7 +26,7 @@ pub mod std_mod {
} }
} }
pub fn backing_pool(&self) -> SharedPool { pub fn clone_backing_pool(&self) -> SharedPool {
self.pool.clone() self.pool.clone()
} }

View File

@ -3,11 +3,12 @@
#[cfg(feature = "crossbeam")] #[cfg(feature = "crossbeam")]
pub mod crossbeam_test { pub mod crossbeam_test {
use hashbrown::HashMap; use hashbrown::HashMap;
use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider};
use satrs_core::pus::verification::{ use satrs_core::pus::verification::{
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
}; };
use satrs_core::pus::MpscTmInStoreSender; use satrs_core::pus::MpscTmInStoreSender;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
@ -35,13 +36,12 @@ pub mod crossbeam_test {
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
// Shared pool object to store the verification PUS telemetry // Shared pool object to store the verification PUS telemetry
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
let shared_tm_pool: SharedPool = let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone())));
Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone(); let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let sender = let sender =
MpscTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone());
let mut reporter_with_sender_0 = let mut reporter_with_sender_0 =
VerificationReporterWithSender::new(&cfg, Box::new(sender)); VerificationReporterWithSender::new(&cfg, Box::new(sender));
let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
@ -146,8 +146,9 @@ pub mod crossbeam_test {
.recv_timeout(Duration::from_millis(50)) .recv_timeout(Duration::from_millis(50))
.expect("Packet reception timeout"); .expect("Packet reception timeout");
let tm_len; let tm_len;
let shared_tm_store = shared_tm_store.clone_backing_pool();
{ {
let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); let mut rg = shared_tm_store.write().expect("Error locking shared pool");
let store_guard = rg.read_with_guard(verif_addr); let store_guard = rg.read_with_guard(verif_addr);
let slice = store_guard.read().expect("Error reading TM slice"); let slice = store_guard.read().expect("Error reading TM slice");
tm_len = slice.len(); tm_len = slice.len();

View File

@ -83,4 +83,14 @@ pub enum TmSenderId {
PusHk = 3, PusHk = 3,
PusAction = 4, PusAction = 4,
PusSched = 5, PusSched = 5,
AllEvents = 6,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TcReceiverId {
PusTest = 1,
PusEvent = 2,
PusHk = 3,
PusAction = 4,
PusSched = 5,
} }

View File

@ -32,8 +32,10 @@ use satrs_core::pus::hk::Subservice as HkSubservice;
use satrs_core::pus::scheduler::PusScheduler; use satrs_core::pus::scheduler::PusScheduler;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::test::PusService17TestHandler;
use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; use satrs_core::pus::verification::{
use satrs_core::pus::MpscTmInStoreSender; TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken,
};
use satrs_core::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender};
use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore};
use satrs_core::spacepackets::tm::PusTmZeroCopyWriter; use satrs_core::spacepackets::tm::PusTmZeroCopyWriter;
use satrs_core::spacepackets::{ use satrs_core::spacepackets::{
@ -44,8 +46,8 @@ use satrs_core::spacepackets::{
}; };
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{AddressableId, TargetId}; use satrs_core::tmtc::{AddressableId, TargetId};
use satrs_core::SenderId; use satrs_core::ChannelId;
use satrs_example::{RequestTargetId, TmSenderId, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs_example::{RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, SERVER_PORT};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc::{channel, TryRecvError}; use std::sync::mpsc::{channel, TryRecvError};
@ -85,7 +87,7 @@ fn main() {
let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel();
let verif_sender = MpscTmInStoreSender::new( let verif_sender = MpscTmInStoreSender::new(
TmSenderId::PusVerification as SenderId, TmSenderId::PusVerification as ChannelId,
"verif_sender", "verif_sender",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
@ -156,13 +158,18 @@ fn main() {
action_service_receiver: pus_action_tx, action_service_receiver: pus_action_tx,
}; };
let test_srv_tm_sender = MpscTmInStoreSender::new( let test_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusTest as SenderId, TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER", "PUS_17_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus17_handler = PusService17TestHandler::new( let test_srv_receiver = MpscTcInStoreReceiver::new(
TcReceiverId::PusTest as ChannelId,
"PUS_17_TC_RECV",
pus_test_rx, pus_test_rx,
);
let pus17_handler = PusService17TestHandler::new(
Box::new(test_srv_receiver),
tc_store.pool.clone(), tc_store.pool.clone(),
Box::new(test_srv_tm_sender), Box::new(test_srv_tm_sender),
PUS_APID, PUS_APID,
@ -174,15 +181,20 @@ fn main() {
}; };
let sched_srv_tm_sender = MpscTmInStoreSender::new( let sched_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusSched as SenderId, TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER", "PUS_11_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let sched_srv_receiver = MpscTcInStoreReceiver::new(
TcReceiverId::PusSched as ChannelId,
"PUS_11_TC_RECV",
pus_sched_rx,
);
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed"); .expect("Creating PUS Scheduler failed");
let pus_11_handler = PusService11SchedHandler::new( let pus_11_handler = PusService11SchedHandler::new(
pus_sched_rx, Box::new(sched_srv_receiver),
tc_store.pool.clone(), tc_store.pool.clone(),
Box::new(sched_srv_tm_sender), Box::new(sched_srv_tm_sender),
PUS_APID, PUS_APID,
@ -195,13 +207,18 @@ fn main() {
}; };
let event_srv_tm_sender = MpscTmInStoreSender::new( let event_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusEvent as SenderId, TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER", "PUS_5_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus_5_handler = PusService5EventHandler::new( let event_srv_receiver = MpscTcInStoreReceiver::new(
TcReceiverId::PusEvent as ChannelId,
"PUS_5_TC_RECV",
pus_event_rx, pus_event_rx,
);
let pus_5_handler = PusService5EventHandler::new(
Box::new(event_srv_receiver),
tc_store.pool.clone(), tc_store.pool.clone(),
Box::new(event_srv_tm_sender), Box::new(event_srv_tm_sender),
PUS_APID, PUS_APID,
@ -211,13 +228,18 @@ fn main() {
let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler }; let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler };
let action_srv_tm_sender = MpscTmInStoreSender::new( let action_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusAction as SenderId, TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER", "PUS_8_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus_8_handler = PusService8ActionHandler::new( let action_srv_receiver = MpscTcInStoreReceiver::new(
TcReceiverId::PusAction as ChannelId,
"PUS_8_TC_RECV",
pus_action_rx, pus_action_rx,
);
let pus_8_handler = PusService8ActionHandler::new(
Box::new(action_srv_receiver),
tc_store.pool.clone(), tc_store.pool.clone(),
Box::new(action_srv_tm_sender), Box::new(action_srv_tm_sender),
PUS_APID, PUS_APID,
@ -227,13 +249,15 @@ fn main() {
let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler };
let hk_srv_tm_sender = MpscTmInStoreSender::new( let hk_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusHk as SenderId, TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER", "PUS_3_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let hk_srv_receiver =
MpscTcInStoreReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx);
let pus_3_handler = PusService3HkHandler::new( let pus_3_handler = PusService3HkHandler::new(
pus_hk_rx, Box::new(hk_srv_receiver),
tc_store.pool.clone(), tc_store.pool.clone(),
Box::new(hk_srv_tm_sender), Box::new(hk_srv_tm_sender),
PUS_APID, PUS_APID,
@ -262,7 +286,7 @@ fn main() {
if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update // Read the TM, set sequence counter and message counter, and finally update
// the CRC. // the CRC.
let shared_pool = shared_tm_store.backing_pool(); let shared_pool = shared_tm_store.clone_backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard let tm_raw = pool_guard
.modify(&addr) .modify(&addr)
@ -297,12 +321,20 @@ fn main() {
.name("Event".to_string()) .name("Event".to_string())
.spawn(move || { .spawn(move || {
let mut timestamp: [u8; 7] = [0; 7]; let mut timestamp: [u8; 7] = [0; 7];
let mut sender = let mut sender = MpscTmInStoreSender::new(
MpscTmInStoreSender::new(1, "event_sender", tm_store_event.clone(), tm_funnel_tx); TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
tm_store_event.clone(),
tm_funnel_tx,
);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
let started_token: VerificationToken<TcStateStarted> = event_req
.token
.try_into()
.expect("expected start verification token");
reporter_event_handler reporter_event_handler
.completion_success(event_req.token.try_into().unwrap(), Some(timestamp)) .completion_success(started_token, Some(timestamp))
.expect("Sending completion success failed"); .expect("Sending completion success failed");
}; };
loop { loop {

View File

@ -5,7 +5,7 @@ use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
}; };
use satrs_core::pus::{ use satrs_core::pus::{
AcceptedTc, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase,
PusServiceHandler, PusServiceHandler,
}; };
use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::ecss::PusPacket;
@ -13,7 +13,7 @@ use satrs_core::spacepackets::tc::PusTc;
use satrs_core::tmtc::TargetId; use satrs_core::tmtc::TargetId;
use satrs_example::tmtc_err; use satrs_example::tmtc_err;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::Sender;
pub struct PusService8ActionHandler { pub struct PusService8ActionHandler {
psb: PusServiceBase, psb: PusServiceBase,
@ -22,15 +22,21 @@ pub struct PusService8ActionHandler {
impl PusService8ActionHandler { impl PusService8ActionHandler {
pub fn new( pub fn new(
receiver: Receiver<AcceptedTc>, tc_receiver: Box<dyn EcssTcReceiver>,
tc_pool: SharedPool, shared_tc_pool: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
request_handlers: HashMap<TargetId, Sender<RequestWithToken>>, request_handlers: HashMap<TargetId, Sender<RequestWithToken>>,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(receiver, tc_pool, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(
tc_receiver,
shared_tc_pool,
tm_sender,
tm_apid,
verification_handler,
),
request_handlers, request_handlers,
} }
} }

View File

@ -6,7 +6,7 @@ use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
}; };
use satrs_core::pus::{ use satrs_core::pus::{
AcceptedTc, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase,
PusServiceHandler, PusServiceHandler,
}; };
use satrs_core::spacepackets::ecss::{hk, PusPacket}; use satrs_core::spacepackets::ecss::{hk, PusPacket};
@ -14,7 +14,7 @@ use satrs_core::spacepackets::tc::PusTc;
use satrs_core::tmtc::{AddressableId, TargetId}; use satrs_core::tmtc::{AddressableId, TargetId};
use satrs_example::{hk_err, tmtc_err}; use satrs_example::{hk_err, tmtc_err};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::Sender;
pub struct PusService3HkHandler { pub struct PusService3HkHandler {
psb: PusServiceBase, psb: PusServiceBase,
@ -23,15 +23,21 @@ pub struct PusService3HkHandler {
impl PusService3HkHandler { impl PusService3HkHandler {
pub fn new( pub fn new(
receiver: Receiver<AcceptedTc>, tc_receiver: Box<dyn EcssTcReceiver>,
tc_pool: SharedPool, shared_tc_pool: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
request_handlers: HashMap<TargetId, Sender<RequestWithToken>>, request_handlers: HashMap<TargetId, Sender<RequestWithToken>>,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(receiver, tc_pool, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(
tc_receiver,
shared_tc_pool,
tm_sender,
tm_apid,
verification_handler,
),
request_handlers, request_handlers,
} }
} }

View File

@ -2,7 +2,7 @@ use crate::tmtc::MpscStoreAndSendError;
use log::warn; use log::warn;
use satrs_core::pool::StoreAddr; use satrs_core::pool::StoreAddr;
use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender};
use satrs_core::pus::{AcceptedTc, PusPacketHandlerResult}; use satrs_core::pus::{PusPacketHandlerResult, TcAddrWithToken};
use satrs_core::spacepackets::ecss::PusServiceId; use satrs_core::spacepackets::ecss::PusServiceId;
use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::tc::PusTc;
use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::spacepackets::time::cds::TimeProvider;
@ -17,11 +17,11 @@ pub mod scheduler;
pub mod test; pub mod test;
pub struct PusTcMpscRouter { pub struct PusTcMpscRouter {
pub test_service_receiver: Sender<AcceptedTc>, pub test_service_receiver: Sender<TcAddrWithToken>,
pub event_service_receiver: Sender<AcceptedTc>, pub event_service_receiver: Sender<TcAddrWithToken>,
pub sched_service_receiver: Sender<AcceptedTc>, pub sched_service_receiver: Sender<TcAddrWithToken>,
pub hk_service_receiver: Sender<AcceptedTc>, pub hk_service_receiver: Sender<TcAddrWithToken>,
pub action_service_receiver: Sender<AcceptedTc>, pub action_service_receiver: Sender<TcAddrWithToken>,
} }
pub struct PusReceiver { pub struct PusReceiver {
@ -86,20 +86,20 @@ impl PusReceiver {
PusServiceId::Test => { PusServiceId::Test => {
self.pus_router self.pus_router
.test_service_receiver .test_service_receiver
.send((store_addr, accepted_token))?; .send((store_addr, accepted_token.into()))?;
} }
PusServiceId::Housekeeping => self PusServiceId::Housekeeping => self
.pus_router .pus_router
.hk_service_receiver .hk_service_receiver
.send((store_addr, accepted_token))?, .send((store_addr, accepted_token.into()))?,
PusServiceId::Event => self PusServiceId::Event => self
.pus_router .pus_router
.event_service_receiver .event_service_receiver
.send((store_addr, accepted_token))?, .send((store_addr, accepted_token.into()))?,
PusServiceId::Scheduling => self PusServiceId::Scheduling => self
.pus_router .pus_router
.sched_service_receiver .sched_service_receiver
.send((store_addr, accepted_token))?, .send((store_addr, accepted_token.into()))?,
_ => { _ => {
let result = self.verif_reporter.start_failure( let result = self.verif_reporter.start_failure(
accepted_token, accepted_token,

View File

@ -10,7 +10,7 @@ use crate::ccsds::CcsdsReceiver;
use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::{PusReceiver, PusTcMpscRouter};
use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::pus::verification::StdVerifReporterWithSender;
use satrs_core::pus::{AcceptedTc, ReceivesEcssPusTc}; use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken};
use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket}; use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket};
use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::tc::PusTc;
use satrs_core::spacepackets::SpHeader; use satrs_core::spacepackets::SpHeader;
@ -42,7 +42,7 @@ pub enum MpscStoreAndSendError {
#[error("Store error: {0}")] #[error("Store error: {0}")]
Store(#[from] StoreError), Store(#[from] StoreError),
#[error("TC send error: {0}")] #[error("TC send error: {0}")]
TcSend(#[from] SendError<AcceptedTc>), TcSend(#[from] SendError<TcAddrWithToken>),
#[error("TMTC send error: {0}")] #[error("TMTC send error: {0}")]
TmTcSend(#[from] SendError<StoreAddr>), TmTcSend(#[from] SendError<StoreAddr>),
} }
@ -123,7 +123,7 @@ pub fn core_tmtc_task(
let mut udp_tmtc_server = UdpTmtcServer { let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server, udp_tc_server,
tm_rx: tm_args.tm_server_rx, tm_rx: tm_args.tm_server_rx,
tm_store: tm_args.tm_store.backing_pool(), tm_store: tm_args.tm_store.clone_backing_pool(),
}; };
let mut tc_buf: [u8; 4096] = [0; 4096]; let mut tc_buf: [u8; 4096] = [0; 4096];