Merge pull request 'add scheduler service' (#15) from add-scheduler-service into main

Reviewed-on: #15
Reviewed-by: lkoester <st167799@stud.uni-stuttgart.de>
This commit is contained in:
Robin Müller 2024-04-24 17:10:44 +02:00
commit 4af249612d
5 changed files with 194 additions and 15 deletions

View File

@ -172,6 +172,23 @@ pub mod action_err {
pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT];
}
pub mod pool {
use satrs::pool::{StaticMemoryPool, StaticPoolConfig};
pub fn create_sched_tc_pool() -> StaticMemoryPool {
StaticMemoryPool::new(StaticPoolConfig::new(
vec![
(100, 32),
(50, 64),
(50, 128),
(50, 256),
(50, 1024),
(100, 2048),
],
true,
))
}
}
pub mod components {
use satrs::request::UniqueApidTargetId;
@ -191,6 +208,7 @@ pub mod components {
UdpServer = 7,
TcpServer = 8,
TcpSppClient = 9,
PusScheduler = 10,
}
pub const CONTROLLER_ID: UniqueApidTargetId =
@ -205,6 +223,8 @@ pub mod components {
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32);
pub const PUS_MODE_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32);
pub const PUS_SCHEDULER_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusScheduler as u32);
pub const PUS_HK_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32);
pub const UDP_SERVER: UniqueApidTargetId =

View File

