continuing dynamic main
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2024-02-06 20:26:05 +01:00
parent 7c76f609bd
commit ccb8bdbb95
Signed by: muellerr
GPG Key ID: A649FB78196E3849
11 changed files with 444 additions and 134 deletions

View File

@ -318,10 +318,31 @@ mod alloc_mod {
/// [Clone]. /// [Clone].
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone {} pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn EcssTmSenderCore;
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore;
}
/// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable. /// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable.
impl<T> EcssTmSender for T where T: EcssTmSenderCore + Clone + 'static {} impl<T> EcssTmSender for T
where
T: EcssTmSenderCore + Clone + 'static,
{
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn EcssTmSenderCore {
self
}
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore {
self
}
}
dyn_clone::clone_trait_object!(EcssTmSender); dyn_clone::clone_trait_object!(EcssTmSender);
impl_downcast!(EcssTmSender); impl_downcast!(EcssTmSender);

View File

@ -1,15 +1,22 @@
use crate::tmtc::{MpscStoreAndSendError, PusTcSource}; use satrs_core::pus::ReceivesEcssPusTc;
use satrs_core::spacepackets::{CcsdsPacket, SpHeader}; use satrs_core::spacepackets::{CcsdsPacket, SpHeader};
use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
use satrs_example::PUS_APID; use satrs_example::PUS_APID;
#[derive(Clone)] #[derive(Clone)]
pub struct CcsdsReceiver { pub struct CcsdsReceiver<
pub tc_source: PusTcSource, TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone,
E,
> {
pub tc_source: TcSource,
} }
impl CcsdsPacketHandler for CcsdsReceiver { impl<
type Error = MpscStoreAndSendError; TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
E: 'static,
> CcsdsPacketHandler for CcsdsReceiver<TcSource, E>
{
type Error = E;
fn valid_apids(&self) -> &'static [u16] { fn valid_apids(&self) -> &'static [u16] {
&[PUS_APID] &[PUS_APID]

View File

@ -7,20 +7,17 @@ use satrs_core::{
}, },
events::EventU32, events::EventU32,
params::Params, params::Params,
pool::StoreAddr,
pus::{ pus::{
event_man::{ event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher, PusEventDispatcher,
}, },
verification::{TcStateStarted, VerificationReporterWithSender, VerificationToken}, verification::{TcStateStarted, VerificationReporterWithSender, VerificationToken},
MpscTmInStoreSender, EcssTmSender,
}, },
spacepackets::time::cds::{self, TimeProvider}, spacepackets::time::cds::{self, TimeProvider},
tmtc::tm_helper::SharedTmStore,
ChannelId,
}; };
use satrs_example::{TmSenderId, PUS_APID}; use satrs_example::PUS_APID;
use crate::update_time; use crate::update_time;
@ -30,19 +27,20 @@ pub struct PusEventHandler {
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
pus_event_dispatcher: PusEventDispatcher<(), EventU32>, pus_event_dispatcher: PusEventDispatcher<(), EventU32>,
pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>, pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>,
tm_sender: MpscTmInStoreSender, tm_sender: Box<dyn EcssTmSender>,
time_provider: TimeProvider, time_provider: TimeProvider,
timestamp: [u8; 7], timestamp: [u8; 7],
verif_handler: VerificationReporterWithSender, verif_handler: VerificationReporterWithSender,
} }
/*
*/
impl PusEventHandler { impl PusEventHandler {
pub fn new( pub fn new(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_handler: VerificationReporterWithSender, verif_handler: VerificationReporterWithSender,
event_manager: &mut MpscEventManager, event_manager: &mut MpscEventManager,
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
tm_sender: impl EcssTmSender,
) -> Self { ) -> Self {
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
@ -64,12 +62,7 @@ impl PusEventHandler {
time_provider: cds::TimeProvider::new_with_u16_days(0, 0), time_provider: cds::TimeProvider::new_with_u16_days(0, 0),
timestamp: [0; 7], timestamp: [0; 7],
verif_handler, verif_handler,
tm_sender: MpscTmInStoreSender::new( tm_sender: Box::new(tm_sender),
TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
shared_tm_store,
tm_funnel_tx,
),
} }
} }
@ -109,7 +102,12 @@ impl PusEventHandler {
if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() { if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() {
update_time(&mut self.time_provider, &mut self.timestamp); update_time(&mut self.time_provider, &mut self.timestamp);
self.pus_event_dispatcher self.pus_event_dispatcher
.generate_pus_event_tm_generic(&mut self.tm_sender, &self.timestamp, event, None) .generate_pus_event_tm_generic(
self.tm_sender.upcast_mut(),
&self.timestamp,
event,
None,
)
.expect("Sending TM as event failed"); .expect("Sending TM as event failed");
} }
} }
@ -155,18 +153,16 @@ pub struct EventHandler {
impl EventHandler { impl EventHandler {
pub fn new( pub fn new(
shared_tm_store: SharedTmStore, tm_sender: impl EcssTmSender,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_handler: VerificationReporterWithSender, verif_handler: VerificationReporterWithSender,
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self { ) -> Self {
let mut event_man_wrapper = EventManagerWrapper::new(); let mut event_man_wrapper = EventManagerWrapper::new();
let pus_event_handler = PusEventHandler::new( let pus_event_handler = PusEventHandler::new(
shared_tm_store,
tm_funnel_tx,
verif_handler, verif_handler,
event_man_wrapper.event_manager(), event_man_wrapper.event_manager(),
event_request_rx, event_request_rx,
tm_sender,
); );
Self { Self {
event_man_wrapper, event_man_wrapper,

View File

@ -14,8 +14,11 @@ use crate::events::EventHandler;
use crate::pus::stack::PusStack; use crate::pus::stack::PusStack;
use crate::tm_funnel::TmFunnel; use crate::tm_funnel::TmFunnel;
use log::info; use log::info;
use pus::test::create_test_service_dynamic;
use satrs_core::hal::std::tcp_server::ServerConfig; use satrs_core::hal::std::tcp_server::ServerConfig;
use satrs_core::hal::std::udp_server::UdpTcServer; use satrs_core::hal::std::udp_server::UdpTcServer;
use tmtc::PusTcSourceDynamic;
use udp::DynamicUdpTmHandler;
use crate::acs::AcsTask; use crate::acs::AcsTask;
use crate::ccsds::CcsdsReceiver; use crate::ccsds::CcsdsReceiver;
@ -24,16 +27,19 @@ use crate::pus::action::create_action_service;
use crate::pus::event::create_event_service; use crate::pus::event::create_event_service;
use crate::pus::hk::create_hk_service; use crate::pus::hk::create_hk_service;
use crate::pus::scheduler::create_scheduler_service; use crate::pus::scheduler::create_scheduler_service;
use crate::pus::test::create_test_service; use crate::pus::test::create_test_service_static;
use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::requests::RequestWithToken; use crate::requests::RequestWithToken;
use crate::tcp::{SyncTcpTmSource, TcpTask}; use crate::tcp::{SyncTcpTmSource, TcpTask};
use crate::tmtc::{PusTcSource, SharedTcPool, TcArgs, TmArgs, TmtcTask}; use crate::tmtc::{
use crate::udp::UdpTmtcServer; MpscStoreAndSendError, PusTcSourceStaticPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic,
TmtcTaskStatic,
};
use crate::udp::{StaticUdpTmHandler, UdpTmtcServer};
use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig}; use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig};
use satrs_core::pus::event_man::EventRequestWithToken; use satrs_core::pus::event_man::EventRequestWithToken;
use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
use satrs_core::pus::MpscTmInStoreSender; use satrs_core::pus::{EcssTmSender, MpscTmAsVecSender, MpscTmInStoreSender};
use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter}; use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter};
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{CcsdsDistributor, TargetId}; use satrs_core::tmtc::{CcsdsDistributor, TargetId};
@ -48,29 +54,38 @@ use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
fn main() { const USE_STATIC_POOLS: bool = true;
setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example"); fn create_static_pools() -> (StaticMemoryPool, StaticMemoryPool) {
let tm_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (
(30, 32), StaticMemoryPool::new(StaticPoolConfig::new(vec![
(15, 64), (30, 32),
(15, 128), (15, 64),
(15, 256), (15, 128),
(15, 1024), (15, 256),
(15, 2048), (15, 1024),
])); (15, 2048),
let shared_tm_store = SharedTmStore::new(tm_pool); ])),
let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32), (30, 32),
(15, 64), (15, 64),
(15, 128), (15, 128),
(15, 256), (15, 256),
(15, 1024), (15, 1024),
(15, 2048), (15, 2048),
])); ])),
let shared_tc_pool = SharedTcPool { )
pool: Arc::new(RwLock::new(tc_pool)), }
};
fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender {
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender))
}
fn static_tmtc_pool_main() {
let (tm_pool, tc_pool) = create_static_pools();
let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32), (30, 32),
(15, 64), (15, 64),
@ -79,28 +94,30 @@ fn main() {
(15, 1024), (15, 1024),
(15, 2048), (15, 2048),
])); ]));
let shared_tm_store = SharedTmStore::new(tm_pool);
let shared_tc_pool = SharedTcPool {
pool: Arc::new(RwLock::new(tc_pool)),
};
let (tc_source_tx, tc_source_rx) = channel(); let (tc_source_tx, tc_source_rx) = channel();
let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel();
let verif_sender = MpscTmInStoreSender::new(
// Every software component which needs to generate verification telemetry, receives a cloned
// verification reporter.
let verif_reporter = create_verification_reporter(MpscTmInStoreSender::new(
TmSenderId::PusVerification as ChannelId, TmSenderId::PusVerification as ChannelId,
"verif_sender", "verif_sender",
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); ));
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId);
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID. // Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = HashMap::new(); let mut request_map = HashMap::new();
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>(); request_map.insert(acs_target_id, acs_thread_tx);
let target_apid = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId);
request_map.insert(target_apid, acs_thread_tx);
let tc_source_wrapper = PusTcSource { let tc_source_wrapper = PusTcSourceStaticPool {
tc_store: shared_tc_pool.clone(), tc_store: shared_tc_pool.clone(),
tc_source: tc_source_tx, tc_source: tc_source_tx,
}; };
@ -120,11 +137,16 @@ fn main() {
// These sender handles are used to send event requests, for example to enable or disable // These sender handles are used to send event requests, for example to enable or disable
// certain events. // certain events.
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>(); let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified // The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation. // in the sat-rs documentation.
let mut event_handler = EventHandler::new( let mut event_handler = EventHandler::new(
shared_tm_store.clone(), MpscTmInStoreSender::new(
tm_funnel_tx.clone(), TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
),
verif_reporter.clone(), verif_reporter.clone(),
event_request_rx, event_request_rx,
); );
@ -141,7 +163,7 @@ fn main() {
hk_service_receiver: pus_hk_tx, hk_service_receiver: pus_hk_tx,
action_service_receiver: pus_action_tx, action_service_receiver: pus_action_tx,
}; };
let pus_test_service = create_test_service( let pus_test_service = create_test_service_static(
shared_tm_store.clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
verif_reporter.clone(), verif_reporter.clone(),
@ -192,7 +214,7 @@ fn main() {
let ccsds_receiver = CcsdsReceiver { let ccsds_receiver = CcsdsReceiver {
tc_source: tc_args.tc_source.clone(), tc_source: tc_args.tc_source.clone(),
}; };
let mut tmtc_task = TmtcTask::new( let mut tmtc_task = TmtcTaskStatic::new(
tc_args, tc_args,
PusReceiver::new(verif_reporter.clone(), pus_router), PusReceiver::new(verif_reporter.clone(), pus_router),
); );
@ -203,8 +225,10 @@ fn main() {
.expect("creating UDP TMTC server failed"); .expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer { let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server, udp_tc_server,
tm_rx: tm_args.tm_udp_server_rx, tm_handler: StaticUdpTmHandler {
tm_store: tm_args.tm_store.clone_backing_pool(), tm_rx: tm_args.tm_udp_server_rx,
tm_store: tm_args.tm_store.clone_backing_pool(),
},
}; };
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
@ -301,6 +325,127 @@ fn main() {
jh4.join().expect("Joining PUS handler thread failed"); jh4.join().expect("Joining PUS handler thread failed");
} }
fn dyn_tmtc_pool_main() {
let (tc_source_tx, tc_source_rx) = channel();
let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
let verif_reporter = create_verification_reporter(MpscTmAsVecSender::new(
TmSenderId::PusVerification as ChannelId,
"verif_sender",
tm_funnel_tx.clone(),
));
let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId);
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = HashMap::new();
request_map.insert(acs_target_id, acs_thread_tx);
let tc_source = PusTcSourceDynamic {
tc_source: tc_source_tx,
};
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(
MpscTmAsVecSender::new(
TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
tm_funnel_tx.clone(),
),
verif_reporter.clone(),
event_request_rx,
);
let (pus_test_tx, pus_test_rx) = channel();
let (pus_event_tx, pus_event_rx) = channel();
let (pus_sched_tx, pus_sched_rx) = channel();
let (pus_hk_tx, pus_hk_rx) = channel();
let (pus_action_tx, pus_action_rx) = channel();
let pus_router = PusTcMpscRouter {
test_service_receiver: pus_test_tx,
event_service_receiver: pus_event_tx,
sched_service_receiver: pus_sched_tx,
hk_service_receiver: pus_hk_tx,
action_service_receiver: pus_action_tx,
};
let pus_test_service = create_test_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
event_handler.clone_event_sender(),
pus_test_rx,
);
let ccsds_receiver = CcsdsReceiver { tc_source };
let mut tmtc_task = TmtcTaskDynamic::new(
tc_source_rx,
PusReceiver::new(verif_reporter.clone(), pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone()));
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_handler: DynamicUdpTmHandler {
tm_rx: tm_server_rx,
},
};
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new(
tcp_server_cfg,
sync_tm_tcp_source.clone(),
tcp_ccsds_distributor,
)
.expect("tcp server creation failed");
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("TMTC and UDP".to_string())
.spawn(move || {
info!("Running UDP server on port {SERVER_PORT}");
loop {
udp_tmtc_server.periodic_operation();
tmtc_task.periodic_operation();
thread::sleep(Duration::from_millis(400));
}
})
.unwrap();
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("TCP".to_string())
.spawn(move || {
info!("Running TCP server on port {SERVER_PORT}");
loop {
tcp_server.periodic_operation();
}
})
.unwrap();
}
fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example");
if USE_STATIC_POOLS {
static_tmtc_pool_main();
} else {
dyn_tmtc_pool_main();
}
}
pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) { pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) {
time_provider time_provider
.update_from_now() .update_from_now()

View File

@ -9,11 +9,11 @@ use satrs_core::spacepackets::time::TimeWriter;
use satrs_example::{tmtc_err, CustomPusServiceId}; use satrs_example::{tmtc_err, CustomPusServiceId};
use std::sync::mpsc::Sender; use std::sync::mpsc::Sender;
pub mod stack;
pub mod action; pub mod action;
pub mod event; pub mod event;
pub mod hk; pub mod hk;
pub mod scheduler; pub mod scheduler;
pub mod stack;
pub mod test; pub mod test;
pub struct PusTcMpscRouter { pub struct PusTcMpscRouter {

View File

@ -1,7 +1,6 @@
use std::sync::mpsc; use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
use crate::tmtc::PusTcSource;
use log::{error, info, warn}; use log::{error, info, warn};
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr}; use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr};
use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
@ -15,17 +14,19 @@ use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId; use satrs_core::ChannelId;
use satrs_example::{TcReceiverId, TmSenderId, PUS_APID}; use satrs_example::{TcReceiverId, TmSenderId, PUS_APID};
use crate::tmtc::PusTcSourceStaticPool;
pub struct Pus11Wrapper { pub struct Pus11Wrapper {
pub pus_11_handler: PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>, pub pus_11_handler: PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool, pub sched_tc_pool: StaticMemoryPool,
pub tc_source_wrapper: PusTcSource, pub tc_source_wrapper: PusTcSourceStaticPool,
} }
pub fn create_scheduler_service( pub fn create_scheduler_service(
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSender,
tc_source_wrapper: PusTcSource, tc_source_wrapper: PusTcSourceStaticPool,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>, pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool, sched_tc_pool: StaticMemoryPool,
) -> Pus11Wrapper { ) -> Pus11Wrapper {

View File

@ -1,3 +1,5 @@
use satrs_core::pus::EcssTcInSharedStoreConverter;
use super::{ use super::{
action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper,
test::Service17CustomWrapper, test::Service17CustomWrapper,
@ -8,7 +10,7 @@ pub struct PusStack {
hk_srv: Pus3Wrapper, hk_srv: Pus3Wrapper,
action_srv: Pus8Wrapper, action_srv: Pus8Wrapper,
schedule_srv: Pus11Wrapper, schedule_srv: Pus11Wrapper,
test_srv: Service17CustomWrapper, test_srv: Service17CustomWrapper<EcssTcInSharedStoreConverter>,
} }
impl PusStack { impl PusStack {
@ -17,14 +19,14 @@ impl PusStack {
event_srv: Pus5Wrapper, event_srv: Pus5Wrapper,
action_srv: Pus8Wrapper, action_srv: Pus8Wrapper,
schedule_srv: Pus11Wrapper, schedule_srv: Pus11Wrapper,
test_srv: Service17CustomWrapper, test_srv: Service17CustomWrapper<EcssTcInSharedStoreConverter>,
) -> Self { ) -> Self {
Self { Self {
event_srv, event_srv,
action_srv, action_srv,
schedule_srv, schedule_srv,
test_srv, test_srv,
hk_srv hk_srv,
} }
} }

View File

@ -4,8 +4,8 @@ use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::test::PusService17TestHandler;
use satrs_core::pus::verification::{FailParams, VerificationReporterWithSender}; use satrs_core::pus::verification::{FailParams, VerificationReporterWithSender};
use satrs_core::pus::{ use satrs_core::pus::{
EcssTcAndToken, EcssTcInMemConverter, MpscTcReceiver, MpscTmInStoreSender, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
PusPacketHandlerResult, PusServiceHelper, MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper,
}; };
use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::ecss::PusPacket;
@ -17,14 +17,14 @@ use satrs_core::{events::EventU32, pus::EcssTcInSharedStoreConverter};
use satrs_example::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID, TEST_EVENT}; use satrs_example::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID, TEST_EVENT};
use std::sync::mpsc::{self, Sender}; use std::sync::mpsc::{self, Sender};
pub fn create_test_service( pub fn create_test_service_static(
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>, tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender, verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>, event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> Service17CustomWrapper { ) -> Service17CustomWrapper<EcssTcInSharedStoreConverter> {
let test_srv_tm_sender = MpscTmInStoreSender::new( let test_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusTest as ChannelId, TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER", "PUS_17_TM_SENDER",
@ -49,12 +49,41 @@ pub fn create_test_service(
} }
} }
pub struct Service17CustomWrapper { pub fn create_test_service_dynamic(
pub pus17_handler: PusService17TestHandler<EcssTcInSharedStoreConverter>, tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> Service17CustomWrapper<EcssTcInVecConverter> {
let test_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER",
tm_funnel_tx.clone(),
);
let test_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusTest as ChannelId,
"PUS_17_TC_RECV",
pus_test_rx,
);
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
Box::new(test_srv_receiver),
Box::new(test_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
));
Service17CustomWrapper {
pus17_handler,
test_srv_event_sender: event_sender,
}
}
pub struct Service17CustomWrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus17_handler: PusService17TestHandler<TcInMemConverter>,
pub test_srv_event_sender: Sender<(EventU32, Option<Params>)>, pub test_srv_event_sender: Sender<(EventU32, Option<Params>)>,
} }
impl Service17CustomWrapper { impl<TcInMemConverter: EcssTcInMemConverter> Service17CustomWrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool { pub fn handle_next_packet(&mut self) -> bool {
let res = self.pus17_handler.handle_one_tc(); let res = self.pus17_handler.handle_one_tc();
if res.is_err() { if res.is_err() {

View File

@ -71,20 +71,21 @@ impl TmPacketSourceCore for SyncTcpTmSource {
} }
} }
pub struct TcpTask { pub struct TcpTask<MpscErrorType: 'static> {
server: TcpSpacepacketsServer< server: TcpSpacepacketsServer<
(), (),
CcsdsError<MpscStoreAndSendError>, CcsdsError<MpscErrorType>,
SyncTcpTmSource, SyncTcpTmSource,
CcsdsDistributor<MpscStoreAndSendError>, CcsdsDistributor<MpscErrorType>,
>, >,
phantom: std::marker::PhantomData<MpscErrorType>,
} }
impl TcpTask { impl<MpscErrorType: 'static + core::fmt::Debug> TcpTask<MpscErrorType> {
pub fn new( pub fn new(
cfg: ServerConfig, cfg: ServerConfig,
tm_source: SyncTcpTmSource, tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<MpscStoreAndSendError>, tc_receiver: CcsdsDistributor<MpscErrorType>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
server: TcpSpacepacketsServer::new( server: TcpSpacepacketsServer::new(
@ -93,6 +94,7 @@ impl TcpTask {
tc_receiver, tc_receiver,
Box::new(PACKET_ID_LOOKUP), Box::new(PACKET_ID_LOOKUP),
)?, )?,
phantom: std::marker::PhantomData,
}) })
} }

