Merge pull request 'Refactored Verification Reporter Module' (#132) from refactor-verification-mod into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

Reviewed-on: #132
This commit is contained in:
Robin Müller 2024-02-26 11:58:57 +01:00
commit cf9b115e1e
25 changed files with 571 additions and 414 deletions

View File

@ -1,7 +1,7 @@
use std::sync::mpsc::{self, TryRecvError}; use std::sync::mpsc::{self, TryRecvError};
use log::{info, warn}; use log::{info, warn};
use satrs::pus::verification::{VerificationReporterWithSender, VerificationReportingProvider}; use satrs::pus::verification::VerificationReportingProvider;
use satrs::pus::{EcssTmSender, PusTmWrapper}; use satrs::pus::{EcssTmSender, PusTmWrapper};
use satrs::request::TargetAndApidId; use satrs::request::TargetAndApidId;
use satrs::spacepackets::ecss::hk::Subservice as HkSubservice; use satrs::spacepackets::ecss::hk::Subservice as HkSubservice;
@ -21,19 +21,19 @@ use crate::{
update_time, update_time,
}; };
pub struct AcsTask { pub struct AcsTask<VerificationReporter: VerificationReportingProvider> {
timestamp: [u8; 7], timestamp: [u8; 7],
time_provider: TimeProvider<DaysLen16Bits>, time_provider: TimeProvider<DaysLen16Bits>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporter,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
request_rx: mpsc::Receiver<RequestWithToken>, request_rx: mpsc::Receiver<RequestWithToken>,
} }
impl AcsTask { impl<VerificationReporter: VerificationReportingProvider> AcsTask<VerificationReporter> {
pub fn new( pub fn new(
tm_sender: impl EcssTmSender, tm_sender: impl EcssTmSender,
request_rx: mpsc::Receiver<RequestWithToken>, request_rx: mpsc::Receiver<RequestWithToken>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporter,
) -> Self { ) -> Self {
Self { Self {
timestamp: [0; 7], timestamp: [0; 7],

View File

@ -11,10 +11,7 @@ use satrs::{
event_man::{ event_man::{
DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken,
}, },
verification::{ verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
TcStateStarted, VerificationReporterWithSender, VerificationReportingProvider,
VerificationToken,
},
EcssTmSender, EcssTmSender,
}, },
spacepackets::time::cds::{self, TimeProvider}, spacepackets::time::cds::{self, TimeProvider},
@ -23,21 +20,21 @@ use satrs_example::config::PUS_APID;
use crate::update_time; use crate::update_time;
pub struct PusEventHandler { pub struct PusEventHandler<VerificationReporter: VerificationReportingProvider> {
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>,
pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>, pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
time_provider: TimeProvider, time_provider: TimeProvider,
timestamp: [u8; 7], timestamp: [u8; 7],
verif_handler: VerificationReporterWithSender, verif_handler: VerificationReporter,
} }
/* /*
*/ */
impl PusEventHandler { impl<VerificationReporter: VerificationReportingProvider> PusEventHandler<VerificationReporter> {
pub fn new( pub fn new(
verif_handler: VerificationReporterWithSender, verif_handler: VerificationReporter,
event_manager: &mut EventManagerWithBoundedMpsc, event_manager: &mut EventManagerWithBoundedMpsc,
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
tm_sender: impl EcssTmSender, tm_sender: impl EcssTmSender,
@ -147,15 +144,15 @@ impl EventManagerWrapper {
} }
} }
pub struct EventHandler { pub struct EventHandler<VerificationReporter: VerificationReportingProvider> {
pub event_man_wrapper: EventManagerWrapper, pub event_man_wrapper: EventManagerWrapper,
pub pus_event_handler: PusEventHandler, pub pus_event_handler: PusEventHandler<VerificationReporter>,
} }
impl EventHandler { impl<VerificationReporter: VerificationReportingProvider> EventHandler<VerificationReporter> {
pub fn new( pub fn new(
tm_sender: impl EcssTmSender, tm_sender: impl EcssTmSender,
verif_handler: VerificationReporterWithSender, verif_handler: VerificationReporter,
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self { ) -> Self {
let mut event_man_wrapper = EventManagerWrapper::new(); let mut event_man_wrapper = EventManagerWrapper::new();

View File

@ -44,7 +44,7 @@ use crate::tmtc::{
use crate::udp::{StaticUdpTmHandler, UdpTmtcServer}; use crate::udp::{StaticUdpTmHandler, UdpTmtcServer};
use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; use satrs::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
use satrs::pus::{EcssTmSender, MpscTmAsVecSender, MpscTmInSharedPoolSender}; use satrs::pus::{EcssTmSender, TmAsVecSenderWithId, TmInSharedPoolSenderWithId};
use satrs::spacepackets::{time::cds::TimeProvider, time::TimeWriter}; use satrs::spacepackets::{time::cds::TimeProvider, time::TimeWriter};
use satrs::tmtc::CcsdsDistributor; use satrs::tmtc::CcsdsDistributor;
use satrs::ChannelId; use satrs::ChannelId;
@ -54,11 +54,13 @@ use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender { fn create_verification_reporter<Sender: EcssTmSender + Clone>(
verif_sender: Sender,
) -> VerificationReporterWithSender<Sender> {
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned // Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter. // verification reporter.
VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)) VerificationReporterWithSender::new(&verif_cfg, verif_sender)
} }
#[allow(dead_code)] #[allow(dead_code)]
@ -68,13 +70,13 @@ fn static_tmtc_pool_main() {
let shared_tc_pool = SharedTcPool { let shared_tc_pool = SharedTcPool {
pool: Arc::new(RwLock::new(tc_pool)), pool: Arc::new(RwLock::new(tc_pool)),
}; };
let (tc_source_tx, tc_source_rx) = channel(); let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50);
let (tm_server_tx, tm_server_rx) = channel(); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
// Every software component which needs to generate verification telemetry, receives a cloned // Every software component which needs to generate verification telemetry, receives a cloned
// verification reporter. // verification reporter.
let verif_reporter = create_verification_reporter(MpscTmInSharedPoolSender::new( let verif_reporter = create_verification_reporter(TmInSharedPoolSenderWithId::new(
TmSenderId::PusVerification as ChannelId, TmSenderId::PusVerification as ChannelId,
"verif_sender", "verif_sender",
shared_tm_pool.clone(), shared_tm_pool.clone(),
@ -102,7 +104,7 @@ fn static_tmtc_pool_main() {
// The event task is the core handler to perform the event routing and TM handling as specified // The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation. // in the sat-rs documentation.
let mut event_handler = EventHandler::new( let mut event_handler = EventHandler::new(
MpscTmInSharedPoolSender::new( TmInSharedPoolSenderWithId::new(
TmSenderId::AllEvents as ChannelId, TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX", "ALL_EVENTS_TX",
shared_tm_pool.clone(), shared_tm_pool.clone(),
@ -202,7 +204,7 @@ fn static_tmtc_pool_main() {
.expect("tcp server creation failed"); .expect("tcp server creation failed");
let mut acs_task = AcsTask::new( let mut acs_task = AcsTask::new(
MpscTmInSharedPoolSender::new( TmInSharedPoolSenderWithId::new(
TmSenderId::AcsSubsystem as ChannelId, TmSenderId::AcsSubsystem as ChannelId,
"ACS_TASK_SENDER", "ACS_TASK_SENDER",
shared_tm_pool.clone(), shared_tm_pool.clone(),
@ -303,7 +305,7 @@ fn dyn_tmtc_pool_main() {
let (tm_server_tx, tm_server_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel();
// Every software component which needs to generate verification telemetry, gets a cloned // Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter. // verification reporter.
let verif_reporter = create_verification_reporter(MpscTmAsVecSender::new( let verif_reporter = create_verification_reporter(TmAsVecSenderWithId::new(
TmSenderId::PusVerification as ChannelId, TmSenderId::PusVerification as ChannelId,
"verif_sender", "verif_sender",
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
@ -324,7 +326,7 @@ fn dyn_tmtc_pool_main() {
// The event task is the core handler to perform the event routing and TM handling as specified // The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation. // in the sat-rs documentation.
let mut event_handler = EventHandler::new( let mut event_handler = EventHandler::new(
MpscTmAsVecSender::new( TmAsVecSenderWithId::new(
TmSenderId::AllEvents as ChannelId, TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX", "ALL_EVENTS_TX",
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
@ -415,7 +417,7 @@ fn dyn_tmtc_pool_main() {
.expect("tcp server creation failed"); .expect("tcp server creation failed");
let mut acs_task = AcsTask::new( let mut acs_task = AcsTask::new(
MpscTmAsVecSender::new( TmAsVecSenderWithId::new(
TmSenderId::AcsSubsystem as ChannelId, TmSenderId::AcsSubsystem as ChannelId,
"ACS_TASK_SENDER", "ACS_TASK_SENDER",
tm_funnel_tx.clone(), tm_funnel_tx.clone(),

View File

@ -2,14 +2,16 @@ use log::{error, warn};
use satrs::action::ActionRequest; use satrs::action::ActionRequest;
use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::action::{PusActionToRequestConverter, PusService8ActionHandler}; use satrs::pus::action::{PusActionToRequestConverter, PusService8ActionHandler};
use satrs::pus::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use satrs::pus::verification::{ use satrs::pus::verification::{
FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationReportingProvider, FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken,
VerificationToken,
}; };
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusServiceHelper,
PusPacketHandlingError, PusServiceHelper, TmAsVecSenderWithId, TmInSharedPoolSenderWithId,
}; };
use satrs::request::TargetAndApidId; use satrs::request::TargetAndApidId;
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
@ -74,13 +76,14 @@ impl PusActionToRequestConverter for ExampleActionRequestConverter {
pub fn create_action_service_static( pub fn create_action_service_static(
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::SyncSender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>, pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter, action_router: GenericRequestRouter,
) -> Pus8Wrapper<EcssTcInSharedStoreConverter> { ) -> Pus8Wrapper<EcssTcInSharedStoreConverter, VerificationReporterWithSharedPoolMpscBoundedSender>
let action_srv_tm_sender = MpscTmInSharedPoolSender::new( {
let action_srv_tm_sender = TmInSharedPoolSenderWithId::new(
TmSenderId::PusAction as ChannelId, TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER", "PUS_8_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
@ -108,11 +111,11 @@ pub fn create_action_service_static(
pub fn create_action_service_dynamic( pub fn create_action_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithVecMpscSender,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>, pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter, action_router: GenericRequestRouter,
) -> Pus8Wrapper<EcssTcInVecConverter> { ) -> Pus8Wrapper<EcssTcInVecConverter, VerificationReporterWithVecMpscSender> {
let action_srv_tm_sender = MpscTmAsVecSender::new( let action_srv_tm_sender = TmAsVecSenderWithId::new(
TmSenderId::PusAction as ChannelId, TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER", "PUS_8_TM_SENDER",
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
@ -137,17 +140,24 @@ pub fn create_action_service_dynamic(
Pus8Wrapper { pus_8_handler } Pus8Wrapper { pus_8_handler }
} }
pub struct Pus8Wrapper<TcInMemConverter: EcssTcInMemConverter> { pub struct Pus8Wrapper<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> {
pub(crate) pus_8_handler: PusService8ActionHandler< pub(crate) pus_8_handler: PusService8ActionHandler<
TcInMemConverter, TcInMemConverter,
VerificationReporterWithSender, VerificationReporter,
ExampleActionRequestConverter, ExampleActionRequestConverter,
GenericRequestRouter, GenericRequestRouter,
GenericRoutingErrorHandler<8>, GenericRoutingErrorHandler<8>,
>, >,
} }
impl<TcInMemConverter: EcssTcInMemConverter> Pus8Wrapper<TcInMemConverter> { impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> Pus8Wrapper<TcInMemConverter, VerificationReporter>
{
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
match self.pus_8_handler.handle_one_tc() { match self.pus_8_handler.handle_one_tc() {
Ok(result) => match result { Ok(result) => match result {

View File

@ -4,11 +4,14 @@ use log::{error, warn};
use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusService5EventHandler; use satrs::pus::event_srv::PusService5EventHandler;
use satrs::pus::verification::VerificationReporterWithSender; use satrs::pus::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use satrs::pus::verification::VerificationReportingProvider;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, TmAsVecSenderWithId,
PusServiceHelper, TmInSharedPoolSenderWithId,
}; };
use satrs::tmtc::tm_helper::SharedTmPool; use satrs::tmtc::tm_helper::SharedTmPool;
use satrs::ChannelId; use satrs::ChannelId;
@ -16,13 +19,14 @@ use satrs_example::config::{TcReceiverId, TmSenderId, PUS_APID};
pub fn create_event_service_static( pub fn create_event_service_static(
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::SyncSender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>, pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>, event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> Pus5Wrapper<EcssTcInSharedStoreConverter> { ) -> Pus5Wrapper<EcssTcInSharedStoreConverter, VerificationReporterWithSharedPoolMpscBoundedSender>
let event_srv_tm_sender = MpscTmInSharedPoolSender::new( {
let event_srv_tm_sender = TmInSharedPoolSenderWithId::new(
TmSenderId::PusEvent as ChannelId, TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER", "PUS_5_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
@ -48,11 +52,11 @@ pub fn create_event_service_static(
pub fn create_event_service_dynamic( pub fn create_event_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithVecMpscSender,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>, pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>, event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> Pus5Wrapper<EcssTcInVecConverter> { ) -> Pus5Wrapper<EcssTcInVecConverter, VerificationReporterWithVecMpscSender> {
let event_srv_tm_sender = MpscTmAsVecSender::new( let event_srv_tm_sender = TmAsVecSenderWithId::new(
TmSenderId::PusEvent as ChannelId, TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER", "PUS_5_TM_SENDER",
tm_funnel_tx, tm_funnel_tx,
@ -75,11 +79,18 @@ pub fn create_event_service_dynamic(
Pus5Wrapper { pus_5_handler } Pus5Wrapper { pus_5_handler }
} }
pub struct Pus5Wrapper<TcInMemConverter: EcssTcInMemConverter> { pub struct Pus5Wrapper<
pub pus_5_handler: PusService5EventHandler<TcInMemConverter, VerificationReporterWithSender>, TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> {
pub pus_5_handler: PusService5EventHandler<TcInMemConverter, VerificationReporter>,
} }
impl<TcInMemConverter: EcssTcInMemConverter> Pus5Wrapper<TcInMemConverter> { impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> Pus5Wrapper<TcInMemConverter, VerificationReporter>
{
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
match self.pus_5_handler.handle_one_tc() { match self.pus_5_handler.handle_one_tc() {
Ok(result) => match result { Ok(result) => match result {

View File

@ -2,14 +2,16 @@ use log::{error, warn};
use satrs::hk::{CollectionIntervalFactor, HkRequest}; use satrs::hk::{CollectionIntervalFactor, HkRequest};
use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::hk::{PusHkToRequestConverter, PusService3HkHandler}; use satrs::pus::hk::{PusHkToRequestConverter, PusService3HkHandler};
use satrs::pus::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use satrs::pus::verification::{ use satrs::pus::verification::{
FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationReportingProvider, FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken,
VerificationToken,
}; };
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusServiceHelper,
PusPacketHandlingError, PusServiceHelper, TmAsVecSenderWithId, TmInSharedPoolSenderWithId,
}; };
use satrs::request::TargetAndApidId; use satrs::request::TargetAndApidId;
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
@ -143,13 +145,14 @@ impl PusHkToRequestConverter for ExampleHkRequestConverter {
pub fn create_hk_service_static( pub fn create_hk_service_static(
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::SyncSender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>, pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_router: GenericRequestRouter, request_router: GenericRequestRouter,
) -> Pus3Wrapper<EcssTcInSharedStoreConverter> { ) -> Pus3Wrapper<EcssTcInSharedStoreConverter, VerificationReporterWithSharedPoolMpscBoundedSender>
let hk_srv_tm_sender = MpscTmInSharedPoolSender::new( {
let hk_srv_tm_sender = TmInSharedPoolSenderWithId::new(
TmSenderId::PusHk as ChannelId, TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER", "PUS_3_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
@ -174,11 +177,11 @@ pub fn create_hk_service_static(
pub fn create_hk_service_dynamic( pub fn create_hk_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithVecMpscSender,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>, pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_router: GenericRequestRouter, request_router: GenericRequestRouter,
) -> Pus3Wrapper<EcssTcInVecConverter> { ) -> Pus3Wrapper<EcssTcInVecConverter, VerificationReporterWithVecMpscSender> {
let hk_srv_tm_sender = MpscTmAsVecSender::new( let hk_srv_tm_sender = TmAsVecSenderWithId::new(
TmSenderId::PusHk as ChannelId, TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER", "PUS_3_TM_SENDER",
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
@ -200,17 +203,24 @@ pub fn create_hk_service_dynamic(
Pus3Wrapper { pus_3_handler } Pus3Wrapper { pus_3_handler }
} }
pub struct Pus3Wrapper<TcInMemConverter: EcssTcInMemConverter> { pub struct Pus3Wrapper<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> {
pub(crate) pus_3_handler: PusService3HkHandler< pub(crate) pus_3_handler: PusService3HkHandler<
TcInMemConverter, TcInMemConverter,
VerificationReporterWithSender, VerificationReporter,
ExampleHkRequestConverter, ExampleHkRequestConverter,
GenericRequestRouter, GenericRequestRouter,
GenericRoutingErrorHandler<3>, GenericRoutingErrorHandler<3>,
>, >,
} }
impl<TcInMemConverter: EcssTcInMemConverter> Pus3Wrapper<TcInMemConverter> { impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> Pus3Wrapper<TcInMemConverter, VerificationReporter>
{
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
match self.pus_3_handler.handle_one_tc() { match self.pus_3_handler.handle_one_tc() {
Ok(result) => match result { Ok(result) => match result {

View File

@ -1,8 +1,6 @@
use crate::tmtc::MpscStoreAndSendError; use crate::tmtc::MpscStoreAndSendError;
use log::warn; use log::warn;
use satrs::pus::verification::{ use satrs::pus::verification::{FailParams, VerificationReportingProvider};
FailParams, StdVerifReporterWithSender, VerificationReportingProvider,
};
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, GenericRoutingError, PusPacketHandlerResult, PusRoutingErrorHandler, TcInMemory, EcssTcAndToken, GenericRoutingError, PusPacketHandlerResult, PusRoutingErrorHandler, TcInMemory,
}; };
@ -28,8 +26,8 @@ pub struct PusTcMpscRouter {
pub action_service_receiver: Sender<EcssTcAndToken>, pub action_service_receiver: Sender<EcssTcAndToken>,
} }
pub struct PusReceiver { pub struct PusReceiver<VerificationReporter: VerificationReportingProvider> {
pub verif_reporter: StdVerifReporterWithSender, pub verif_reporter: VerificationReporter,
pub pus_router: PusTcMpscRouter, pub pus_router: PusTcMpscRouter,
stamp_helper: TimeStampHelper, stamp_helper: TimeStampHelper,
} }
@ -61,8 +59,8 @@ impl TimeStampHelper {
} }
} }
impl PusReceiver { impl<VerificationReporter: VerificationReportingProvider> PusReceiver<VerificationReporter> {
pub fn new(verif_reporter: StdVerifReporterWithSender, pus_router: PusTcMpscRouter) -> Self { pub fn new(verif_reporter: VerificationReporter, pus_router: PusTcMpscRouter) -> Self {
Self { Self {
verif_reporter, verif_reporter,
pus_router, pus_router,
@ -71,7 +69,7 @@ impl PusReceiver {
} }
} }
impl PusReceiver { impl<VerificationReporter: VerificationReportingProvider> PusReceiver<VerificationReporter> {
pub fn handle_tc_packet( pub fn handle_tc_packet(
&mut self, &mut self,
tc_in_memory: TcInMemory, tc_in_memory: TcInMemory,

View File

@ -5,11 +5,14 @@ use log::{error, info, warn};
use satrs::pool::{PoolProvider, StaticMemoryPool, StoreAddr}; use satrs::pool::{PoolProvider, StaticMemoryPool, StoreAddr};
use satrs::pus::scheduler::{PusScheduler, TcInfo}; use satrs::pus::scheduler::{PusScheduler, TcInfo};
use satrs::pus::scheduler_srv::PusService11SchedHandler; use satrs::pus::scheduler_srv::PusService11SchedHandler;
use satrs::pus::verification::VerificationReporterWithSender; use satrs::pus::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use satrs::pus::verification::VerificationReportingProvider;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, TmAsVecSenderWithId,
PusServiceHelper, TmInSharedPoolSenderWithId,
}; };
use satrs::tmtc::tm_helper::SharedTmPool; use satrs::tmtc::tm_helper::SharedTmPool;
use satrs::ChannelId; use satrs::ChannelId;
@ -51,15 +54,22 @@ impl TcReleaser for mpsc::Sender<Vec<u8>> {
} }
} }
pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> { pub struct Pus11Wrapper<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> {
pub pus_11_handler: pub pus_11_handler:
PusService11SchedHandler<TcInMemConverter, VerificationReporterWithSender, PusScheduler>, PusService11SchedHandler<TcInMemConverter, VerificationReporter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool, pub sched_tc_pool: StaticMemoryPool,
pub releaser_buf: [u8; 4096], pub releaser_buf: [u8; 4096],
pub tc_releaser: Box<dyn TcReleaser + Send>, pub tc_releaser: Box<dyn TcReleaser + Send>,
} }
impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> { impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> Pus11Wrapper<TcInMemConverter, VerificationReporter>
{
pub fn release_tcs(&mut self) { pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(enabled, info, tc) self.tc_releaser.release(enabled, info, tc)
@ -110,13 +120,14 @@ impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
pub fn create_scheduler_service_static( pub fn create_scheduler_service_static(
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::SyncSender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender,
tc_releaser: PusTcSourceProviderSharedPool, tc_releaser: PusTcSourceProviderSharedPool,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>, pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool, sched_tc_pool: StaticMemoryPool,
) -> Pus11Wrapper<EcssTcInSharedStoreConverter> { ) -> Pus11Wrapper<EcssTcInSharedStoreConverter, VerificationReporterWithSharedPoolMpscBoundedSender>
let sched_srv_tm_sender = MpscTmInSharedPoolSender::new( {
let sched_srv_tm_sender = TmInSharedPoolSenderWithId::new(
TmSenderId::PusSched as ChannelId, TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER", "PUS_11_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
@ -149,12 +160,12 @@ pub fn create_scheduler_service_static(
pub fn create_scheduler_service_dynamic( pub fn create_scheduler_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithVecMpscSender,
tc_source_sender: mpsc::Sender<Vec<u8>>, tc_source_sender: mpsc::Sender<Vec<u8>>,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>, pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool, sched_tc_pool: StaticMemoryPool,
) -> Pus11Wrapper<EcssTcInVecConverter> { ) -> Pus11Wrapper<EcssTcInVecConverter, VerificationReporterWithVecMpscSender> {
let sched_srv_tm_sender = MpscTmAsVecSender::new( let sched_srv_tm_sender = TmAsVecSenderWithId::new(
TmSenderId::PusSched as ChannelId, TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER", "PUS_11_TM_SENDER",
tm_funnel_tx, tm_funnel_tx,

View File

@ -1,25 +1,32 @@
use satrs::pus::EcssTcInMemConverter; use satrs::pus::{verification::VerificationReportingProvider, EcssTcInMemConverter};
use super::{ use super::{
action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper,
test::Service17CustomWrapper, test::Service17CustomWrapper,
}; };
pub struct PusStack<TcInMemConverter: EcssTcInMemConverter> { pub struct PusStack<
event_srv: Pus5Wrapper<TcInMemConverter>, TcInMemConverter: EcssTcInMemConverter,
hk_srv: Pus3Wrapper<TcInMemConverter>, VerificationReporter: VerificationReportingProvider,
action_srv: Pus8Wrapper<TcInMemConverter>, > {
schedule_srv: Pus11Wrapper<TcInMemConverter>, event_srv: Pus5Wrapper<TcInMemConverter, VerificationReporter>,
test_srv: Service17CustomWrapper<TcInMemConverter>, hk_srv: Pus3Wrapper<TcInMemConverter, VerificationReporter>,
action_srv: Pus8Wrapper<TcInMemConverter, VerificationReporter>,
schedule_srv: Pus11Wrapper<TcInMemConverter, VerificationReporter>,
test_srv: Service17CustomWrapper<TcInMemConverter, VerificationReporter>,
} }
impl<TcInMemConverter: EcssTcInMemConverter> PusStack<TcInMemConverter> { impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> PusStack<TcInMemConverter, VerificationReporter>
{
pub fn new( pub fn new(
hk_srv: Pus3Wrapper<TcInMemConverter>, hk_srv: Pus3Wrapper<TcInMemConverter, VerificationReporter>,
event_srv: Pus5Wrapper<TcInMemConverter>, event_srv: Pus5Wrapper<TcInMemConverter, VerificationReporter>,
action_srv: Pus8Wrapper<TcInMemConverter>, action_srv: Pus8Wrapper<TcInMemConverter, VerificationReporter>,
schedule_srv: Pus11Wrapper<TcInMemConverter>, schedule_srv: Pus11Wrapper<TcInMemConverter, VerificationReporter>,
test_srv: Service17CustomWrapper<TcInMemConverter>, test_srv: Service17CustomWrapper<TcInMemConverter, VerificationReporter>,
) -> Self { ) -> Self {
Self { Self {
event_srv, event_srv,

View File

@ -2,12 +2,13 @@ use log::{info, warn};
use satrs::params::Params; use satrs::params::Params;
use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::test::PusService17TestHandler; use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReportingProvider};
use satrs::pus::verification::{ use satrs::pus::verification::{
FailParams, VerificationReporterWithSender, VerificationReportingProvider, VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
}; };
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver,
MpscTmInSharedPoolSender, PusPacketHandlerResult, PusServiceHelper, PusPacketHandlerResult, PusServiceHelper, TmAsVecSenderWithId, TmInSharedPoolSenderWithId,
}; };
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::ecss::PusPacket;
@ -21,13 +22,16 @@ use std::sync::mpsc::{self, Sender};
pub fn create_test_service_static( pub fn create_test_service_static(
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::SyncSender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>, event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> Service17CustomWrapper<EcssTcInSharedStoreConverter> { ) -> Service17CustomWrapper<
let test_srv_tm_sender = MpscTmInSharedPoolSender::new( EcssTcInSharedStoreConverter,
VerificationReporterWithSharedPoolMpscBoundedSender,
> {
let test_srv_tm_sender = TmInSharedPoolSenderWithId::new(
TmSenderId::PusTest as ChannelId, TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER", "PUS_17_TM_SENDER",
shared_tm_store.clone(), shared_tm_store.clone(),
@ -53,11 +57,11 @@ pub fn create_test_service_static(
pub fn create_test_service_dynamic( pub fn create_test_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithVecMpscSender,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>, event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> Service17CustomWrapper<EcssTcInVecConverter> { ) -> Service17CustomWrapper<EcssTcInVecConverter, VerificationReporterWithVecMpscSender> {
let test_srv_tm_sender = MpscTmAsVecSender::new( let test_srv_tm_sender = TmAsVecSenderWithId::new(
TmSenderId::PusTest as ChannelId, TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER", "PUS_17_TM_SENDER",
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
@ -80,12 +84,19 @@ pub fn create_test_service_dynamic(
} }
} }
pub struct Service17CustomWrapper<TcInMemConverter: EcssTcInMemConverter> { pub struct Service17CustomWrapper<
pub pus17_handler: PusService17TestHandler<TcInMemConverter, VerificationReporterWithSender>, TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> {
pub pus17_handler: PusService17TestHandler<TcInMemConverter, VerificationReporter>,
pub test_srv_event_sender: Sender<(EventU32, Option<Params>)>, pub test_srv_event_sender: Sender<(EventU32, Option<Params>)>,
} }
impl<TcInMemConverter: EcssTcInMemConverter> Service17CustomWrapper<TcInMemConverter> { impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> Service17CustomWrapper<TcInMemConverter, VerificationReporter>
{
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
let res = self.pus17_handler.handle_one_tc(); let res = self.pus17_handler.handle_one_tc();
if res.is_err() { if res.is_err() {

View File

@ -1,6 +1,6 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::mpsc::{Receiver, Sender}, sync::mpsc::{self},
}; };
use log::info; use log::info;
@ -77,16 +77,16 @@ impl TmFunnelCommon {
pub struct TmFunnelStatic { pub struct TmFunnelStatic {
common: TmFunnelCommon, common: TmFunnelCommon,
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
tm_funnel_rx: Receiver<StoreAddr>, tm_funnel_rx: mpsc::Receiver<StoreAddr>,
tm_server_tx: Sender<StoreAddr>, tm_server_tx: mpsc::SyncSender<StoreAddr>,
} }
impl TmFunnelStatic { impl TmFunnelStatic {
pub fn new( pub fn new(
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
sync_tm_tcp_source: SyncTcpTmSource, sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: Receiver<StoreAddr>, tm_funnel_rx: mpsc::Receiver<StoreAddr>,
tm_server_tx: Sender<StoreAddr>, tm_server_tx: mpsc::SyncSender<StoreAddr>,
) -> Self { ) -> Self {
Self { Self {
common: TmFunnelCommon::new(sync_tm_tcp_source), common: TmFunnelCommon::new(sync_tm_tcp_source),
@ -123,15 +123,15 @@ impl TmFunnelStatic {
pub struct TmFunnelDynamic { pub struct TmFunnelDynamic {
common: TmFunnelCommon, common: TmFunnelCommon,
tm_funnel_rx: Receiver<Vec<u8>>, tm_funnel_rx: mpsc::Receiver<Vec<u8>>,
tm_server_tx: Sender<Vec<u8>>, tm_server_tx: mpsc::Sender<Vec<u8>>,
} }
impl TmFunnelDynamic { impl TmFunnelDynamic {
pub fn new( pub fn new(
sync_tm_tcp_source: SyncTcpTmSource, sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: Receiver<Vec<u8>>, tm_funnel_rx: mpsc::Receiver<Vec<u8>>,
tm_server_tx: Sender<Vec<u8>>, tm_server_tx: mpsc::Sender<Vec<u8>>,
) -> Self { ) -> Self {
Self { Self {
common: TmFunnelCommon::new(sync_tm_tcp_source), common: TmFunnelCommon::new(sync_tm_tcp_source),

View File

@ -1,7 +1,10 @@
use log::warn; use log::warn;
use satrs::pus::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use satrs::pus::{EcssTcAndToken, ReceivesEcssPusTc}; use satrs::pus::{EcssTcAndToken, ReceivesEcssPusTc};
use satrs::spacepackets::SpHeader; use satrs::spacepackets::SpHeader;
use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; use std::sync::mpsc::{self, Receiver, SendError, Sender, SyncSender, TryRecvError};
use thiserror::Error; use thiserror::Error;
use crate::pus::PusReceiver; use crate::pus::PusReceiver;
@ -37,7 +40,7 @@ impl SharedTcPool {
#[derive(Clone)] #[derive(Clone)]
pub struct PusTcSourceProviderSharedPool { pub struct PusTcSourceProviderSharedPool {
pub tc_source: Sender<StoreAddr>, pub tc_source: SyncSender<StoreAddr>,
pub shared_pool: SharedTcPool, pub shared_pool: SharedTcPool,
} }
@ -97,14 +100,14 @@ pub struct TcSourceTaskStatic {
shared_tc_pool: SharedTcPool, shared_tc_pool: SharedTcPool,
tc_receiver: Receiver<StoreAddr>, tc_receiver: Receiver<StoreAddr>,
tc_buf: [u8; 4096], tc_buf: [u8; 4096],
pus_receiver: PusReceiver, pus_receiver: PusReceiver<VerificationReporterWithSharedPoolMpscBoundedSender>,
} }
impl TcSourceTaskStatic { impl TcSourceTaskStatic {
pub fn new( pub fn new(
shared_tc_pool: SharedTcPool, shared_tc_pool: SharedTcPool,
tc_receiver: Receiver<StoreAddr>, tc_receiver: Receiver<StoreAddr>,
pus_receiver: PusReceiver, pus_receiver: PusReceiver<VerificationReporterWithSharedPoolMpscBoundedSender>,
) -> Self { ) -> Self {
Self { Self {
shared_tc_pool, shared_tc_pool,
@ -161,11 +164,14 @@ impl TcSourceTaskStatic {
// TC source components where the heap is the backing memory of the received telecommands. // TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTaskDynamic { pub struct TcSourceTaskDynamic {
pub tc_receiver: Receiver<Vec<u8>>, pub tc_receiver: Receiver<Vec<u8>>,
pus_receiver: PusReceiver, pus_receiver: PusReceiver<VerificationReporterWithVecMpscSender>,
} }
impl TcSourceTaskDynamic { impl TcSourceTaskDynamic {
pub fn new(tc_receiver: Receiver<Vec<u8>>, pus_receiver: PusReceiver) -> Self { pub fn new(
tc_receiver: Receiver<Vec<u8>>,
pus_receiver: PusReceiver<VerificationReporterWithVecMpscSender>,
) -> Self {
Self { Self {
tc_receiver, tc_receiver,
pus_receiver, pus_receiver,

View File

@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- `ListenerTable` -> `ListenerMapProvider` - `ListenerTable` -> `ListenerMapProvider`
- `SenderTable` -> `SenderMapProvider` - `SenderTable` -> `SenderMapProvider`
- There is an `EventManagerWithMpsc` and a `EventManagerWithBoundedMpsc` helper type now. - There is an `EventManagerWithMpsc` and a `EventManagerWithBoundedMpsc` helper type now.
- Refactored ECSS TM sender abstractions to be generic over different message queue backends.
- Refactored Verification Reporter abstractions and implementation to be generic over the sender
instead of using trait objects.
## Fixed ## Fixed

View File

@ -7,7 +7,7 @@ use spacepackets::ByteConversionError;
use std::error::Error; use std::error::Error;
use std::path::Path; use std::path::Path;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use stdmod::*; pub use std_mod::*;
pub const CRC_32: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM); pub const CRC_32: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM);
@ -148,12 +148,11 @@ pub trait VirtualFilestore {
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod stdmod { pub mod std_mod {
use super::*; use super::*;
use std::{ use std::{
fs::{self, File, OpenOptions}, fs::{self, File, OpenOptions},
io::{BufReader, Read, Seek, SeekFrom, Write}, io::{BufReader, Read, Seek, SeekFrom, Write},
path::Path,
}; };
#[derive(Default)] #[derive(Default)]

View File

@ -85,12 +85,8 @@ pub trait EventSendProvider<EV: GenericEvent, AuxDataProvider = Params> {
/// Generic abstraction for an event receiver. /// Generic abstraction for an event receiver.
pub trait EventReceiveProvider<Event: GenericEvent, AuxDataProvider = Params> { pub trait EventReceiveProvider<Event: GenericEvent, AuxDataProvider = Params> {
/// This function has to be provided by any event receiver. A receive call may or may not return /// This function has to be provided by any event receiver. A call may or may not return
/// an event. /// an event and optional auxiliary data.
///
/// To allow returning arbitrary additional auxiliary data, a mutable slice is passed to the
/// [Self::receive] call as well. Receivers can write data to this slice, but care must be taken
/// to avoid panics due to size missmatches or out of bound writes.
fn try_recv_event(&self) -> Option<(Event, Option<AuxDataProvider>)>; fn try_recv_event(&self) -> Option<(Event, Option<AuxDataProvider>)>;
} }
@ -433,10 +429,7 @@ pub mod alloc_mod {
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod std_mod { pub mod std_mod {
use super::*; use super::*;
use crate::event_man::{EventReceiveProvider, EventWithAuxData}; use std::sync::mpsc;
use crate::events::{EventU16, EventU32, GenericEvent};
use crate::params::Params;
use std::sync::mpsc::{self};
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> { pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> {
mpsc_receiver: mpsc::Receiver<(Event, Option<Params>)>, mpsc_receiver: mpsc::Receiver<(Event, Option<Params>)>,

View File

@ -269,7 +269,7 @@ mod tests {
} }
impl EcssChannel for TestSender { impl EcssChannel for TestSender {
fn id(&self) -> ChannelId { fn channel_id(&self) -> ChannelId {
0 0
} }
} }

View File

@ -41,8 +41,8 @@ pub trait PusEventMgmtBackendProvider<Event: GenericEvent> {
#[cfg(feature = "heapless")] #[cfg(feature = "heapless")]
pub mod heapless_mod { pub mod heapless_mod {
use super::*; use super::*;
use crate::events::{GenericEvent, LargestEventRaw}; use crate::events::LargestEventRaw;
use std::marker::PhantomData; use core::marker::PhantomData;
#[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))]
// TODO: After a new version of heapless is released which uses hash32 version 0.3, try using // TODO: After a new version of heapless is released which uses hash32 version 0.3, try using
@ -257,9 +257,8 @@ pub mod alloc_mod {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::events::SeverityInfo; use crate::{events::SeverityInfo, pus::TmAsVecSenderWithMpsc};
use crate::pus::MpscTmAsVecSender; use std::sync::mpsc::{self, TryRecvError};
use std::sync::mpsc::{channel, TryRecvError};
const INFO_EVENT: EventU32TypedSev<SeverityInfo> = const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::const_new(1, 0); EventU32TypedSev::<SeverityInfo>::const_new(1, 0);
@ -279,8 +278,8 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
let mut event_man = create_basic_man_1(); let mut event_man = create_basic_man_1();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = mpsc::channel();
let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx); let mut sender = TmAsVecSenderWithMpsc::new(0, "test_sender", event_tx);
let event_sent = event_man let event_sent = event_man
.generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None) .generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None)
.expect("Sending info event failed"); .expect("Sending info event failed");
@ -293,8 +292,8 @@ mod tests {
#[test] #[test]
fn test_disable_event() { fn test_disable_event() {
let mut event_man = create_basic_man_2(); let mut event_man = create_basic_man_2();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = mpsc::channel();
let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); let mut sender = TmAsVecSenderWithMpsc::new(0, "test", event_tx);
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
assert!(res.is_ok()); assert!(res.is_ok());
assert!(res.unwrap()); assert!(res.unwrap());
@ -316,8 +315,8 @@ mod tests {
#[test] #[test]
fn test_reenable_event() { fn test_reenable_event() {
let mut event_man = create_basic_man_1(); let mut event_man = create_basic_man_1();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = mpsc::channel();
let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); let mut sender = TmAsVecSenderWithMpsc::new(0, "test", event_tx);
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
assert!(res.is_ok()); assert!(res.is_ok());
assert!(res.unwrap()); assert!(res.unwrap());

View File

@ -23,11 +23,11 @@ impl<
> PusService5EventHandler<TcInMemConverter, VerificationReporter> > PusService5EventHandler<TcInMemConverter, VerificationReporter>
{ {
pub fn new( pub fn new(
service_handler: PusServiceHelper<TcInMemConverter, VerificationReporter>, service_helper: PusServiceHelper<TcInMemConverter, VerificationReporter>,
event_request_tx: Sender<EventRequestWithToken>, event_request_tx: Sender<EventRequestWithToken>,
) -> Self { ) -> Self {
Self { Self {
service_helper: service_handler, service_helper,
event_request_tx, event_request_tx,
} }
} }
@ -138,7 +138,9 @@ mod tests {
use crate::pus::event_man::EventRequest; use crate::pus::event_man::EventRequest;
use crate::pus::tests::SimplePusPacketHandler; use crate::pus::tests::SimplePusPacketHandler;
use crate::pus::verification::{RequestId, VerificationReporterWithSender}; use crate::pus::verification::{
RequestId, VerificationReporterWithSharedPoolMpscBoundedSender,
};
use crate::{ use crate::{
events::EventU32, events::EventU32,
pus::{ pus::{
@ -155,8 +157,10 @@ mod tests {
struct Pus5HandlerWithStoreTester { struct Pus5HandlerWithStoreTester {
common: PusServiceHandlerWithSharedStoreCommon, common: PusServiceHandlerWithSharedStoreCommon,
handler: handler: PusService5EventHandler<
PusService5EventHandler<EcssTcInSharedStoreConverter, VerificationReporterWithSender>, EcssTcInSharedStoreConverter,
VerificationReporterWithSharedPoolMpscBoundedSender,
>,
} }
impl Pus5HandlerWithStoreTester { impl Pus5HandlerWithStoreTester {

View File

@ -2,6 +2,8 @@
//! //!
//! This module contains structures to make working with the PUS C standard easier. //! This module contains structures to make working with the PUS C standard easier.
//! The satrs-example application contains various usage examples of these components. //! The satrs-example application contains various usage examples of these components.
use crate::pool::{StoreAddr, StoreError};
use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken};
use crate::queue::{GenericRecvError, GenericSendError}; use crate::queue::{GenericRecvError, GenericSendError};
use crate::ChannelId; use crate::ChannelId;
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
@ -34,8 +36,6 @@ pub mod verification;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
use crate::pool::{StoreAddr, StoreError};
use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken};
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use std_mod::*; pub use std_mod::*;
@ -63,6 +63,7 @@ pub enum EcssTmtcError {
Store(StoreError), Store(StoreError),
Pus(PusError), Pus(PusError),
CantSendAddr(StoreAddr), CantSendAddr(StoreAddr),
CantSendDirectTm,
Send(GenericSendError), Send(GenericSendError),
Recv(GenericRecvError), Recv(GenericRecvError),
} }
@ -82,6 +83,9 @@ impl Display for EcssTmtcError {
EcssTmtcError::CantSendAddr(addr) => { EcssTmtcError::CantSendAddr(addr) => {
write!(f, "can not send address {addr}") write!(f, "can not send address {addr}")
} }
EcssTmtcError::CantSendDirectTm => {
write!(f, "can not send TM directly")
}
EcssTmtcError::Send(send_e) => { EcssTmtcError::Send(send_e) => {
write!(f, "send error {send_e}") write!(f, "send error {send_e}")
} }
@ -123,13 +127,14 @@ impl Error for EcssTmtcError {
EcssTmtcError::Store(e) => Some(e), EcssTmtcError::Store(e) => Some(e),
EcssTmtcError::Pus(e) => Some(e), EcssTmtcError::Pus(e) => Some(e),
EcssTmtcError::Send(e) => Some(e), EcssTmtcError::Send(e) => Some(e),
EcssTmtcError::Recv(e) => Some(e),
_ => None, _ => None,
} }
} }
} }
pub trait EcssChannel: Send { pub trait EcssChannel: Send {
/// Each sender can have an ID associated with it /// Each sender can have an ID associated with it
fn id(&self) -> ChannelId; fn channel_id(&self) -> ChannelId;
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
"unset" "unset"
} }
@ -138,7 +143,7 @@ pub trait EcssChannel: Send {
/// Generic trait for a user supplied sender object. /// Generic trait for a user supplied sender object.
/// ///
/// This sender object is responsible for sending PUS telemetry to a TM sink. /// This sender object is responsible for sending PUS telemetry to a TM sink.
pub trait EcssTmSenderCore: EcssChannel { pub trait EcssTmSenderCore: Send {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError>; fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError>;
} }
@ -146,7 +151,7 @@ pub trait EcssTmSenderCore: EcssChannel {
/// ///
/// This sender object is responsible for sending PUS telecommands to a TC recipient. Each /// This sender object is responsible for sending PUS telecommands to a TC recipient. Each
/// telecommand can optionally have a token which contains its verification state. /// telecommand can optionally have a token which contains its verification state.
pub trait EcssTcSenderCore: EcssChannel { pub trait EcssTcSenderCore {
fn send_tc(&self, tc: PusTcCreator, token: Option<TcStateToken>) -> Result<(), EcssTmtcError>; fn send_tc(&self, tc: PusTcCreator, token: Option<TcStateToken>) -> Result<(), EcssTmtcError>;
} }
@ -221,25 +226,25 @@ impl TryFrom<EcssTcAndToken> for AcceptedEcssTcAndToken {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum TryRecvTmtcError { pub enum TryRecvTmtcError {
Error(EcssTmtcError), Tmtc(EcssTmtcError),
Empty, Empty,
} }
impl From<EcssTmtcError> for TryRecvTmtcError { impl From<EcssTmtcError> for TryRecvTmtcError {
fn from(value: EcssTmtcError) -> Self { fn from(value: EcssTmtcError) -> Self {
Self::Error(value) Self::Tmtc(value)
} }
} }
impl From<PusError> for TryRecvTmtcError { impl From<PusError> for TryRecvTmtcError {
fn from(value: PusError) -> Self { fn from(value: PusError) -> Self {
Self::Error(value.into()) Self::Tmtc(value.into())
} }
} }
impl From<StoreError> for TryRecvTmtcError { impl From<StoreError> for TryRecvTmtcError {
fn from(value: StoreError) -> Self { fn from(value: StoreError) -> Self {
Self::Error(value.into()) Self::Tmtc(value.into())
} }
} }
@ -374,10 +379,9 @@ pub mod std_mod {
use crate::{ChannelId, TargetId}; use crate::{ChannelId, TargetId};
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::vec::Vec; use alloc::vec::Vec;
use crossbeam_channel as cb;
use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::tc::PusTcReader;
use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::tm::PusTmCreator;
use spacepackets::ecss::PusError; use spacepackets::ecss::{PusError, WritablePusPacket};
use spacepackets::time::cds::TimeProvider; use spacepackets::time::cds::TimeProvider;
use spacepackets::time::StdTimestampError; use spacepackets::time::StdTimestampError;
use spacepackets::time::TimeWriter; use spacepackets::time::TimeWriter;
@ -386,6 +390,9 @@ pub mod std_mod {
use std::sync::mpsc::TryRecvError; use std::sync::mpsc::TryRecvError;
use thiserror::Error; use thiserror::Error;
#[cfg(feature = "crossbeam")]
pub use cb_mod::*;
use super::verification::VerificationReportingProvider; use super::verification::VerificationReportingProvider;
use super::{AcceptedEcssTcAndToken, TcInMemory}; use super::{AcceptedEcssTcAndToken, TcInMemory};
@ -395,32 +402,65 @@ pub mod std_mod {
} }
} }
impl From<cb::SendError<StoreAddr>> for EcssTmtcError { impl EcssTmSenderCore for mpsc::Sender<StoreAddr> {
fn from(_: cb::SendError<StoreAddr>) -> Self { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
Self::Send(GenericSendError::RxDisconnected) match tm {
PusTmWrapper::InStore(addr) => self
.send(addr)
.map_err(|_| GenericSendError::RxDisconnected)?,
PusTmWrapper::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm),
};
Ok(())
} }
} }
impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError { impl EcssTmSenderCore for mpsc::SyncSender<StoreAddr> {
fn from(value: cb::TrySendError<StoreAddr>) -> Self { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match value { match tm {
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)), PusTmWrapper::InStore(addr) => self
cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected), .try_send(addr)
.map_err(|e| EcssTmtcError::Send(e.into()))?,
PusTmWrapper::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm),
};
Ok(())
} }
} }
impl EcssTmSenderCore for mpsc::Sender<Vec<u8>> {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm {
PusTmWrapper::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)),
PusTmWrapper::Direct(tm) => self
.send(tm.to_vec()?)
.map_err(|e| EcssTmtcError::Send(e.into()))?,
};
Ok(())
}
}
impl EcssTmSenderCore for mpsc::SyncSender<Vec<u8>> {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm {
PusTmWrapper::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)),
PusTmWrapper::Direct(tm) => self
.send(tm.to_vec()?)
.map_err(|e| EcssTmtcError::Send(e.into()))?,
};
Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]
pub struct MpscTmInSharedPoolSender { pub struct TmInSharedPoolSenderWithId<Sender: EcssTmSenderCore> {
id: ChannelId, channel_id: ChannelId,
name: &'static str, name: &'static str,
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
sender: mpsc::Sender<StoreAddr>, sender: Sender,
} }
impl EcssChannel for MpscTmInSharedPoolSender { impl<Sender: EcssTmSenderCore> EcssChannel for TmInSharedPoolSenderWithId<Sender> {
fn id(&self) -> ChannelId { fn channel_id(&self) -> ChannelId {
self.id self.channel_id
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
@ -428,36 +468,31 @@ pub mod std_mod {
} }
} }
impl MpscTmInSharedPoolSender { impl<Sender: EcssTmSenderCore> TmInSharedPoolSenderWithId<Sender> {
pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> { pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> {
let addr = self.shared_tm_store.add_pus_tm(&tm)?; let addr = self.shared_tm_store.add_pus_tm(&tm)?;
self.sender self.sender.send_tm(PusTmWrapper::InStore(addr))
.send(addr)
.map_err(|_| EcssTmtcError::Send(GenericSendError::RxDisconnected))
} }
} }
impl EcssTmSenderCore for MpscTmInSharedPoolSender { impl<Sender: EcssTmSenderCore> EcssTmSenderCore for TmInSharedPoolSenderWithId<Sender> {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { if let PusTmWrapper::Direct(tm) = tm {
PusTmWrapper::InStore(addr) => { return self.send_direct_tm(tm);
self.sender.send(addr)?;
Ok(())
}
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
} }
self.sender.send_tm(tm)
} }
} }
impl MpscTmInSharedPoolSender { impl<Sender: EcssTmSenderCore> TmInSharedPoolSenderWithId<Sender> {
pub fn new( pub fn new(
id: ChannelId, id: ChannelId,
name: &'static str, name: &'static str,
shared_tm_store: SharedTmPool, shared_tm_store: SharedTmPool,
sender: mpsc::Sender<StoreAddr>, sender: Sender,
) -> Self { ) -> Self {
Self { Self {
id, channel_id: id,
name, name,
shared_tm_store, shared_tm_store,
sender, sender,
@ -465,6 +500,51 @@ pub mod std_mod {
} }
} }
pub type TmInSharedPoolSenderWithMpsc = TmInSharedPoolSenderWithId<mpsc::Sender<StoreAddr>>;
pub type TmInSharedPoolSenderWithBoundedMpsc =
TmInSharedPoolSenderWithId<mpsc::SyncSender<StoreAddr>>;
/// This class can be used if frequent heap allocations during run-time are not an issue.
/// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation
/// of this class can not deal with store addresses, so it is assumed that is is always
/// going to be called with direct packets.
#[derive(Clone)]
pub struct TmAsVecSenderWithId<Sender: EcssTmSenderCore> {
id: ChannelId,
name: &'static str,
sender: Sender,
}
impl From<mpsc::SendError<Vec<u8>>> for EcssTmtcError {
fn from(_: mpsc::SendError<Vec<u8>>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl<Sender: EcssTmSenderCore> TmAsVecSenderWithId<Sender> {
pub fn new(id: u32, name: &'static str, sender: Sender) -> Self {
Self { id, sender, name }
}
}
impl<Sender: EcssTmSenderCore> EcssChannel for TmAsVecSenderWithId<Sender> {
fn channel_id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl<Sender: EcssTmSenderCore> EcssTmSenderCore for TmAsVecSenderWithId<Sender> {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
self.sender.send_tm(tm)
}
}
pub type TmAsVecSenderWithMpsc = TmAsVecSenderWithId<mpsc::Sender<Vec<u8>>>;
pub type TmAsVecSenderWithBoundedMpsc = TmAsVecSenderWithId<mpsc::SyncSender<Vec<u8>>>;
pub struct MpscTcReceiver { pub struct MpscTcReceiver {
id: ChannelId, id: ChannelId,
name: &'static str, name: &'static str,
@ -472,7 +552,7 @@ pub mod std_mod {
} }
impl EcssChannel for MpscTcReceiver { impl EcssChannel for MpscTcReceiver {
fn id(&self) -> ChannelId { fn channel_id(&self) -> ChannelId {
self.id self.id
} }
@ -486,7 +566,7 @@ pub mod std_mod {
self.receiver.try_recv().map_err(|e| match e { self.receiver.try_recv().map_err(|e| match e {
TryRecvError::Empty => TryRecvTmtcError::Empty, TryRecvError::Empty => TryRecvTmtcError::Empty,
TryRecvError::Disconnected => { TryRecvError::Disconnected => {
TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) TryRecvTmtcError::Tmtc(EcssTmtcError::from(GenericRecvError::TxDisconnected))
} }
}) })
} }
@ -502,95 +582,50 @@ pub mod std_mod {
} }
} }
/// This class can be used if frequent heap allocations during run-time are not an issue. #[cfg(feature = "crossbeam")]
/// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation pub mod cb_mod {
/// of this class can not deal with store addresses, so it is assumed that is is always use super::*;
/// going to be called with direct packets. use crossbeam_channel as cb;
#[derive(Clone)]
pub struct MpscTmAsVecSender {
id: ChannelId,
name: &'static str,
sender: mpsc::Sender<Vec<u8>>,
}
impl From<mpsc::SendError<Vec<u8>>> for EcssTmtcError { pub type TmInSharedPoolSenderWithCrossbeam =
fn from(_: mpsc::SendError<Vec<u8>>) -> Self { TmInSharedPoolSenderWithId<cb::Sender<StoreAddr>>;
impl From<cb::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: cb::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected) Self::Send(GenericSendError::RxDisconnected)
} }
} }
impl MpscTmAsVecSender { impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError {
pub fn new(id: u32, name: &'static str, sender: mpsc::Sender<Vec<u8>>) -> Self { fn from(value: cb::TrySendError<StoreAddr>) -> Self {
Self { id, sender, name } match value {
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)),
cb::TrySendError::Disconnected(_) => {
Self::Send(GenericSendError::RxDisconnected)
}
}
} }
} }
impl EcssChannel for MpscTmAsVecSender { impl EcssTmSenderCore for cb::Sender<StoreAddr> {
fn id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTmSenderCore for MpscTmAsVecSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { match tm {
PusTmWrapper::InStore(addr) => Err(EcssTmtcError::CantSendAddr(addr)), PusTmWrapper::InStore(addr) => self
PusTmWrapper::Direct(tm) => { .try_send(addr)
let mut vec = Vec::new(); .map_err(|e| EcssTmtcError::Send(e.into()))?,
tm.append_to_vec(&mut vec).map_err(EcssTmtcError::Pus)?; PusTmWrapper::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm),
self.sender.send(vec)?; };
Ok(()) Ok(())
} }
} }
} impl EcssTmSenderCore for cb::Sender<Vec<u8>> {
}
#[derive(Clone)]
pub struct CrossbeamTmInStoreSender {
id: ChannelId,
name: &'static str,
shared_tm_store: SharedTmPool,
sender: crossbeam_channel::Sender<StoreAddr>,
}
impl CrossbeamTmInStoreSender {
pub fn new(
id: ChannelId,
name: &'static str,
shared_tm_store: SharedTmPool,
sender: crossbeam_channel::Sender<StoreAddr>,
) -> Self {
Self {
id,
name,
shared_tm_store,
sender,
}
}
}
impl EcssChannel for CrossbeamTmInStoreSender {
fn id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTmSenderCore for CrossbeamTmInStoreSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { match tm {
PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?, PusTmWrapper::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)),
PusTmWrapper::Direct(tm) => { PusTmWrapper::Direct(tm) => self
let addr = self.shared_tm_store.add_pus_tm(&tm)?; .send(tm.to_vec()?)
self.sender.try_send(addr)?; .map_err(|e| EcssTmtcError::Send(e.into()))?,
} };
}
Ok(()) Ok(())
} }
} }
@ -612,7 +647,7 @@ pub mod std_mod {
} }
impl EcssChannel for CrossbeamTcReceiver { impl EcssChannel for CrossbeamTcReceiver {
fn id(&self) -> ChannelId { fn channel_id(&self) -> ChannelId {
self.id self.id
} }
@ -625,12 +660,13 @@ pub mod std_mod {
fn recv_tc(&self) -> Result<EcssTcAndToken, TryRecvTmtcError> { fn recv_tc(&self) -> Result<EcssTcAndToken, TryRecvTmtcError> {
self.receiver.try_recv().map_err(|e| match e { self.receiver.try_recv().map_err(|e| match e {
cb::TryRecvError::Empty => TryRecvTmtcError::Empty, cb::TryRecvError::Empty => TryRecvTmtcError::Empty,
cb::TryRecvError::Disconnected => { cb::TryRecvError::Disconnected => TryRecvTmtcError::Tmtc(EcssTmtcError::from(
TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) GenericRecvError::TxDisconnected,
} )),
}) })
} }
} }
}
// TODO: All these types could probably be no_std if we implemented error handling ourselves.. // TODO: All these types could probably be no_std if we implemented error handling ourselves..
// but thiserror is really nice, so keep it like this for simplicity for now. Maybe thiserror // but thiserror is really nice, so keep it like this for simplicity for now. Maybe thiserror
@ -906,7 +942,7 @@ pub mod std_mod {
})) }))
} }
Err(e) => match e { Err(e) => match e {
TryRecvTmtcError::Error(e) => Err(PusPacketHandlingError::EcssTmtc(e)), TryRecvTmtcError::Tmtc(e) => Err(PusPacketHandlingError::EcssTmtc(e)),
TryRecvTmtcError::Empty => Ok(None), TryRecvTmtcError::Empty => Ok(None),
}, },
} }
@ -949,6 +985,9 @@ pub mod tests {
use crate::tmtc::tm_helper::SharedTmPool; use crate::tmtc::tm_helper::SharedTmPool;
use crate::TargetId; use crate::TargetId;
use super::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use super::verification::tests::{SharedVerificationMap, TestVerificationReporter}; use super::verification::tests::{SharedVerificationMap, TestVerificationReporter};
use super::verification::{ use super::verification::{
TcStateAccepted, VerificationReporterCfg, VerificationReporterWithSender, TcStateAccepted, VerificationReporterCfg, VerificationReporterWithSender,
@ -956,8 +995,9 @@ pub mod tests {
}; };
use super::{ use super::{
EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, GenericRoutingError, EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, GenericRoutingError,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusRoutingErrorHandler,
PusPacketHandlingError, PusRoutingErrorHandler, PusServiceHelper, TcInMemory, PusServiceHelper, TcInMemory, TmAsVecSenderWithId, TmInSharedPoolSenderWithBoundedMpsc,
TmInSharedPoolSenderWithId,
}; };
pub const TEST_APID: u16 = 0x101; pub const TEST_APID: u16 = 0x101;
@ -1002,9 +1042,9 @@ pub mod tests {
tm_buf: [u8; 2048], tm_buf: [u8; 2048],
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
tm_pool: SharedTmPool, tm_pool: SharedTmPool,
tc_sender: mpsc::Sender<EcssTcAndToken>, tc_sender: mpsc::SyncSender<EcssTcAndToken>,
tm_receiver: mpsc::Receiver<StoreAddr>, tm_receiver: mpsc::Receiver<StoreAddr>,
verification_handler: VerificationReporterWithSender, verification_handler: VerificationReporterWithSharedPoolMpscBoundedSender,
} }
impl PusServiceHandlerWithSharedStoreCommon { impl PusServiceHandlerWithSharedStoreCommon {
@ -1014,17 +1054,20 @@ pub mod tests {
/// The PUS service handler is instantiated with a [EcssTcInStoreConverter]. /// The PUS service handler is instantiated with a [EcssTcInStoreConverter].
pub fn new() -> ( pub fn new() -> (
Self, Self,
PusServiceHelper<EcssTcInSharedStoreConverter, VerificationReporterWithSender>, PusServiceHelper<
EcssTcInSharedStoreConverter,
VerificationReporterWithSharedPoolMpscBoundedSender,
>,
) { ) {
let pool_cfg = StaticPoolConfig::new(alloc::vec![(16, 16), (8, 32), (4, 64)], false); let pool_cfg = StaticPoolConfig::new(alloc::vec![(16, 16), (8, 32), (4, 64)], false);
let tc_pool = StaticMemoryPool::new(pool_cfg.clone()); let tc_pool = StaticMemoryPool::new(pool_cfg.clone());
let tm_pool = StaticMemoryPool::new(pool_cfg); let tm_pool = StaticMemoryPool::new(pool_cfg);
let shared_tc_pool = SharedStaticMemoryPool::new(RwLock::new(tc_pool)); let shared_tc_pool = SharedStaticMemoryPool::new(RwLock::new(tc_pool));
let shared_tm_pool = SharedTmPool::new(tm_pool); let shared_tm_pool = SharedTmPool::new(tm_pool);
let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::sync_channel(10);
let (tm_tx, tm_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::sync_channel(10);
let verif_sender = MpscTmInSharedPoolSender::new( let verif_sender = TmInSharedPoolSenderWithBoundedMpsc::new(
0, 0,
"verif_sender", "verif_sender",
shared_tm_pool.clone(), shared_tm_pool.clone(),
@ -1032,9 +1075,9 @@ pub mod tests {
); );
let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let verification_handler = let verification_handler =
VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); VerificationReporterWithSharedPoolMpscBoundedSender::new(&verif_cfg, verif_sender);
let test_srv_tm_sender = let test_srv_tm_sender =
MpscTmInSharedPoolSender::new(0, "TEST_SENDER", shared_tm_pool.clone(), tm_tx); TmInSharedPoolSenderWithId::new(0, "TEST_SENDER", shared_tm_pool.clone(), tm_tx);
let test_srv_tc_receiver = MpscTcReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx);
let in_store_converter = let in_store_converter =
EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048); EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048);
@ -1115,20 +1158,20 @@ pub mod tests {
pub verification_handler: VerificationReporter, pub verification_handler: VerificationReporter,
} }
impl PusServiceHandlerWithVecCommon<VerificationReporterWithSender> { impl PusServiceHandlerWithVecCommon<VerificationReporterWithVecMpscSender> {
pub fn new_with_standard_verif_reporter() -> ( pub fn new_with_standard_verif_reporter() -> (
Self, Self,
PusServiceHelper<EcssTcInVecConverter, VerificationReporterWithSender>, PusServiceHelper<EcssTcInVecConverter, VerificationReporterWithVecMpscSender>,
) { ) {
let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel();
let verif_sender = MpscTmAsVecSender::new(0, "verififcatio-sender", tm_tx.clone()); let verif_sender = TmAsVecSenderWithId::new(0, "verififcatio-sender", tm_tx.clone());
let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let verification_handler = let verification_handler =
VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); VerificationReporterWithSender::new(&verif_cfg, verif_sender);
let test_srv_tm_sender = MpscTmAsVecSender::new(0, "test-sender", tm_tx); let test_srv_tm_sender = TmAsVecSenderWithId::new(0, "test-sender", tm_tx);
let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx);
let in_store_converter = EcssTcInVecConverter::default(); let in_store_converter = EcssTcInVecConverter::default();
( (
@ -1157,7 +1200,7 @@ pub mod tests {
let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel();
let test_srv_tm_sender = MpscTmAsVecSender::new(0, "test-sender", tm_tx); let test_srv_tm_sender = TmAsVecSenderWithId::new(0, "test-sender", tm_tx);
let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx);
let in_store_converter = EcssTcInVecConverter::default(); let in_store_converter = EcssTcInVecConverter::default();
let shared_verif_map = SharedVerificationMap::default(); let shared_verif_map = SharedVerificationMap::default();

View File

@ -174,7 +174,7 @@ impl<
mod tests { mod tests {
use crate::pool::{StaticMemoryPool, StaticPoolConfig}; use crate::pool::{StaticMemoryPool, StaticPoolConfig};
use crate::pus::tests::TEST_APID; use crate::pus::tests::TEST_APID;
use crate::pus::verification::VerificationReporterWithSender; use crate::pus::verification::VerificationReporterWithSharedPoolMpscBoundedSender;
use crate::pus::{ use crate::pus::{
scheduler::{self, PusSchedulerProvider, TcInfo}, scheduler::{self, PusSchedulerProvider, TcInfo},
tests::{PusServiceHandlerWithSharedStoreCommon, PusTestHarness}, tests::{PusServiceHandlerWithSharedStoreCommon, PusTestHarness},
@ -199,7 +199,7 @@ mod tests {
common: PusServiceHandlerWithSharedStoreCommon, common: PusServiceHandlerWithSharedStoreCommon,
handler: PusService11SchedHandler< handler: PusService11SchedHandler<
EcssTcInSharedStoreConverter, EcssTcInSharedStoreConverter,
VerificationReporterWithSender, VerificationReporterWithSharedPoolMpscBoundedSender,
TestScheduler, TestScheduler,
>, >,
sched_tc_pool: StaticMemoryPool, sched_tc_pool: StaticMemoryPool,

View File

@ -104,7 +104,10 @@ mod tests {
PusServiceHandlerWithSharedStoreCommon, PusServiceHandlerWithVecCommon, PusTestHarness, PusServiceHandlerWithSharedStoreCommon, PusServiceHandlerWithVecCommon, PusTestHarness,
SimplePusPacketHandler, TEST_APID, SimplePusPacketHandler, TEST_APID,
}; };
use crate::pus::verification::{RequestId, VerificationReporterWithSender}; use crate::pus::verification::std_mod::{
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
};
use crate::pus::verification::RequestId;
use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::verification::{TcStateAccepted, VerificationToken};
use crate::pus::{ use crate::pus::{
EcssTcInSharedStoreConverter, EcssTcInVecConverter, PusPacketHandlerResult, EcssTcInSharedStoreConverter, EcssTcInVecConverter, PusPacketHandlerResult,
@ -120,8 +123,10 @@ mod tests {
struct Pus17HandlerWithStoreTester { struct Pus17HandlerWithStoreTester {
common: PusServiceHandlerWithSharedStoreCommon, common: PusServiceHandlerWithSharedStoreCommon,
handler: handler: PusService17TestHandler<
PusService17TestHandler<EcssTcInSharedStoreConverter, VerificationReporterWithSender>, EcssTcInSharedStoreConverter,
VerificationReporterWithSharedPoolMpscBoundedSender,
>,
} }
impl Pus17HandlerWithStoreTester { impl Pus17HandlerWithStoreTester {
@ -158,8 +163,9 @@ mod tests {
} }
struct Pus17HandlerWithVecTester { struct Pus17HandlerWithVecTester {
common: PusServiceHandlerWithVecCommon<VerificationReporterWithSender>, common: PusServiceHandlerWithVecCommon<VerificationReporterWithVecMpscSender>,
handler: PusService17TestHandler<EcssTcInVecConverter, VerificationReporterWithSender>, handler:
PusService17TestHandler<EcssTcInVecConverter, VerificationReporterWithVecMpscSender>,
} }
impl Pus17HandlerWithVecTester { impl Pus17HandlerWithVecTester {

View File

@ -20,7 +20,7 @@
//! VerificationReportingProvider, VerificationReporterCfg, VerificationReporterWithSender //! VerificationReportingProvider, VerificationReporterCfg, VerificationReporterWithSender
//! }; //! };
//! use satrs::seq_count::SeqCountProviderSimple; //! use satrs::seq_count::SeqCountProviderSimple;
//! use satrs::pus::MpscTmInSharedPoolSender; //! use satrs::pus::TmInSharedPoolSenderWithMpsc;
//! use satrs::tmtc::tm_helper::SharedTmPool; //! use satrs::tmtc::tm_helper::SharedTmPool;
//! use spacepackets::ecss::PusPacket; //! use spacepackets::ecss::PusPacket;
//! use spacepackets::SpHeader; //! use spacepackets::SpHeader;
@ -35,9 +35,9 @@
//! let shared_tm_store = SharedTmPool::new(tm_pool); //! let shared_tm_store = SharedTmPool::new(tm_pool);
//! let tm_store = shared_tm_store.clone_backing_pool(); //! let tm_store = shared_tm_store.clone_backing_pool();
//! let (verif_tx, verif_rx) = mpsc::channel(); //! let (verif_tx, verif_rx) = mpsc::channel();
//! let sender = MpscTmInSharedPoolSender::new(0, "Test Sender", shared_tm_store, verif_tx); //! let sender = TmInSharedPoolSenderWithMpsc::new(0, "Test Sender", shared_tm_store, verif_tx);
//! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
//! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! let mut reporter = VerificationReporterWithSender::new(&cfg , sender);
//! //!
//! let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); //! let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap();
//! let tc_header = PusTcSecondaryHeader::new_simple(17, 1); //! let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
@ -95,10 +95,11 @@ pub use crate::seq_count::SeqCountProviderSimple;
pub use spacepackets::ecss::verification::*; pub use spacepackets::ecss::verification::*;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::{ #[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "alloc")))]
VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, pub use alloc_mod::*;
};
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "std")))]
pub use std_mod::*; pub use std_mod::*;
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard. /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard.
@ -949,15 +950,13 @@ impl VerificationReporterCore {
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
mod alloc_mod { pub mod alloc_mod {
use super::*; use super::*;
use crate::pus::alloc_mod::EcssTmSender; use crate::{
use crate::seq_count::SequenceCountProvider; pus::{TmAsVecSenderWithId, TmInSharedPoolSenderWithId},
use alloc::boxed::Box; seq_count::SequenceCountProvider,
use alloc::vec; };
use alloc::vec::Vec;
use core::cell::RefCell; use core::cell::RefCell;
use spacepackets::ecss::tc::IsPusTelecommand;
#[derive(Clone)] #[derive(Clone)]
pub struct VerificationReporterCfg { pub struct VerificationReporterCfg {
@ -992,9 +991,9 @@ mod alloc_mod {
/// TM funnel. This helper will always set those fields to 0. /// TM funnel. This helper will always set those fields to 0.
#[derive(Clone)] #[derive(Clone)]
pub struct VerificationReporter { pub struct VerificationReporter {
source_data_buf: RefCell<Vec<u8>>, source_data_buf: RefCell<alloc::vec::Vec<u8>>,
pub seq_count_provider: Option<Box<dyn SequenceCountProvider<u16> + Send>>, pub seq_count_provider: Option<alloc::boxed::Box<dyn SequenceCountProvider<u16> + Send>>,
pub msg_count_provider: Option<Box<dyn SequenceCountProvider<u16> + Send>>, pub msg_count_provider: Option<alloc::boxed::Box<dyn SequenceCountProvider<u16> + Send>>,
pub reporter: VerificationReporterCore, pub reporter: VerificationReporterCore,
} }
@ -1002,7 +1001,7 @@ mod alloc_mod {
pub fn new(cfg: &VerificationReporterCfg) -> Self { pub fn new(cfg: &VerificationReporterCfg) -> Self {
let reporter = VerificationReporterCore::new(cfg.apid).unwrap(); let reporter = VerificationReporterCore::new(cfg.apid).unwrap();
Self { Self {
source_data_buf: RefCell::new(vec![ source_data_buf: RefCell::new(alloc::vec![
0; 0;
RequestId::SIZE_AS_BYTES RequestId::SIZE_AS_BYTES
+ cfg.step_field_width + cfg.step_field_width
@ -1269,21 +1268,18 @@ mod alloc_mod {
/// Helper object which caches the sender passed as a trait object. Provides the same /// Helper object which caches the sender passed as a trait object. Provides the same
/// API as [VerificationReporter] but without the explicit sender arguments. /// API as [VerificationReporter] but without the explicit sender arguments.
#[derive(Clone)] #[derive(Clone)]
pub struct VerificationReporterWithSender { pub struct VerificationReporterWithSender<Sender: EcssTmSenderCore + Clone> {
pub reporter: VerificationReporter, pub reporter: VerificationReporter,
pub sender: Box<dyn EcssTmSender>, pub sender: Sender,
} }
impl VerificationReporterWithSender { impl<Sender: EcssTmSenderCore + Clone> VerificationReporterWithSender<Sender> {
pub fn new(cfg: &VerificationReporterCfg, sender: Box<dyn EcssTmSender>) -> Self { pub fn new(cfg: &VerificationReporterCfg, sender: Sender) -> Self {
let reporter = VerificationReporter::new(cfg); let reporter = VerificationReporter::new(cfg);
Self::new_from_reporter(reporter, sender) Self::new_from_reporter(reporter, sender)
} }
pub fn new_from_reporter( pub fn new_from_reporter(reporter: VerificationReporter, sender: Sender) -> Self {
reporter: VerificationReporter,
sender: Box<dyn EcssTmSender>,
) -> Self {
Self { reporter, sender } Self { reporter, sender }
} }
@ -1297,7 +1293,9 @@ mod alloc_mod {
} }
} }
impl VerificationReportingProvider for VerificationReporterWithSender { impl<Sender: EcssTmSenderCore + Clone> VerificationReportingProvider
for VerificationReporterWithSender<Sender>
{
delegate! { delegate! {
to self.reporter { to self.reporter {
fn add_tc( fn add_tc(
@ -1315,7 +1313,7 @@ mod alloc_mod {
) -> Result<VerificationToken<TcStateAccepted>, VerificationOrSendErrorWithToken<TcStateNone>> ) -> Result<VerificationToken<TcStateAccepted>, VerificationOrSendErrorWithToken<TcStateNone>>
{ {
self.reporter self.reporter
.acceptance_success(token, self.sender.as_ref(), time_stamp) .acceptance_success(token, &self.sender, time_stamp)
} }
fn acceptance_failure( fn acceptance_failure(
@ -1324,7 +1322,7 @@ mod alloc_mod {
params: FailParams, params: FailParams,
) -> Result<(), VerificationOrSendErrorWithToken<TcStateNone>> { ) -> Result<(), VerificationOrSendErrorWithToken<TcStateNone>> {
self.reporter self.reporter
.acceptance_failure(token, self.sender.as_ref(), params) .acceptance_failure(token, &self.sender, params)
} }
fn start_success( fn start_success(
@ -1335,8 +1333,7 @@ mod alloc_mod {
VerificationToken<TcStateStarted>, VerificationToken<TcStateStarted>,
VerificationOrSendErrorWithToken<TcStateAccepted>, VerificationOrSendErrorWithToken<TcStateAccepted>,
> { > {
self.reporter self.reporter.start_success(token, &self.sender, time_stamp)
.start_success(token, self.sender.as_ref(), time_stamp)
} }
fn start_failure( fn start_failure(
@ -1344,8 +1341,7 @@ mod alloc_mod {
token: VerificationToken<TcStateAccepted>, token: VerificationToken<TcStateAccepted>,
params: FailParams, params: FailParams,
) -> Result<(), VerificationOrSendErrorWithToken<TcStateAccepted>> { ) -> Result<(), VerificationOrSendErrorWithToken<TcStateAccepted>> {
self.reporter self.reporter.start_failure(token, &self.sender, params)
.start_failure(token, self.sender.as_ref(), params)
} }
fn step_success( fn step_success(
@ -1355,7 +1351,7 @@ mod alloc_mod {
step: impl EcssEnumeration, step: impl EcssEnumeration,
) -> Result<(), EcssTmtcError> { ) -> Result<(), EcssTmtcError> {
self.reporter self.reporter
.step_success(token, self.sender.as_ref(), time_stamp, step) .step_success(token, &self.sender, time_stamp, step)
} }
fn step_failure( fn step_failure(
@ -1363,8 +1359,7 @@ mod alloc_mod {
token: VerificationToken<TcStateStarted>, token: VerificationToken<TcStateStarted>,
params: FailParamsWithStep, params: FailParamsWithStep,
) -> Result<(), VerificationOrSendErrorWithToken<TcStateStarted>> { ) -> Result<(), VerificationOrSendErrorWithToken<TcStateStarted>> {
self.reporter self.reporter.step_failure(token, &self.sender, params)
.step_failure(token, self.sender.as_ref(), params)
} }
fn completion_success<TcState: WasAtLeastAccepted + Copy>( fn completion_success<TcState: WasAtLeastAccepted + Copy>(
@ -1373,7 +1368,7 @@ mod alloc_mod {
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<(), VerificationOrSendErrorWithToken<TcState>> { ) -> Result<(), VerificationOrSendErrorWithToken<TcState>> {
self.reporter self.reporter
.completion_success(token, self.sender.as_ref(), time_stamp) .completion_success(token, &self.sender, time_stamp)
} }
fn completion_failure<TcState: WasAtLeastAccepted + Copy>( fn completion_failure<TcState: WasAtLeastAccepted + Copy>(
@ -1382,18 +1377,34 @@ mod alloc_mod {
params: FailParams, params: FailParams,
) -> Result<(), VerificationOrSendErrorWithToken<TcState>> { ) -> Result<(), VerificationOrSendErrorWithToken<TcState>> {
self.reporter self.reporter
.completion_failure(token, self.sender.as_ref(), params) .completion_failure(token, &self.sender, params)
} }
} }
pub type VerificationReporterWithSharedPoolSender<Sender> =
VerificationReporterWithSender<TmInSharedPoolSenderWithId<Sender>>;
pub type VerificationReporterWithVecSender<Sender> =
VerificationReporterWithSender<TmAsVecSenderWithId<Sender>>;
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
mod std_mod { pub mod std_mod {
use crate::pus::verification::VerificationReporterWithSender; use std::sync::mpsc;
use std::sync::{Arc, Mutex};
pub type StdVerifReporterWithSender = VerificationReporterWithSender; use crate::pool::StoreAddr;
pub type SharedStdVerifReporterWithSender = Arc<Mutex<StdVerifReporterWithSender>>;
use super::alloc_mod::{
VerificationReporterWithSharedPoolSender, VerificationReporterWithVecSender,
};
pub type VerificationReporterWithSharedPoolMpscSender =
VerificationReporterWithSharedPoolSender<mpsc::Sender<StoreAddr>>;
pub type VerificationReporterWithSharedPoolMpscBoundedSender =
VerificationReporterWithSharedPoolSender<mpsc::SyncSender<StoreAddr>>;
pub type VerificationReporterWithVecMpscSender =
VerificationReporterWithVecSender<mpsc::Sender<alloc::vec::Vec<u8>>>;
pub type VerificationReporterWithVecMpscBoundedSender =
VerificationReporterWithVecSender<mpsc::SyncSender<alloc::vec::Vec<u8>>>;
} }
#[cfg(test)] #[cfg(test)]
@ -1405,10 +1416,11 @@ pub mod tests {
VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender,
VerificationToken, VerificationToken,
}; };
use crate::pus::{EcssChannel, MpscTmInSharedPoolSender, PusTmWrapper}; use crate::pus::{
EcssChannel, PusTmWrapper, TmInSharedPoolSenderWithId, TmInSharedPoolSenderWithMpsc,
};
use crate::tmtc::tm_helper::SharedTmPool; use crate::tmtc::tm_helper::SharedTmPool;
use crate::ChannelId; use crate::ChannelId;
use alloc::boxed::Box;
use alloc::format; use alloc::format;
use alloc::sync::Arc; use alloc::sync::Arc;
use hashbrown::HashMap; use hashbrown::HashMap;
@ -1637,7 +1649,7 @@ pub mod tests {
} }
impl EcssChannel for TestSender { impl EcssChannel for TestSender {
fn id(&self) -> ChannelId { fn channel_id(&self) -> ChannelId {
0 0
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
@ -1688,13 +1700,13 @@ pub mod tests {
&mut self.vr &mut self.vr
} }
} }
struct TestBaseWithHelper<'a> { struct TestBaseWithHelper<'a, Sender: EcssTmSenderCore + Clone + 'static> {
helper: VerificationReporterWithSender, helper: VerificationReporterWithSender<Sender>,
#[allow(dead_code)] #[allow(dead_code)]
tc: PusTcCreator<'a>, tc: PusTcCreator<'a>,
} }
impl<'a> TestBaseWithHelper<'a> { impl<'a, Sender: EcssTmSenderCore + Clone + 'static> TestBaseWithHelper<'a, Sender> {
fn rep(&mut self) -> &mut VerificationReporter { fn rep(&mut self) -> &mut VerificationReporter {
&mut self.helper.reporter &mut self.helper.reporter
} }
@ -1725,12 +1737,15 @@ pub mod tests {
(TestBase { vr: reporter, tc }, init_tok) (TestBase { vr: reporter, tc }, init_tok)
} }
fn base_with_helper_init() -> (TestBaseWithHelper<'static>, VerificationToken<TcStateNone>) { fn base_with_helper_init() -> (
TestBaseWithHelper<'static, TestSender>,
VerificationToken<TcStateNone>,
) {
let mut reporter = base_reporter(); let mut reporter = base_reporter();
let (tc, _) = base_tc_init(None); let (tc, _) = base_tc_init(None);
let init_tok = reporter.add_tc(&tc); let init_tok = reporter.add_tc(&tc);
let sender = TestSender::default(); let sender = TestSender::default();
let helper = VerificationReporterWithSender::new_from_reporter(reporter, Box::new(sender)); let helper = VerificationReporterWithSender::new_from_reporter(reporter, sender);
(TestBaseWithHelper { helper, tc }, init_tok) (TestBaseWithHelper { helper, tc }, init_tok)
} }
@ -1758,7 +1773,7 @@ pub mod tests {
let shared_tm_store = SharedTmPool::new(pool); let shared_tm_store = SharedTmPool::new(pool);
let (tx, _) = mpsc::channel(); let (tx, _) = mpsc::channel();
let mpsc_verif_sender = let mpsc_verif_sender =
MpscTmInSharedPoolSender::new(0, "verif_sender", shared_tm_store, tx); TmInSharedPoolSenderWithMpsc::new(0, "verif_sender", shared_tm_store, tx);
is_send(&mpsc_verif_sender); is_send(&mpsc_verif_sender);
} }
@ -1785,8 +1800,7 @@ pub mod tests {
b.helper b.helper
.acceptance_success(tok, &EMPTY_STAMP) .acceptance_success(tok, &EMPTY_STAMP)
.expect("Sending acceptance success failed"); .expect("Sending acceptance success failed");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); acceptance_check(&mut b.helper.sender, &tok.req_id);
acceptance_check(sender, &tok.req_id);
} }
fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) { fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) {
@ -1830,8 +1844,7 @@ pub mod tests {
b.helper b.helper
.acceptance_failure(tok, fail_params) .acceptance_failure(tok, fail_params)
.expect("Sending acceptance success failed"); .expect("Sending acceptance success failed");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); acceptance_fail_check(&mut b.helper.sender, tok.req_id, stamp_buf);
acceptance_fail_check(sender, tok.req_id, stamp_buf);
} }
#[test] #[test]
@ -1961,8 +1974,7 @@ pub mod tests {
b.helper b.helper
.start_failure(accepted_token, fail_params) .start_failure(accepted_token, fail_params)
.expect("Start failure failure"); .expect("Start failure failure");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); start_fail_check(&mut b.helper.sender, tok.req_id, fail_data_raw);
start_fail_check(sender, tok.req_id, fail_data_raw);
} }
fn step_success_check(sender: &mut TestSender, req_id: RequestId) { fn step_success_check(sender: &mut TestSender, req_id: RequestId) {
@ -2059,9 +2071,8 @@ pub mod tests {
b.helper b.helper
.step_success(&started_token, &EMPTY_STAMP, EcssEnumU8::new(1)) .step_success(&started_token, &EMPTY_STAMP, EcssEnumU8::new(1))
.expect("Sending step 1 success failed"); .expect("Sending step 1 success failed");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); assert_eq!(b.helper.sender.service_queue.borrow().len(), 4);
assert_eq!(sender.service_queue.borrow().len(), 4); step_success_check(&mut b.helper.sender, tok.req_id);
step_success_check(sender, tok.req_id);
} }
fn check_step_failure(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) { fn check_step_failure(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) {
@ -2191,8 +2202,7 @@ pub mod tests {
b.helper b.helper
.step_failure(started_token, fail_params) .step_failure(started_token, fail_params)
.expect("Step failure failed"); .expect("Step failure failed");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); check_step_failure(&mut b.helper.sender, req_id, fail_data_raw);
check_step_failure(sender, req_id, fail_data_raw);
} }
fn completion_fail_check(sender: &mut TestSender, req_id: RequestId) { fn completion_fail_check(sender: &mut TestSender, req_id: RequestId) {
@ -2278,8 +2288,7 @@ pub mod tests {
b.helper b.helper
.completion_failure(started_token, fail_params) .completion_failure(started_token, fail_params)
.expect("Completion failure"); .expect("Completion failure");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); completion_fail_check(&mut b.helper.sender, req_id);
completion_fail_check(sender, req_id);
} }
fn completion_success_check(sender: &mut TestSender, req_id: RequestId) { fn completion_success_check(sender: &mut TestSender, req_id: RequestId) {
@ -2355,8 +2364,7 @@ pub mod tests {
b.helper b.helper
.completion_success(started_token, &EMPTY_STAMP) .completion_success(started_token, &EMPTY_STAMP)
.expect("Sending completion success failed"); .expect("Sending completion success failed");
let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); completion_success_check(&mut b.helper.sender, tok.req_id);
completion_success_check(sender, tok.req_id);
} }
#[test] #[test]
@ -2368,9 +2376,9 @@ pub mod tests {
let shared_tm_pool = shared_tm_store.clone_backing_pool(); let shared_tm_pool = shared_tm_store.clone_backing_pool();
let (verif_tx, verif_rx) = mpsc::channel(); let (verif_tx, verif_rx) = mpsc::channel();
let sender = let sender =
MpscTmInSharedPoolSender::new(0, "Verification Sender", shared_tm_store, verif_tx); TmInSharedPoolSenderWithId::new(0, "Verification Sender", shared_tm_store, verif_tx);
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter = VerificationReporterWithSender::new(&cfg, sender);
let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap();
let tc_header = PusTcSecondaryHeader::new_simple(17, 1); let tc_header = PusTcSecondaryHeader::new_simple(17, 1);

View File

@ -1,6 +1,8 @@
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::error::Error; use std::error::Error;
#[cfg(feature = "std")]
use std::sync::mpsc;
/// Generic error type for sending something via a message queue. /// Generic error type for sending something via a message queue.
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
@ -47,3 +49,37 @@ impl Display for GenericRecvError {
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl Error for GenericRecvError {} impl Error for GenericRecvError {}
#[cfg(feature = "std")]
impl<T> From<mpsc::SendError<T>> for GenericSendError {
fn from(_: mpsc::SendError<T>) -> Self {
GenericSendError::RxDisconnected
}
}
#[cfg(feature = "std")]
impl<T> From<mpsc::TrySendError<T>> for GenericSendError {
fn from(err: mpsc::TrySendError<T>) -> Self {
match err {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
}
}
}
#[cfg(feature = "crossbeam")]
impl<T> From<crossbeam_channel::SendError<T>> for GenericSendError {
fn from(_: crossbeam_channel::SendError<T>) -> Self {
GenericSendError::RxDisconnected
}
}
#[cfg(feature = "crossbeam")]
impl<T> From<crossbeam_channel::TrySendError<T>> for GenericSendError {
fn from(err: crossbeam_channel::TrySendError<T>) -> Self {
match err {
crossbeam_channel::TrySendError::Full(_) => GenericSendError::QueueFull(None),
crossbeam_channel::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
}
}
}

View File

@ -5,10 +5,10 @@ use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
use satrs::params::U32Pair; use satrs::params::U32Pair;
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher};
use satrs::pus::MpscTmAsVecSender; use satrs::pus::TmAsVecSenderWithMpsc;
use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::tm::PusTmReader;
use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::ecss::{PusError, PusPacket};
use std::sync::mpsc::{channel, SendError, TryRecvError}; use std::sync::mpsc::{self, SendError, TryRecvError};
use std::thread; use std::thread;
const INFO_EVENT: EventU32TypedSev<SeverityInfo> = const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
@ -24,21 +24,21 @@ pub enum CustomTmSenderError {
#[test] #[test]
fn test_threaded_usage() { fn test_threaded_usage() {
let (event_sender, event_man_receiver) = channel(); let (event_sender, event_man_receiver) = mpsc::channel();
let event_receiver = MpscEventU32Receiver::new(event_man_receiver); let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
let mut event_man = EventManagerWithMpsc::new(event_receiver); let mut event_man = EventManagerWithMpsc::new(event_receiver);
let (pus_event_man_tx, pus_event_man_rx) = channel(); let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.channel_id()); event_man.subscribe_all(pus_event_man_send_provider.channel_id());
event_man.add_sender(pus_event_man_send_provider); event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = mpsc::channel();
let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed"); let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed");
let mut pus_event_man = let mut pus_event_man =
PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
// PUS + Generic event manager thread // PUS + Generic event manager thread
let jh0 = thread::spawn(move || { let jh0 = thread::spawn(move || {
let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx); let mut sender = TmAsVecSenderWithMpsc::new(0, "event_sender", event_tx);
let mut event_cnt = 0; let mut event_cnt = 0;
let mut params_array: [u8; 128] = [0; 128]; let mut params_array: [u8; 128] = [0; 128];
loop { loop {

View File

@ -6,7 +6,7 @@ pub mod crossbeam_test {
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
VerificationReportingProvider, VerificationReportingProvider,
}; };
use satrs::pus::CrossbeamTmInStoreSender; use satrs::pus::TmInSharedPoolSenderWithCrossbeam;
use satrs::tmtc::tm_helper::SharedTmPool; use satrs::tmtc::tm_helper::SharedTmPool;
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::tm::PusTmReader;
@ -40,10 +40,13 @@ pub mod crossbeam_test {
let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg))); let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone(); let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = crossbeam_channel::bounded(10); let (tx, rx) = crossbeam_channel::bounded(10);
let sender = let sender = TmInSharedPoolSenderWithCrossbeam::new(
CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); 0,
let mut reporter_with_sender_0 = "verif_sender",
VerificationReporterWithSender::new(&cfg, Box::new(sender)); shared_tm_pool.clone(),
tx.clone(),
);
let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, sender);
let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
// For test purposes, we retrieve the request ID from the TCs and pass them to the receiver // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver
// tread. // tread.