@ -9,13 +9,14 @@ use log::info;
use ops_sat_rs::config::{
cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
pool::create_sched_tc_pool,
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
VALID_PACKET_ID_LIST,
};
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
use crate::pus::{scheduler::create_scheduler_service, PusTcDistributor, PusTcMpscRouter};
use crate::tmtc::tm_sink::TmFunnelDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service};
use crate::{
@ -53,7 +54,7 @@ fn main() {
let (pus_test_tx, pus_test_rx) = mpsc::channel();
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
// let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
// let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
let (pus_action_tx, pus_action_rx) = mpsc::channel();
// let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
@ -73,7 +74,7 @@ fn main() {
let pus_router = PusTcMpscRouter {
test_tc_sender: pus_test_tx,
// event_tc_sender: pus_event_tx,
// sched_tc_sender: pus_sched_tx,
sched_tc_sender: pus_sched_tx,
// hk_tc_sender: pus_hk_tx,
action_tc_sender: pus_action_tx,
// mode_tc_sender: pus_mode_tx,
@ -84,12 +85,12 @@ fn main() {
// event_handler.clone_event_sender(),
pus_test_rx,
);
// let pus_scheduler_service = create_scheduler_service_dynamic(
// tm_funnel_tx.clone(),
// tc_source.0.clone(),
// pus_sched_rx,
// create_sched_tc_pool(),
// );
let pus_scheduler_service = create_scheduler_service(
tm_funnel_tx.clone(),
tc_source_tx.clone(),
pus_sched_rx,
create_sched_tc_pool(),
);
//
// let pus_event_service =
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
@ -116,7 +117,7 @@ fn main() {
// pus_hk_service,
// pus_event_service,
pus_action_service,
// pus_scheduler_service,
pus_scheduler_service,
// pus_mode_service,
);

View File

@ -1,4 +1,5 @@
pub mod action;
pub mod scheduler;
pub mod stack;
pub mod test;
@ -52,7 +53,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi
pub struct PusTcMpscRouter {
pub test_tc_sender: Sender<EcssTcAndToken>,
// pub event_tc_sender: Sender<EcssTcAndToken>,
// pub sched_tc_sender: Sender<EcssTcAndToken>,
pub sched_tc_sender: Sender<EcssTcAndToken>,
// pub hk_tc_sender: Sender<EcssTcAndToken>,
pub action_tc_sender: Sender<EcssTcAndToken>,
// pub mode_tc_sender: Sender<EcssTcAndToken>,

153
src/pus/scheduler.rs Normal file
View File

@ -0,0 +1,153 @@
use std::sync::mpsc;
use std::time::Duration;
use crate::pus::create_verification_reporter;
use log::{error, info, warn};
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
use satrs::pool::{PoolProvider, StaticMemoryPool};
use satrs::pus::scheduler::{PusScheduler, TcInfo};
use satrs::pus::scheduler_srv::PusSchedServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
};
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
use satrs::ComponentId;
use super::HandlingStatus;
pub trait TcReleaser {
fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}
impl TcReleaser for PacketSenderWithSharedPool {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
let shared_pool = self.shared_pool.get_mut();
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = shared_pool
.0
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
self.sender
.send(PacketInPool::new(sender_id, released_tc_addr))
.expect("sending TC to TC source failed");
}
true
}
}
impl TcReleaser for mpsc::Sender<PacketAsVec> {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
// Send released TC to centralized TC source.
self.send(PacketAsVec::new(sender_id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
}
}
pub struct SchedulingService {
pub pus_11_handler: PusSchedServiceHandler<
MpscTcReceiver,
mpsc::Sender<PacketAsVec>,
EcssTcInVecConverter,
VerificationReporter,
PusScheduler,
>,
pub sched_tc_pool: StaticMemoryPool,
pub releaser_buf: [u8; 4096],
pub tc_releaser: Box<dyn TcReleaser + Send>,
}
impl SchedulingService {
pub fn release_tcs(&mut self) {
let id = self.pus_11_handler.service_helper.id();
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(id, enabled, info, tc)
};
self.pus_11_handler
.scheduler_mut()
.update_time_from_now()
.unwrap();
let released_tcs = self
.pus_11_handler
.scheduler_mut()
.release_telecommands_with_buffer(
releaser,
&mut self.sched_tc_pool,
&mut self.releaser_buf,
)
.expect("releasing TCs failed");
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
}
}
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self
.pus_11_handler
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
{
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
HandlingStatus::HandledOne
}
}
pub fn create_scheduler_service(
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
tc_source_sender: mpsc::Sender<PacketAsVec>,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool,
) -> SchedulingService {
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed");
let pus_11_handler = PusSchedServiceHandler::new(
PusServiceHelper::new(
PUS_SCHEDULER_SERVICE.id(),
pus_sched_rx,
tm_funnel_tx,
create_verification_reporter(PUS_SCHEDULER_SERVICE.id(), PUS_SCHEDULER_SERVICE.apid),
EcssTcInVecConverter::default(),
),
scheduler,
);
SchedulingService {
pus_11_handler,
sched_tc_pool,
releaser_buf: [0; 4096],
tc_releaser: Box::new(tc_source_sender),
}
}

View File

@ -3,7 +3,7 @@ use crate::pus::HandlingStatus;
use derive_new::new;
use satrs::spacepackets::time::{cds, TimeWriter};
use super::{action::ActionServiceWrapper, TargetedPusService};
use super::{action::ActionServiceWrapper, scheduler::SchedulingService, TargetedPusService};
// use super::{
// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
@ -17,7 +17,7 @@ pub struct PusStack {
// hk_srv_wrapper: HkServiceWrapper<TmSender, TcInMemConverter>,
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
action_srv_wrapper: ActionServiceWrapper,
// schedule_srv: SchedulingServiceWrapper<TmSender, TcInMemConverter>,
schedule_srv: SchedulingService,
// mode_srv: ModeServiceWrapper<TmSender, TcInMemConverter>,
}
@ -25,7 +25,7 @@ impl PusStack {
pub fn periodic_operation(&mut self) {
// Release all telecommands which reached their release time before calling the service
// handlers.
// self.schedule_srv.release_tcs();
self.schedule_srv.release_tcs();
let time_stamp = cds::CdsTime::now_with_u16_days()
.expect("time stamp generation error")
.to_vec()
@ -48,7 +48,11 @@ impl PusStack {
self.test_srv.poll_and_handle_next_packet(&time_stamp),
None,
);
// is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None);
is_srv_finished(
11,
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
None,
);
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
is_srv_finished(
8,