Larger update #49

Merged
muellerr merged 41 commits from this-is-complex into main 2023-07-08 15:02:42 +02:00
6 changed files with 66 additions and 55 deletions
Showing only changes of commit 43408f3a9b

View File

@@ -40,6 +40,10 @@ impl PusService11SchedHandler {
             scheduler,
         }
     }
+
+    pub fn scheduler_mut(&mut self) -> &mut PusScheduler {
+        &mut self.scheduler
+    }
 }
 
 impl PusServiceHandler for PusService11SchedHandler {
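The new scheduler_mut accessor lets code outside the handler drive the scheduler directly; Pus11Wrapper::release_tcs, added later in this PR, relies on it. A minimal sketch of the call pattern, assuming a handler built the usual way (the free function and its name are illustrative, not part of the PR):

    use satrs_core::pus::scheduler_srv::PusService11SchedHandler;

    // Sketch only: advance the handler-owned scheduler's notion of "now"
    // from outside the handler, as release_tcs() does below.
    fn refresh_scheduler_time(handler: &mut PusService11SchedHandler) {
        handler
            .scheduler_mut()
            .update_time_from_now()
            .expect("updating scheduler time failed");
    }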

View File

@@ -133,7 +133,7 @@ fn main() {
     let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
     request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx);
-    let tc_source = PusTcSource {
+    let tc_source_wrapper = PusTcSource {
         tc_store: tc_store.clone(),
         tc_source: tc_source_tx,
     };
@@ -148,7 +148,7 @@ fn main() {
         seq_count_provider: seq_count_provider_tmtc,
     };
     let tc_args = TcArgs {
-        tc_source,
+        tc_source: tc_source_wrapper.clone(),
         tc_receiver: tc_source_rx,
     };
     let tm_args = TmArgs {
@@ -195,7 +195,10 @@ fn main() {
         verif_reporter.clone(),
         scheduler,
     );
-    let mut pus_11_wrapper = Pus11Wrapper { pus_11_handler };
+    let mut pus_11_wrapper = Pus11Wrapper {
+        pus_11_handler,
+        tc_source_wrapper,
+    };
     let pus_5_handler = PusService5EventHandler::new(
         pus_event_rx,
         tc_store.pool.clone(),
@@ -370,18 +373,23 @@ fn main() {
     let jh4 = thread::Builder::new()
         .name("PUS".to_string())
         .spawn(move || loop {
+            pus_11_wrapper.release_tcs();
+            loop {
                 let mut all_queues_empty = true;
                 let mut is_srv_finished = |srv_handler_finished: bool| {
                     if !srv_handler_finished {
                         all_queues_empty = false;
                     }
                 };
-                is_srv_finished(pus_17_wrapper.perform_operation());
-                is_srv_finished(pus_11_wrapper.perform_operation());
-                is_srv_finished(pus_5_wrapper.perform_operation());
+                is_srv_finished(pus_17_wrapper.handle_next_packet());
+                is_srv_finished(pus_11_wrapper.handle_next_packet());
+                is_srv_finished(pus_5_wrapper.handle_next_packet());
                 if all_queues_empty {
-                    thread::sleep(Duration::from_millis(200));
+                    break;
                 }
+            }
+            thread::sleep(Duration::from_millis(200));
         })
         .unwrap();
     jh0.join().expect("Joining UDP TMTC server thread failed");
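The PUS thread in main() now releases scheduled TCs once per wakeup, then drains every service queue in an inner loop before sleeping, rather than sleeping whenever a single pass happened to find all queues empty. A generic sketch of this drain-then-sleep pattern, with the service handlers abstracted as closures (that shape is illustrative, not how main.rs wires them up):

    use std::thread;
    use std::time::Duration;

    // Each handler closure returns true once its queue is empty for this pass.
    fn poll_until_drained(handlers: &mut [Box<dyn FnMut() -> bool + Send>]) {
        loop {
            let mut all_queues_empty = true;
            for handler in handlers.iter_mut() {
                if !handler() {
                    all_queues_empty = false;
                }
            }
            if all_queues_empty {
                break;
            }
        }
        // Sleep only after every queue has been fully worked off.
        thread::sleep(Duration::from_millis(200));
    }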

View File

@@ -7,7 +7,7 @@ pub struct Pus5Wrapper {
 }
 
 impl Pus5Wrapper {
-    pub fn perform_operation(&mut self) -> bool {
+    pub fn handle_next_packet(&mut self) -> bool {
         match self.pus_5_handler.handle_next_packet() {
             Ok(result) => match result {
                 PusPacketHandlerResult::RequestHandled => {}

View File

@@ -1,13 +1,50 @@
-use log::{error, warn};
+use crate::tmtc::PusTcSource;
+use log::{error, info, warn};
+use satrs_core::pool::{SharedPool, StoreAddr};
+use satrs_core::pus::scheduler::TcInfo;
 use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
 use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler};
 
 pub struct Pus11Wrapper {
     pub pus_11_handler: PusService11SchedHandler,
+    pub tc_source_wrapper: PusTcSource,
 }
 
 impl Pus11Wrapper {
-    pub fn perform_operation(&mut self) -> bool {
+    pub fn release_tcs(&mut self) {
+        let releaser = |enabled: bool, info: &TcInfo| -> bool {
+            if enabled {
+                self.tc_source_wrapper
+                    .tc_source
+                    .send(info.addr())
+                    .expect("sending TC to TC source failed");
+            }
+            true
+        };
+
+        let mut pool = self
+            .tc_source_wrapper
+            .tc_store
+            .pool
+            .write()
+            .expect("error locking pool");
+
+        self.pus_11_handler
+            .scheduler_mut()
+            .update_time_from_now()
+            .unwrap();
+        if let Ok(released_tcs) = self
+            .pus_11_handler
+            .scheduler_mut()
+            .release_telecommands(releaser, pool.as_mut())
+        {
+            if released_tcs > 0 {
+                info!("{released_tcs} TC(s) released from scheduler");
+            }
+        }
+    }
+
+    pub fn handle_next_packet(&mut self) -> bool {
         match self.pus_11_handler.handle_next_packet() {
             Ok(result) => match result {
                 PusPacketHandlerResult::RequestHandled => {}
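release_tcs is the TC release block that previously lived in core_tmtc_loop (deleted in the tmtc module hunk below, the last file in this diff), now owned by the wrapper together with the PusTcSource it sends released TCs through. A hedged sketch of one wakeup of the PUS thread for this service, mirroring the main() loop above (the helper function is illustrative):

    // Release due telecommands first, then drain the service's own queue.
    // handle_next_packet() returns true once the queue is empty.
    fn pus_11_pass(wrapper: &mut Pus11Wrapper) {
        wrapper.release_tcs();
        while !wrapper.handle_next_packet() {}
    }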

View File

@@ -16,7 +16,7 @@ pub struct Service17CustomWrapper {
 }
 
 impl Service17CustomWrapper {
-    pub fn perform_operation(&mut self) -> bool {
+    pub fn handle_next_packet(&mut self) -> bool {
         let res = self.pus17_handler.handle_next_packet();
         if res.is_err() {
             warn!("PUS17 handler failed with error {:?}", res.unwrap_err());

View File

@@ -2,12 +2,10 @@ use log::info;
 use satrs_core::events::EventU32;
 use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer};
 use satrs_core::params::Params;
-use std::cell::RefCell;
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::{Display, Formatter};
 use std::net::SocketAddr;
-use std::rc::Rc;
 use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
 use std::thread;
 use std::time::Duration;
@@ -16,7 +14,6 @@ use crate::ccsds::CcsdsReceiver;
 use crate::pus::{PusReceiver, PusTcArgs, PusTcMpscRouter, PusTmArgs};
 use crate::requests::RequestWithToken;
 use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
-use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
 use satrs_core::pus::verification::StdVerifReporterWithSender;
 use satrs_core::seq_count::SeqCountProviderSyncClonable;
 use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket};
@@ -154,10 +151,6 @@ pub fn core_tmtc_task(
     tm_args: TmArgs,
     pus_router: PusTcMpscRouter,
 ) {
-    let scheduler = Rc::new(RefCell::new(
-        PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(),
-    ));
     let pus_tm_args = PusTmArgs {
         verif_reporter: args.verif_reporter,
         seq_count_provider: args.seq_count_provider.clone(),
@@ -185,13 +178,11 @@ pub fn core_tmtc_task(
     let mut tc_buf: [u8; 4096] = [0; 4096];
     loop {
-        let tmtc_sched = scheduler.clone();
         core_tmtc_loop(
             &mut udp_tmtc_server,
             &mut tc_args,
             &mut tc_buf,
             &mut pus_receiver,
-            tmtc_sched,
         );
         thread::sleep(Duration::from_millis(400));
     }
@@ -202,36 +193,7 @@ fn core_tmtc_loop(
     tc_args: &mut TcArgs,
     tc_buf: &mut [u8],
     pus_receiver: &mut PusReceiver,
-    scheduler: Rc<RefCell<PusScheduler>>,
 ) {
-    let releaser = |enabled: bool, info: &TcInfo| -> bool {
-        if enabled {
-            tc_args
-                .tc_source
-                .tc_source
-                .send(info.addr())
-                .expect("sending TC to TC source failed");
-        }
-        true
-    };
-    let mut pool = tc_args
-        .tc_source
-        .tc_store
-        .pool
-        .write()
-        .expect("error locking pool");
-    let mut scheduler = scheduler.borrow_mut();
-    scheduler.update_time_from_now().unwrap();
-    if let Ok(released_tcs) = scheduler.release_telecommands(releaser, pool.as_mut()) {
-        if released_tcs > 0 {
-            info!("{released_tcs} TC(s) released from scheduler");
-        }
-    }
-    drop(pool);
-    drop(scheduler);
     while poll_tc_server(udp_tmtc_server) {}
     match tc_args.tc_receiver.try_recv() {
         Ok(addr) => {