View File

@ -1,7 +1,7 @@
use log::warn; use log::warn;
use satrs_core::pus::{EcssTcAndToken, ReceivesEcssPusTc}; use satrs_core::pus::{EcssTcAndToken, ReceivesEcssPusTc};
use satrs_core::spacepackets::SpHeader; use satrs_core::spacepackets::SpHeader;
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError};
use thiserror::Error; use thiserror::Error;
use crate::pus::PusReceiver; use crate::pus::PusReceiver;
@ -18,13 +18,13 @@ pub struct TmArgs {
} }
pub struct TcArgs { pub struct TcArgs {
pub tc_source: PusTcSource, pub tc_source: PusTcSourceStaticPool,
pub tc_receiver: Receiver<StoreAddr>, pub tc_receiver: Receiver<StoreAddr>,
} }
impl TcArgs { impl TcArgs {
#[allow(dead_code)] #[allow(dead_code)]
fn split(self) -> (PusTcSource, Receiver<StoreAddr>) { fn split(self) -> (PusTcSourceStaticPool, Receiver<StoreAddr>) {
(self.tc_source, self.tc_receiver) (self.tc_source, self.tc_receiver)
} }
} }
@ -54,19 +54,19 @@ impl SharedTcPool {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct PusTcSource { pub struct PusTcSourceStaticPool {
pub tc_source: Sender<StoreAddr>, pub tc_source: Sender<StoreAddr>,
pub tc_store: SharedTcPool, pub tc_store: SharedTcPool,
} }
impl PusTcSource { impl PusTcSourceStaticPool {
#[allow(dead_code)] #[allow(dead_code)]
pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool {
self.tc_store.pool.clone() self.tc_store.pool.clone()
} }
} }
impl ReceivesEcssPusTc for PusTcSource { impl ReceivesEcssPusTc for PusTcSourceStaticPool {
type Error = MpscStoreAndSendError; type Error = MpscStoreAndSendError;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
@ -76,7 +76,7 @@ impl ReceivesEcssPusTc for PusTcSource {
} }
} }
impl ReceivesCcsdsTc for PusTcSource { impl ReceivesCcsdsTc for PusTcSourceStaticPool {
type Error = MpscStoreAndSendError; type Error = MpscStoreAndSendError;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
@ -88,13 +88,36 @@ impl ReceivesCcsdsTc for PusTcSource {
} }
} }
pub struct TmtcTask { #[derive(Clone)]
pub struct PusTcSourceDynamic {
pub tc_source: Sender<Vec<u8>>,
}
impl ReceivesEcssPusTc for PusTcSourceDynamic {
type Error = SendError<Vec<u8>>;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
self.tc_source.send(pus_tc.raw_data().to_vec())?;
Ok(())
}
}
impl ReceivesCcsdsTc for PusTcSourceDynamic {
type Error = mpsc::SendError<Vec<u8>>;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_source.send(tc_raw.to_vec())?;
Ok(())
}
}
pub struct TmtcTaskStatic {
tc_args: TcArgs, tc_args: TcArgs,
tc_buf: [u8; 4096], tc_buf: [u8; 4096],
pus_receiver: PusReceiver, pus_receiver: PusReceiver,
} }
impl TmtcTask { impl TmtcTaskStatic {
pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self { pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self {
Self { Self {
tc_args, tc_args,
@ -148,3 +171,50 @@ impl TmtcTask {
} }
} }
} }
pub struct TmtcTaskDynamic {
pub tc_receiver: Receiver<Vec<u8>>,
pus_receiver: PusReceiver,
}
impl TmtcTaskDynamic {
pub fn new(tc_receiver: Receiver<Vec<u8>>, pus_receiver: PusReceiver) -> Self {
Self {
tc_receiver,
pus_receiver,
}
}
pub fn periodic_operation(&mut self) {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> bool {
match self.tc_receiver.try_recv() {
Ok(tc) => match PusTcReader::new(&tc) {
Ok((pus_tc, _)) => {
self.pus_receiver
.handle_tc_packet(
satrs_core::pus::TcInMemory::Vec(tc.clone()),
pus_tc.service(),
&pus_tc,
)
.ok();
true
}
Err(e) => {
warn!("error creating PUS TC from raw data: {e}");
warn!("raw data: {:x?}", tc);
true
}
},
Err(e) => match e {
TryRecvError::Empty => false,
TryRecvError::Disconnected => {
warn!("tmtc thread: sender disconnected");
false
}
},
}
}
}

View File

@ -1,4 +1,7 @@
use std::{net::SocketAddr, sync::mpsc::Receiver}; use std::{
net::{SocketAddr, UdpSocket},
sync::mpsc::{self, Receiver},
};
use log::{info, warn}; use log::{info, warn};
use satrs_core::{ use satrs_core::{
@ -7,45 +10,17 @@ use satrs_core::{
tmtc::CcsdsError, tmtc::CcsdsError,
}; };
use crate::tmtc::MpscStoreAndSendError; pub trait UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
pub struct UdpTmtcServer { pub struct StaticUdpTmHandler {
pub udp_tc_server: UdpTcServer<CcsdsError<MpscStoreAndSendError>>,
pub tm_rx: Receiver<StoreAddr>, pub tm_rx: Receiver<StoreAddr>,
pub tm_store: SharedStaticMemoryPool, pub tm_store: SharedStaticMemoryPool,
} }
impl UdpTmtcServer {
pub fn periodic_operation(&mut self) {
while self.poll_tc_server() {}
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
self.send_tm_to_udp_client(&recv_addr);
}
}
fn poll_tc_server(&mut self) -> bool { impl UdpTmHandler for StaticUdpTmHandler {
match self.udp_tc_server.try_recv_tc() { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, &recv_addr: &SocketAddr) {
Ok(_) => true,
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
warn!("packet error: {e:?}");
true
}
CcsdsError::CustomError(e) => {
warn!("mpsc store and send error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
},
}
}
fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) {
while let Ok(addr) = self.tm_rx.try_recv() { while let Ok(addr) = self.tm_rx.try_recv() {
let store_lock = self.tm_store.write(); let store_lock = self.tm_store.write();
if store_lock.is_err() { if store_lock.is_err() {
@ -67,10 +42,72 @@ impl UdpTmtcServer {
} else { } else {
info!("Sending PUS TM"); info!("Sending PUS TM");
} }
let result = self.udp_tc_server.socket.send_to(buf, recv_addr); let result = socket.send_to(buf, recv_addr);
if let Err(e) = result { if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}") warn!("Sending TM with UDP socket failed: {e}")
} }
} }
} }
} }
pub struct DynamicUdpTmHandler {
pub tm_rx: Receiver<Vec<u8>>,
}
impl UdpTmHandler for DynamicUdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
while let Ok(tm) = self.tm_rx.try_recv() {
if tm.len() > 9 {
let service = tm[7];
let subservice = tm[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = socket.send_to(&tm, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
}
}
pub struct UdpTmtcServer<TmHandler: UdpTmHandler, SendError> {
pub udp_tc_server: UdpTcServer<CcsdsError<SendError>>,
pub tm_handler: TmHandler,
}
impl<TmHandler: UdpTmHandler, SendError: core::fmt::Debug + 'static>
UdpTmtcServer<TmHandler, SendError>
{
pub fn periodic_operation(&mut self) {
while self.poll_tc_server() {}
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
self.tm_handler
.send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr);
}
}
fn poll_tc_server(&mut self) -> bool {
match self.udp_tc_server.try_recv_tc() {
Ok(_) => true,
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
warn!("packet error: {e:?}");
true
}
CcsdsError::CustomError(e) => {
warn!("mpsc custom error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
},
}
}
}