From 511214f903e571a81004dce5297e6ba788d27e11 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 16:14:16 +0200 Subject: [PATCH] add scheduler service --- src/config.rs | 20 ++++++ src/main.rs | 21 +++--- src/pus/mod.rs | 3 +- src/pus/scheduler.rs | 153 +++++++++++++++++++++++++++++++++++++++++++ src/pus/stack.rs | 12 ++-- 5 files changed, 194 insertions(+), 15 deletions(-) create mode 100644 src/pus/scheduler.rs diff --git a/src/config.rs b/src/config.rs index e48b764..121b683 100644 --- a/src/config.rs +++ b/src/config.rs @@ -172,6 +172,23 @@ pub mod action_err { pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT]; } +pub mod pool { + use satrs::pool::{StaticMemoryPool, StaticPoolConfig}; + + pub fn create_sched_tc_pool() -> StaticMemoryPool { + StaticMemoryPool::new(StaticPoolConfig::new( + vec![ + (100, 32), + (50, 64), + (50, 128), + (50, 256), + (50, 1024), + (100, 2048), + ], + true, + )) + } +} pub mod components { use satrs::request::UniqueApidTargetId; @@ -191,6 +208,7 @@ pub mod components { UdpServer = 7, TcpServer = 8, TcpSppClient = 9, + PusScheduler = 10, } pub const CONTROLLER_ID: UniqueApidTargetId = @@ -205,6 +223,8 @@ pub mod components { UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32); pub const PUS_MODE_SERVICE: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32); + pub const PUS_SCHEDULER_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusScheduler as u32); pub const PUS_HK_SERVICE: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32); pub const UDP_SERVER: UniqueApidTargetId = diff --git a/src/main.rs b/src/main.rs index ee9b48d..22d024c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,13 +9,14 @@ use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, + pool::create_sched_tc_pool, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; -use crate::pus::{PusTcDistributor, PusTcMpscRouter}; +use crate::pus::{scheduler::create_scheduler_service, PusTcDistributor, PusTcMpscRouter}; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ @@ -53,7 +54,7 @@ fn main() { let (pus_test_tx, pus_test_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel(); - // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); @@ -73,7 +74,7 @@ fn main() { let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, // event_tc_sender: pus_event_tx, - // sched_tc_sender: pus_sched_tx, + sched_tc_sender: pus_sched_tx, // hk_tc_sender: pus_hk_tx, action_tc_sender: pus_action_tx, // mode_tc_sender: pus_mode_tx, @@ -84,12 +85,12 @@ fn main() { // event_handler.clone_event_sender(), pus_test_rx, ); - // let pus_scheduler_service = create_scheduler_service_dynamic( - // tm_funnel_tx.clone(), - // tc_source.0.clone(), - // pus_sched_rx, - // create_sched_tc_pool(), - // ); + let pus_scheduler_service = create_scheduler_service( + tm_funnel_tx.clone(), + tc_source_tx.clone(), + pus_sched_rx, + create_sched_tc_pool(), + ); // // let pus_event_service = // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); @@ -116,7 +117,7 @@ fn main() { // pus_hk_service, // pus_event_service, pus_action_service, - // pus_scheduler_service, + pus_scheduler_service, // pus_mode_service, ); diff --git a/src/pus/mod.rs b/src/pus/mod.rs index e39c7d1..791a97e 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,4 +1,5 @@ pub mod action; +pub mod scheduler; pub mod stack; pub mod test; @@ -52,7 +53,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi pub struct PusTcMpscRouter { pub test_tc_sender: Sender, // pub event_tc_sender: Sender, - // pub sched_tc_sender: Sender, + pub sched_tc_sender: Sender, // pub hk_tc_sender: Sender, pub action_tc_sender: Sender, // pub mode_tc_sender: Sender, diff --git a/src/pus/scheduler.rs b/src/pus/scheduler.rs new file mode 100644 index 0000000..08cd738 --- /dev/null +++ b/src/pus/scheduler.rs @@ -0,0 +1,153 @@ +use std::sync::mpsc; +use std::time::Duration; + +use crate::pus::create_verification_reporter; +use log::{error, info, warn}; +use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE; +use satrs::pool::{PoolProvider, StaticMemoryPool}; +use satrs::pus::scheduler::{PusScheduler, TcInfo}; +use satrs::pus::scheduler_srv::PusSchedServiceHandler; +use satrs::pus::verification::VerificationReporter; +use satrs::pus::{ + EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, +}; +use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool}; +use satrs::ComponentId; + +use super::HandlingStatus; + +pub trait TcReleaser { + fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; +} + +impl TcReleaser for PacketSenderWithSharedPool { + fn release( + &mut self, + sender_id: ComponentId, + enabled: bool, + _info: &TcInfo, + tc: &[u8], + ) -> bool { + if enabled { + let shared_pool = self.shared_pool.get_mut(); + // Transfer TC from scheduler TC pool to shared TC pool. + let released_tc_addr = shared_pool + .0 + .write() + .expect("locking pool failed") + .add(tc) + .expect("adding TC to shared pool failed"); + self.sender + .send(PacketInPool::new(sender_id, released_tc_addr)) + .expect("sending TC to TC source failed"); + } + true + } +} + +impl TcReleaser for mpsc::Sender { + fn release( + &mut self, + sender_id: ComponentId, + enabled: bool, + _info: &TcInfo, + tc: &[u8], + ) -> bool { + if enabled { + // Send released TC to centralized TC source. + self.send(PacketAsVec::new(sender_id, tc.to_vec())) + .expect("sending TC to TC source failed"); + } + true + } +} + +pub struct SchedulingService { + pub pus_11_handler: PusSchedServiceHandler< + MpscTcReceiver, + mpsc::Sender, + EcssTcInVecConverter, + VerificationReporter, + PusScheduler, + >, + pub sched_tc_pool: StaticMemoryPool, + pub releaser_buf: [u8; 4096], + pub tc_releaser: Box, +} + +impl SchedulingService { + pub fn release_tcs(&mut self) { + let id = self.pus_11_handler.service_helper.id(); + let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { + self.tc_releaser.release(id, enabled, info, tc) + }; + + self.pus_11_handler + .scheduler_mut() + .update_time_from_now() + .unwrap(); + let released_tcs = self + .pus_11_handler + .scheduler_mut() + .release_telecommands_with_buffer( + releaser, + &mut self.sched_tc_pool, + &mut self.releaser_buf, + ) + .expect("releasing TCs failed"); + if released_tcs > 0 { + info!("{released_tcs} TC(s) released from scheduler"); + } + } + + pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self + .pus_11_handler + .poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool) + { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS11 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS11 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS11: Subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}") + } + } + HandlingStatus::HandledOne + } +} + +pub fn create_scheduler_service( + tm_funnel_tx: mpsc::Sender, + tc_source_sender: mpsc::Sender, + pus_sched_rx: mpsc::Receiver, + sched_tc_pool: StaticMemoryPool, +) -> SchedulingService { + let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) + .expect("Creating PUS Scheduler failed"); + let pus_11_handler = PusSchedServiceHandler::new( + PusServiceHelper::new( + PUS_SCHEDULER_SERVICE.id(), + pus_sched_rx, + tm_funnel_tx, + create_verification_reporter(PUS_SCHEDULER_SERVICE.id(), PUS_SCHEDULER_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + scheduler, + ); + SchedulingService { + pus_11_handler, + sched_tc_pool, + releaser_buf: [0; 4096], + tc_releaser: Box::new(tc_source_sender), + } +} diff --git a/src/pus/stack.rs b/src/pus/stack.rs index 0699046..7e70ac6 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -3,7 +3,7 @@ use crate::pus::HandlingStatus; use derive_new::new; use satrs::spacepackets::time::{cds, TimeWriter}; -use super::{action::ActionServiceWrapper, TargetedPusService}; +use super::{action::ActionServiceWrapper, scheduler::SchedulingService, TargetedPusService}; // use super::{ // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, @@ -17,7 +17,7 @@ pub struct PusStack { // hk_srv_wrapper: HkServiceWrapper, // event_srv: EventServiceWrapper, action_srv_wrapper: ActionServiceWrapper, - // schedule_srv: SchedulingServiceWrapper, + schedule_srv: SchedulingService, // mode_srv: ModeServiceWrapper, } @@ -25,7 +25,7 @@ impl PusStack { pub fn periodic_operation(&mut self) { // Release all telecommands which reached their release time before calling the service // handlers. - // self.schedule_srv.release_tcs(); + self.schedule_srv.release_tcs(); let time_stamp = cds::CdsTime::now_with_u16_days() .expect("time stamp generation error") .to_vec() @@ -48,7 +48,11 @@ impl PusStack { self.test_srv.poll_and_handle_next_packet(&time_stamp), None, ); - // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); + is_srv_finished( + 11, + self.schedule_srv.poll_and_handle_next_tc(&time_stamp), + None, + ); // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); is_srv_finished( 8,