finally, done
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2024-02-07 15:09:29 +01:00
parent d4a69a77e9
commit 267ae6500e
Signed by: muellerr
GPG Key ID: A649FB78196E3849
9 changed files with 301 additions and 192 deletions

View File

@ -23,10 +23,10 @@ use udp::DynamicUdpTmHandler;
use crate::acs::AcsTask; use crate::acs::AcsTask;
use crate::ccsds::CcsdsReceiver; use crate::ccsds::CcsdsReceiver;
use crate::logger::setup_logger; use crate::logger::setup_logger;
use crate::pus::action::create_action_service; use crate::pus::action::{create_action_service_dynamic, create_action_service_static};
use crate::pus::event::create_event_service; use crate::pus::event::{create_event_service_dynamic, create_event_service_static};
use crate::pus::hk::create_hk_service; use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static};
use crate::pus::scheduler::create_scheduler_service_static; use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static};
use crate::pus::test::create_test_service_static; use crate::pus::test::create_test_service_static;
use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::requests::RequestWithToken; use crate::requests::RequestWithToken;
@ -76,6 +76,17 @@ fn create_static_pools() -> (StaticMemoryPool, StaticMemoryPool) {
) )
} }
fn create_sched_tc_pool() -> StaticMemoryPool {
StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]))
}
fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender { fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender {
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned // Every software component which needs to generate verification telemetry, gets a cloned
@ -85,14 +96,6 @@ fn create_verification_reporter(verif_sender: impl EcssTmSender) -> Verification
fn static_tmtc_pool_main() { fn static_tmtc_pool_main() {
let (tm_pool, tc_pool) = create_static_pools(); let (tm_pool, tc_pool) = create_static_pools();
let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]));
let shared_tm_store = SharedTmStore::new(tm_pool); let shared_tm_store = SharedTmStore::new(tm_pool);
let shared_tc_pool = SharedTcPool { let shared_tc_pool = SharedTcPool {
pool: Arc::new(RwLock::new(tc_pool)), pool: Arc::new(RwLock::new(tc_pool)),
@ -176,9 +179,9 @@ fn static_tmtc_pool_main() {
verif_reporter.clone(), verif_reporter.clone(),
tc_source_wrapper, tc_source_wrapper,
pus_sched_rx, pus_sched_rx,
sched_tc_pool, create_sched_tc_pool(),
); );
let pus_event_service = create_event_service( let pus_event_service = create_event_service_static(
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
verif_reporter.clone(), verif_reporter.clone(),
@ -186,7 +189,7 @@ fn static_tmtc_pool_main() {
pus_event_rx, pus_event_rx,
event_request_tx, event_request_tx,
); );
let pus_action_service = create_action_service( let pus_action_service = create_action_service_static(
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
verif_reporter.clone(), verif_reporter.clone(),
@ -194,7 +197,7 @@ fn static_tmtc_pool_main() {
pus_action_rx, pus_action_rx,
request_map.clone(), request_map.clone(),
); );
let pus_hk_service = create_hk_service( let pus_hk_service = create_hk_service_static(
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
verif_reporter.clone(), verif_reporter.clone(),
@ -346,9 +349,7 @@ fn dyn_tmtc_pool_main() {
let mut request_map = HashMap::new(); let mut request_map = HashMap::new();
request_map.insert(acs_target_id, acs_thread_tx); request_map.insert(acs_target_id, acs_thread_tx);
let tc_source = PusTcSourceProviderDynamic { let tc_source = PusTcSourceProviderDynamic(tc_source_tx);
tc_source: tc_source_tx,
};
// Create event handling components // Create event handling components
// These sender handles are used to send event requests, for example to enable or disable // These sender handles are used to send event requests, for example to enable or disable
@ -385,6 +386,39 @@ fn dyn_tmtc_pool_main() {
event_handler.clone_event_sender(), event_handler.clone_event_sender(),
pus_test_rx, pus_test_rx,
); );
let pus_scheduler_service = create_scheduler_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
tc_source.0.clone(),
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service = create_event_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
pus_event_rx,
event_request_tx,
);
let pus_action_service = create_action_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
pus_action_rx,
request_map.clone(),
);
let pus_hk_service = create_hk_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
pus_hk_rx,
request_map,
);
let mut pus_stack = PusStack::new(
pus_hk_service,
pus_event_service,
pus_action_service,
pus_scheduler_service,
pus_test_service,
);
let ccsds_receiver = CcsdsReceiver { tc_source }; let ccsds_receiver = CcsdsReceiver { tc_source };
@ -479,7 +513,7 @@ fn dyn_tmtc_pool_main() {
let jh4 = thread::Builder::new() let jh4 = thread::Builder::new()
.name("PUS".to_string()) .name("PUS".to_string())
.spawn(move || loop { .spawn(move || loop {
// pus_stack.periodic_operation(); pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(200)); thread::sleep(Duration::from_millis(200));
}) })
.unwrap(); .unwrap();

View File

@ -5,9 +5,9 @@ use satrs_core::pus::verification::{
FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken, FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken,
}; };
use satrs_core::pus::{ use satrs_core::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTmSender, MpscTcReceiver, MpscTmInStoreSender, PusPacketHandlerResult, EcssTcReceiver, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender,
PusPacketHandlingError, PusServiceBase, PusServiceHelper, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper,
}; };
use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::ecss::PusPacket;
@ -17,14 +17,14 @@ use satrs_example::{tmtc_err, TargetIdWithApid, TcReceiverId, TmSenderId, PUS_AP
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::mpsc::{self, Sender}; use std::sync::mpsc::{self, Sender};
pub fn create_action_service( pub fn create_action_service_static(
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>, pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>, request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus8Wrapper { ) -> Pus8Wrapper<EcssTcInSharedStoreConverter> {
let action_srv_tm_sender = MpscTmInStoreSender::new( let action_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusAction as ChannelId, TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER", "PUS_8_TM_SENDER",
@ -47,6 +47,33 @@ pub fn create_action_service(
Pus8Wrapper { pus_8_handler } Pus8Wrapper { pus_8_handler }
} }
pub fn create_action_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus8Wrapper<EcssTcInVecConverter> {
let action_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER",
tm_funnel_tx.clone(),
);
let action_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusAction as ChannelId,
"PUS_8_TC_RECV",
pus_action_rx,
);
let pus_8_handler = PusService8ActionHandler::new(
Box::new(action_srv_receiver),
Box::new(action_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
request_map.clone(),
);
Pus8Wrapper { pus_8_handler }
}
pub struct PusService8ActionHandler<TcInMemConverter: EcssTcInMemConverter> { pub struct PusService8ActionHandler<TcInMemConverter: EcssTcInMemConverter> {
service_helper: PusServiceHelper<TcInMemConverter>, service_helper: PusServiceHelper<TcInMemConverter>,
request_handlers: HashMap<TargetIdWithApid, Sender<RequestWithToken>>, request_handlers: HashMap<TargetIdWithApid, Sender<RequestWithToken>>,
@ -175,11 +202,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> PusService8ActionHandler<TcInMemCon
} }
} }
pub struct Pus8Wrapper { pub struct Pus8Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub(crate) pus_8_handler: PusService8ActionHandler<EcssTcInSharedStoreConverter>, pub(crate) pus_8_handler: PusService8ActionHandler<TcInMemConverter>,
} }
impl Pus8Wrapper { impl<TcInMemConverter: EcssTcInMemConverter> Pus8Wrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
match self.pus_8_handler.handle_one_tc() { match self.pus_8_handler.handle_one_tc() {
Ok(result) => match result { Ok(result) => match result {

View File

@ -6,21 +6,22 @@ use satrs_core::pus::event_man::EventRequestWithToken;
use satrs_core::pus::event_srv::PusService5EventHandler; use satrs_core::pus::event_srv::PusService5EventHandler;
use satrs_core::pus::verification::VerificationReporterWithSender; use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{ use satrs_core::pus::{
EcssTcAndToken, EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
PusPacketHandlerResult, PusServiceHelper, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult,
PusServiceHelper,
}; };
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId; use satrs_core::ChannelId;
use satrs_example::{TcReceiverId, TmSenderId, PUS_APID}; use satrs_example::{TcReceiverId, TmSenderId, PUS_APID};
pub fn create_event_service( pub fn create_event_service_static(
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>, pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>, event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> Pus5Wrapper { ) -> Pus5Wrapper<EcssTcInSharedStoreConverter> {
let event_srv_tm_sender = MpscTmInStoreSender::new( let event_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusEvent as ChannelId, TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER", "PUS_5_TM_SENDER",
@ -45,11 +46,40 @@ pub fn create_event_service(
Pus5Wrapper { pus_5_handler } Pus5Wrapper { pus_5_handler }
} }
pub struct Pus5Wrapper { pub fn create_event_service_dynamic(
pub pus_5_handler: PusService5EventHandler<EcssTcInSharedStoreConverter>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> Pus5Wrapper<EcssTcInVecConverter> {
let event_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER",
tm_funnel_tx,
);
let event_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusEvent as ChannelId,
"PUS_5_TC_RECV",
pus_event_rx,
);
let pus_5_handler = PusService5EventHandler::new(
PusServiceHelper::new(
Box::new(event_srv_receiver),
Box::new(event_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
),
event_request_tx,
);
Pus5Wrapper { pus_5_handler }
} }
impl Pus5Wrapper { pub struct Pus5Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus_5_handler: PusService5EventHandler<TcInMemConverter>,
}
impl<TcInMemConverter: EcssTcInMemConverter> Pus5Wrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
match self.pus_5_handler.handle_one_tc() { match self.pus_5_handler.handle_one_tc() {
Ok(result) => match result { Ok(result) => match result {

View File

@ -6,9 +6,9 @@ use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, VerificationReporterWithSender, FailParams, StdVerifReporterWithSender, VerificationReporterWithSender,
}; };
use satrs_core::pus::{ use satrs_core::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTmSender, MpscTcReceiver, MpscTmInStoreSender, PusPacketHandlerResult, EcssTcReceiver, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender,
PusPacketHandlingError, PusServiceBase, PusServiceHelper, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper,
}; };
use satrs_core::spacepackets::ecss::{hk, PusPacket}; use satrs_core::spacepackets::ecss::{hk, PusPacket};
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
@ -17,14 +17,14 @@ use satrs_example::{hk_err, tmtc_err, TargetIdWithApid, TcReceiverId, TmSenderId
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::mpsc::{self, Sender}; use std::sync::mpsc::{self, Sender};
pub fn create_hk_service( pub fn create_hk_service_static(
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>, pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>, request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus3Wrapper { ) -> Pus3Wrapper<EcssTcInSharedStoreConverter> {
let hk_srv_tm_sender = MpscTmInStoreSender::new( let hk_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusHk as ChannelId, TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER", "PUS_3_TM_SENDER",
@ -44,6 +44,30 @@ pub fn create_hk_service(
Pus3Wrapper { pus_3_handler } Pus3Wrapper { pus_3_handler }
} }
pub fn create_hk_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus3Wrapper<EcssTcInVecConverter> {
let hk_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER",
tm_funnel_tx.clone(),
);
let hk_srv_receiver =
MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx);
let pus_3_handler = PusService3HkHandler::new(
Box::new(hk_srv_receiver),
Box::new(hk_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
request_map,
);
Pus3Wrapper { pus_3_handler }
}
pub struct PusService3HkHandler<TcInMemConverter: EcssTcInMemConverter> { pub struct PusService3HkHandler<TcInMemConverter: EcssTcInMemConverter> {
psb: PusServiceHelper<TcInMemConverter>, psb: PusServiceHelper<TcInMemConverter>,
request_handlers: HashMap<TargetIdWithApid, Sender<RequestWithToken>>, request_handlers: HashMap<TargetIdWithApid, Sender<RequestWithToken>>,
@ -180,11 +204,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> PusService3HkHandler<TcInMemConvert
} }
} }
pub struct Pus3Wrapper { pub struct Pus3Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub(crate) pus_3_handler: PusService3HkHandler<EcssTcInSharedStoreConverter>, pub(crate) pus_3_handler: PusService3HkHandler<TcInMemConverter>,
} }
impl Pus3Wrapper { impl<TcInMemConverter: EcssTcInMemConverter> Pus3Wrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
match self.pus_3_handler.handle_one_tc() { match self.pus_3_handler.handle_one_tc() {
Ok(result) => match result { Ok(result) => match result {

View File

@ -7,29 +7,109 @@ use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::verification::VerificationReporterWithSender; use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{ use satrs_core::pus::{
EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, MpscTcReceiver, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult,
PusServiceHelper,
}; };
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId; use satrs_core::ChannelId;
use satrs_example::{TcReceiverId, TmSenderId, PUS_APID}; use satrs_example::{TcReceiverId, TmSenderId, PUS_APID};
use crate::tmtc::{PusTcSourceProviderDynamic, PusTcSourceProviderSharedPool}; use crate::tmtc::PusTcSourceProviderSharedPool;
pub struct Pus11WrapperStatic { pub trait TcReleaser {
pub pus_11_handler: PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>, fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}
impl TcReleaser for PusTcSourceProviderSharedPool {
fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
if enabled {
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = self
.shared_pool
.pool
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
self.tc_source
.send(released_tc_addr)
.expect("sending TC to TC source failed");
}
true
}
}
impl TcReleaser for mpsc::Sender<Vec<u8>> {
fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
if enabled {
// Send released TC to centralized TC source.
self.send(tc.to_vec())
.expect("sending TC to TC source failed");
}
true
}
}
pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus_11_handler: PusService11SchedHandler<TcInMemConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool, pub sched_tc_pool: StaticMemoryPool,
pub tc_source_wrapper: PusTcSourceProviderSharedPool, pub tc_releaser: Box<dyn TcReleaser + Send>,
}
impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(enabled, info, tc)
};
self.pus_11_handler
.scheduler_mut()
.update_time_from_now()
.unwrap();
let released_tcs = self
.pus_11_handler
.scheduler_mut()
.release_telecommands(releaser, &mut self.sched_tc_pool)
.expect("releasing TCs failed");
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
}
}
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
} }
pub fn create_scheduler_service_static( pub fn create_scheduler_service_static(
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSender,
tc_source_wrapper: PusTcSourceProviderSharedPool, tc_releaser: PusTcSourceProviderSharedPool,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>, pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool, sched_tc_pool: StaticMemoryPool,
) -> Pus11WrapperStatic { ) -> Pus11Wrapper<EcssTcInSharedStoreConverter> {
let sched_srv_tm_sender = MpscTmInStoreSender::new( let sched_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusSched as ChannelId, TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER", "PUS_11_TM_SENDER",
@ -49,132 +129,49 @@ pub fn create_scheduler_service_static(
Box::new(sched_srv_tm_sender), Box::new(sched_srv_tm_sender),
PUS_APID, PUS_APID,
verif_reporter.clone(), verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_source_wrapper.clone_backing_pool(), 2048), EcssTcInSharedStoreConverter::new(tc_releaser.clone_backing_pool(), 2048),
), ),
scheduler, scheduler,
); );
Pus11WrapperStatic { Pus11Wrapper {
pus_11_handler, pus_11_handler,
sched_tc_pool, sched_tc_pool,
tc_source_wrapper, tc_releaser: Box::new(tc_releaser),
} }
} }
impl Pus11WrapperStatic { pub fn create_scheduler_service_dynamic(
pub fn release_tcs(&mut self) { tm_funnel_tx: mpsc::Sender<Vec<u8>>,
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { verif_reporter: VerificationReporterWithSender,
if enabled { tc_source_sender: mpsc::Sender<Vec<u8>>,
// Transfer TC from scheduler TC pool to shared TC pool. pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
let released_tc_addr = self sched_tc_pool: StaticMemoryPool,
.tc_source_wrapper ) -> Pus11Wrapper<EcssTcInVecConverter> {
.shared_pool let sched_srv_tm_sender = MpscTmAsVecSender::new(
.pool TmSenderId::PusSched as ChannelId,
.write() "PUS_11_TM_SENDER",
.expect("locking pool failed") tm_funnel_tx,
.add(tc) );
.expect("adding TC to shared pool failed"); let sched_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusSched as ChannelId,
self.tc_source_wrapper "PUS_11_TC_RECV",
.tc_source pus_sched_rx,
.send(released_tc_addr) );
.expect("sending TC to TC source failed"); let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
} .expect("Creating PUS Scheduler failed");
true let pus_11_handler = PusService11SchedHandler::new(
}; PusServiceHelper::new(
Box::new(sched_srv_receiver),
self.pus_11_handler Box::new(sched_srv_tm_sender),
.scheduler_mut() PUS_APID,
.update_time_from_now() verif_reporter.clone(),
.unwrap(); EcssTcInVecConverter::default(),
let released_tcs = self ),
.pus_11_handler scheduler,
.scheduler_mut() );
.release_telecommands(releaser, &mut self.sched_tc_pool) Pus11Wrapper {
.expect("releasing TCs failed"); pus_11_handler,
if released_tcs > 0 { sched_tc_pool,
info!("{released_tcs} TC(s) released from scheduler"); tc_releaser: Box::new(tc_source_sender),
}
}
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
}
pub struct Pus11WrapperDynamic {
pub pus_11_handler: PusService11SchedHandler<EcssTcInVecConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool,
pub tc_source_wrapper: PusTcSourceProviderDynamic,
}
impl Pus11WrapperDynamic {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
if enabled {
// Send released TC to centralized TC source.
self.tc_source_wrapper
.tc_source
.send(tc.to_vec())
.expect("sending TC to TC source failed");
}
true
};
self.pus_11_handler
.scheduler_mut()
.update_time_from_now()
.unwrap();
let released_tcs = self
.pus_11_handler
.scheduler_mut()
.release_telecommands(releaser, &mut self.sched_tc_pool)
.expect("releasing TCs failed");
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
}
}
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
} }
} }

View File

@ -1,25 +1,25 @@
use satrs_core::pus::EcssTcInSharedStoreConverter; use satrs_core::pus::EcssTcInMemConverter;
use super::{ use super::{
action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11WrapperStatic, action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper,
test::Service17CustomWrapper, test::Service17CustomWrapper,
}; };
pub struct PusStack { pub struct PusStack<TcInMemConverter: EcssTcInMemConverter> {
event_srv: Pus5Wrapper, event_srv: Pus5Wrapper<TcInMemConverter>,
hk_srv: Pus3Wrapper, hk_srv: Pus3Wrapper<TcInMemConverter>,
action_srv: Pus8Wrapper, action_srv: Pus8Wrapper<TcInMemConverter>,
schedule_srv: Pus11WrapperStatic, schedule_srv: Pus11Wrapper<TcInMemConverter>,
test_srv: Service17CustomWrapper<EcssTcInSharedStoreConverter>, test_srv: Service17CustomWrapper<TcInMemConverter>,
} }
impl PusStack { impl<TcInMemConverter: EcssTcInMemConverter> PusStack<TcInMemConverter> {
pub fn new( pub fn new(
hk_srv: Pus3Wrapper, hk_srv: Pus3Wrapper<TcInMemConverter>,
event_srv: Pus5Wrapper, event_srv: Pus5Wrapper<TcInMemConverter>,
action_srv: Pus8Wrapper, action_srv: Pus8Wrapper<TcInMemConverter>,
schedule_srv: Pus11WrapperStatic, schedule_srv: Pus11Wrapper<TcInMemConverter>,
test_srv: Service17CustomWrapper<EcssTcInSharedStoreConverter>, test_srv: Service17CustomWrapper<TcInMemConverter>,
) -> Self { ) -> Self {
Self { Self {
event_srv, event_srv,

View File

@ -11,8 +11,6 @@ use satrs_core::{
}; };
use satrs_example::PUS_APID; use satrs_example::PUS_APID;
use crate::tmtc::MpscStoreAndSendError;
pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)];
#[derive(Default, Clone)] #[derive(Default, Clone)]

View File

@ -88,16 +88,15 @@ impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool {
} }
} }
// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules.
#[derive(Clone)] #[derive(Clone)]
pub struct PusTcSourceProviderDynamic { pub struct PusTcSourceProviderDynamic(pub Sender<Vec<u8>>);
pub tc_source: Sender<Vec<u8>>,
}
impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { impl ReceivesEcssPusTc for PusTcSourceProviderDynamic {
type Error = SendError<Vec<u8>>; type Error = SendError<Vec<u8>>;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
self.tc_source.send(pus_tc.raw_data().to_vec())?; self.0.send(pus_tc.raw_data().to_vec())?;
Ok(()) Ok(())
} }
} }
@ -106,7 +105,7 @@ impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
type Error = mpsc::SendError<Vec<u8>>; type Error = mpsc::SendError<Vec<u8>>;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_source.send(tc_raw.to_vec())?; self.0.send(tc_raw.to_vec())?;
Ok(()) Ok(())
} }
} }

View File

@ -1,6 +1,6 @@
use std::{ use std::{
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::mpsc::{self, Receiver}, sync::mpsc::Receiver,
}; };
use log::{info, warn}; use log::{info, warn};