From 43408f3a9b1988adeaaf96b87e6d5815f19e6193 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 5 Jul 2023 15:12:03 +0200 Subject: [PATCH] only actions and modes remaining --- satrs-core/src/pus/scheduler_srv.rs | 4 +++ satrs-example/src/main.rs | 34 +++++++++++++++--------- satrs-example/src/pus/event.rs | 2 +- satrs-example/src/pus/scheduler.rs | 41 +++++++++++++++++++++++++++-- satrs-example/src/pus/test.rs | 2 +- satrs-example/src/tmtc.rs | 38 -------------------------- 6 files changed, 66 insertions(+), 55 deletions(-) diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 98ea570..9de4981 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -40,6 +40,10 @@ impl PusService11SchedHandler { scheduler, } } + + pub fn scheduler_mut(&mut self) -> &mut PusScheduler { + &mut self.scheduler + } } impl PusServiceHandler for PusService11SchedHandler { diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 66f805e..8621a98 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -133,7 +133,7 @@ fn main() { let (acs_thread_tx, acs_thread_rx) = channel::(); request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx); - let tc_source = PusTcSource { + let tc_source_wrapper = PusTcSource { tc_store: tc_store.clone(), tc_source: tc_source_tx, }; @@ -148,7 +148,7 @@ fn main() { seq_count_provider: seq_count_provider_tmtc, }; let tc_args = TcArgs { - tc_source, + tc_source: tc_source_wrapper.clone(), tc_receiver: tc_source_rx, }; let tm_args = TmArgs { @@ -195,7 +195,10 @@ fn main() { verif_reporter.clone(), scheduler, ); - let mut pus_11_wrapper = Pus11Wrapper { pus_11_handler }; + let mut pus_11_wrapper = Pus11Wrapper { + pus_11_handler, + tc_source_wrapper, + }; let pus_5_handler = PusService5EventHandler::new( pus_event_rx, tc_store.pool.clone(), @@ -370,18 +373,23 @@ fn main() { let jh4 = thread::Builder::new() .name("PUS".to_string()) .spawn(move || loop { - let mut all_queues_empty = true; - let mut is_srv_finished = |srv_handler_finished: bool| { - if !srv_handler_finished { - all_queues_empty = false; + pus_11_wrapper.release_tcs(); + loop { + let mut all_queues_empty = true; + let mut is_srv_finished = |srv_handler_finished: bool| { + if !srv_handler_finished { + all_queues_empty = false; + } + }; + is_srv_finished(pus_17_wrapper.handle_next_packet()); + is_srv_finished(pus_11_wrapper.handle_next_packet()); + is_srv_finished(pus_5_wrapper.handle_next_packet()); + if all_queues_empty { + break; } - }; - is_srv_finished(pus_17_wrapper.perform_operation()); - is_srv_finished(pus_11_wrapper.perform_operation()); - is_srv_finished(pus_5_wrapper.perform_operation()); - if all_queues_empty { - thread::sleep(Duration::from_millis(200)); } + + thread::sleep(Duration::from_millis(200)); }) .unwrap(); jh0.join().expect("Joining UDP TMTC server thread failed"); diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index ef47c87..0f2654e 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -7,7 +7,7 @@ pub struct Pus5Wrapper { } impl Pus5Wrapper { - pub fn perform_operation(&mut self) -> bool { + pub fn handle_next_packet(&mut self) -> bool { match self.pus_5_handler.handle_next_packet() { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 2f10636..ffd8c89 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,13 +1,50 @@ -use log::{error, warn}; +use crate::tmtc::PusTcSource; +use log::{error, info, warn}; +use satrs_core::pool::{SharedPool, StoreAddr}; +use satrs_core::pus::scheduler::TcInfo; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler}; pub struct Pus11Wrapper { pub pus_11_handler: PusService11SchedHandler, + pub tc_source_wrapper: PusTcSource, } impl Pus11Wrapper { - pub fn perform_operation(&mut self) -> bool { + pub fn release_tcs(&mut self) { + let releaser = |enabled: bool, info: &TcInfo| -> bool { + if enabled { + self.tc_source_wrapper + .tc_source + .send(info.addr()) + .expect("sending TC to TC source failed"); + } + true + }; + + let mut pool = self + .tc_source_wrapper + .tc_store + .pool + .write() + .expect("error locking pool"); + + self.pus_11_handler + .scheduler_mut() + .update_time_from_now() + .unwrap(); + if let Ok(released_tcs) = self + .pus_11_handler + .scheduler_mut() + .release_telecommands(releaser, pool.as_mut()) + { + if released_tcs > 0 { + info!("{released_tcs} TC(s) released from scheduler"); + } + } + } + + pub fn handle_next_packet(&mut self) -> bool { match self.pus_11_handler.handle_next_packet() { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index daf8727..3aefa6e 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -16,7 +16,7 @@ pub struct Service17CustomWrapper { } impl Service17CustomWrapper { - pub fn perform_operation(&mut self) -> bool { + pub fn handle_next_packet(&mut self) -> bool { let res = self.pus17_handler.handle_next_packet(); if res.is_err() { warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 86b5c1c..28aaab9 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -2,12 +2,10 @@ use log::info; use satrs_core::events::EventU32; use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::params::Params; -use std::cell::RefCell; use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; -use std::rc::Rc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; use std::time::Duration; @@ -16,7 +14,6 @@ use crate::ccsds::CcsdsReceiver; use crate::pus::{PusReceiver, PusTcArgs, PusTcMpscRouter, PusTmArgs}; use crate::requests::RequestWithToken; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::seq_count::SeqCountProviderSyncClonable; use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket}; @@ -154,10 +151,6 @@ pub fn core_tmtc_task( tm_args: TmArgs, pus_router: PusTcMpscRouter, ) { - let scheduler = Rc::new(RefCell::new( - PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), - )); - let pus_tm_args = PusTmArgs { verif_reporter: args.verif_reporter, seq_count_provider: args.seq_count_provider.clone(), @@ -185,13 +178,11 @@ pub fn core_tmtc_task( let mut tc_buf: [u8; 4096] = [0; 4096]; loop { - let tmtc_sched = scheduler.clone(); core_tmtc_loop( &mut udp_tmtc_server, &mut tc_args, &mut tc_buf, &mut pus_receiver, - tmtc_sched, ); thread::sleep(Duration::from_millis(400)); } @@ -202,36 +193,7 @@ fn core_tmtc_loop( tc_args: &mut TcArgs, tc_buf: &mut [u8], pus_receiver: &mut PusReceiver, - scheduler: Rc>, ) { - let releaser = |enabled: bool, info: &TcInfo| -> bool { - if enabled { - tc_args - .tc_source - .tc_source - .send(info.addr()) - .expect("sending TC to TC source failed"); - } - true - }; - - let mut pool = tc_args - .tc_source - .tc_store - .pool - .write() - .expect("error locking pool"); - - let mut scheduler = scheduler.borrow_mut(); - scheduler.update_time_from_now().unwrap(); - if let Ok(released_tcs) = scheduler.release_telecommands(releaser, pool.as_mut()) { - if released_tcs > 0 { - info!("{released_tcs} TC(s) released from scheduler"); - } - } - drop(pool); - drop(scheduler); - while poll_tc_server(udp_tmtc_server) {} match tc_args.tc_receiver.try_recv() { Ok(addr) => {