add scheduler service
This commit is contained in:
parent
ffeb7951a8
commit
511214f903
@ -172,6 +172,23 @@ pub mod action_err {
|
|||||||
|
|
||||||
pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT];
|
pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT];
|
||||||
}
|
}
|
||||||
|
pub mod pool {
|
||||||
|
use satrs::pool::{StaticMemoryPool, StaticPoolConfig};
|
||||||
|
|
||||||
|
pub fn create_sched_tc_pool() -> StaticMemoryPool {
|
||||||
|
StaticMemoryPool::new(StaticPoolConfig::new(
|
||||||
|
vec![
|
||||||
|
(100, 32),
|
||||||
|
(50, 64),
|
||||||
|
(50, 128),
|
||||||
|
(50, 256),
|
||||||
|
(50, 1024),
|
||||||
|
(100, 2048),
|
||||||
|
],
|
||||||
|
true,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub mod components {
|
pub mod components {
|
||||||
use satrs::request::UniqueApidTargetId;
|
use satrs::request::UniqueApidTargetId;
|
||||||
@ -191,6 +208,7 @@ pub mod components {
|
|||||||
UdpServer = 7,
|
UdpServer = 7,
|
||||||
TcpServer = 8,
|
TcpServer = 8,
|
||||||
TcpSppClient = 9,
|
TcpSppClient = 9,
|
||||||
|
PusScheduler = 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub const CONTROLLER_ID: UniqueApidTargetId =
|
pub const CONTROLLER_ID: UniqueApidTargetId =
|
||||||
@ -205,6 +223,8 @@ pub mod components {
|
|||||||
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32);
|
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32);
|
||||||
pub const PUS_MODE_SERVICE: UniqueApidTargetId =
|
pub const PUS_MODE_SERVICE: UniqueApidTargetId =
|
||||||
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32);
|
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32);
|
||||||
|
pub const PUS_SCHEDULER_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusScheduler as u32);
|
||||||
pub const PUS_HK_SERVICE: UniqueApidTargetId =
|
pub const PUS_HK_SERVICE: UniqueApidTargetId =
|
||||||
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32);
|
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32);
|
||||||
pub const UDP_SERVER: UniqueApidTargetId =
|
pub const UDP_SERVER: UniqueApidTargetId =
|
||||||
|
21
src/main.rs
21
src/main.rs
@ -9,13 +9,14 @@ use log::info;
|
|||||||
use ops_sat_rs::config::{
|
use ops_sat_rs::config::{
|
||||||
cfg_file::create_app_config,
|
cfg_file::create_app_config,
|
||||||
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
|
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
|
||||||
|
pool::create_sched_tc_pool,
|
||||||
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
|
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
|
||||||
VALID_PACKET_ID_LIST,
|
VALID_PACKET_ID_LIST,
|
||||||
};
|
};
|
||||||
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
|
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
|
||||||
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
|
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
|
||||||
|
|
||||||
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
|
use crate::pus::{scheduler::create_scheduler_service, PusTcDistributor, PusTcMpscRouter};
|
||||||
use crate::tmtc::tm_sink::TmFunnelDynamic;
|
use crate::tmtc::tm_sink::TmFunnelDynamic;
|
||||||
use crate::{controller::ExperimentController, pus::test::create_test_service};
|
use crate::{controller::ExperimentController, pus::test::create_test_service};
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -53,7 +54,7 @@ fn main() {
|
|||||||
|
|
||||||
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
||||||
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
||||||
// let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
|
let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
|
||||||
// let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
|
// let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
|
||||||
let (pus_action_tx, pus_action_rx) = mpsc::channel();
|
let (pus_action_tx, pus_action_rx) = mpsc::channel();
|
||||||
// let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
|
// let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
|
||||||
@ -73,7 +74,7 @@ fn main() {
|
|||||||
let pus_router = PusTcMpscRouter {
|
let pus_router = PusTcMpscRouter {
|
||||||
test_tc_sender: pus_test_tx,
|
test_tc_sender: pus_test_tx,
|
||||||
// event_tc_sender: pus_event_tx,
|
// event_tc_sender: pus_event_tx,
|
||||||
// sched_tc_sender: pus_sched_tx,
|
sched_tc_sender: pus_sched_tx,
|
||||||
// hk_tc_sender: pus_hk_tx,
|
// hk_tc_sender: pus_hk_tx,
|
||||||
action_tc_sender: pus_action_tx,
|
action_tc_sender: pus_action_tx,
|
||||||
// mode_tc_sender: pus_mode_tx,
|
// mode_tc_sender: pus_mode_tx,
|
||||||
@ -84,12 +85,12 @@ fn main() {
|
|||||||
// event_handler.clone_event_sender(),
|
// event_handler.clone_event_sender(),
|
||||||
pus_test_rx,
|
pus_test_rx,
|
||||||
);
|
);
|
||||||
// let pus_scheduler_service = create_scheduler_service_dynamic(
|
let pus_scheduler_service = create_scheduler_service(
|
||||||
// tm_funnel_tx.clone(),
|
tm_funnel_tx.clone(),
|
||||||
// tc_source.0.clone(),
|
tc_source_tx.clone(),
|
||||||
// pus_sched_rx,
|
pus_sched_rx,
|
||||||
// create_sched_tc_pool(),
|
create_sched_tc_pool(),
|
||||||
// );
|
);
|
||||||
//
|
//
|
||||||
// let pus_event_service =
|
// let pus_event_service =
|
||||||
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
|
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
|
||||||
@ -116,7 +117,7 @@ fn main() {
|
|||||||
// pus_hk_service,
|
// pus_hk_service,
|
||||||
// pus_event_service,
|
// pus_event_service,
|
||||||
pus_action_service,
|
pus_action_service,
|
||||||
// pus_scheduler_service,
|
pus_scheduler_service,
|
||||||
// pus_mode_service,
|
// pus_mode_service,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
pub mod action;
|
pub mod action;
|
||||||
|
pub mod scheduler;
|
||||||
pub mod stack;
|
pub mod stack;
|
||||||
pub mod test;
|
pub mod test;
|
||||||
|
|
||||||
@ -52,7 +53,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi
|
|||||||
pub struct PusTcMpscRouter {
|
pub struct PusTcMpscRouter {
|
||||||
pub test_tc_sender: Sender<EcssTcAndToken>,
|
pub test_tc_sender: Sender<EcssTcAndToken>,
|
||||||
// pub event_tc_sender: Sender<EcssTcAndToken>,
|
// pub event_tc_sender: Sender<EcssTcAndToken>,
|
||||||
// pub sched_tc_sender: Sender<EcssTcAndToken>,
|
pub sched_tc_sender: Sender<EcssTcAndToken>,
|
||||||
// pub hk_tc_sender: Sender<EcssTcAndToken>,
|
// pub hk_tc_sender: Sender<EcssTcAndToken>,
|
||||||
pub action_tc_sender: Sender<EcssTcAndToken>,
|
pub action_tc_sender: Sender<EcssTcAndToken>,
|
||||||
// pub mode_tc_sender: Sender<EcssTcAndToken>,
|
// pub mode_tc_sender: Sender<EcssTcAndToken>,
|
||||||
|
153
src/pus/scheduler.rs
Normal file
153
src/pus/scheduler.rs
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
use std::sync::mpsc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use crate::pus::create_verification_reporter;
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
|
||||||
|
use satrs::pool::{PoolProvider, StaticMemoryPool};
|
||||||
|
use satrs::pus::scheduler::{PusScheduler, TcInfo};
|
||||||
|
use satrs::pus::scheduler_srv::PusSchedServiceHandler;
|
||||||
|
use satrs::pus::verification::VerificationReporter;
|
||||||
|
use satrs::pus::{
|
||||||
|
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
|
||||||
|
};
|
||||||
|
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
|
||||||
|
use satrs::ComponentId;
|
||||||
|
|
||||||
|
use super::HandlingStatus;
|
||||||
|
|
||||||
|
pub trait TcReleaser {
|
||||||
|
fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TcReleaser for PacketSenderWithSharedPool {
|
||||||
|
fn release(
|
||||||
|
&mut self,
|
||||||
|
sender_id: ComponentId,
|
||||||
|
enabled: bool,
|
||||||
|
_info: &TcInfo,
|
||||||
|
tc: &[u8],
|
||||||
|
) -> bool {
|
||||||
|
if enabled {
|
||||||
|
let shared_pool = self.shared_pool.get_mut();
|
||||||
|
// Transfer TC from scheduler TC pool to shared TC pool.
|
||||||
|
let released_tc_addr = shared_pool
|
||||||
|
.0
|
||||||
|
.write()
|
||||||
|
.expect("locking pool failed")
|
||||||
|
.add(tc)
|
||||||
|
.expect("adding TC to shared pool failed");
|
||||||
|
self.sender
|
||||||
|
.send(PacketInPool::new(sender_id, released_tc_addr))
|
||||||
|
.expect("sending TC to TC source failed");
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TcReleaser for mpsc::Sender<PacketAsVec> {
|
||||||
|
fn release(
|
||||||
|
&mut self,
|
||||||
|
sender_id: ComponentId,
|
||||||
|
enabled: bool,
|
||||||
|
_info: &TcInfo,
|
||||||
|
tc: &[u8],
|
||||||
|
) -> bool {
|
||||||
|
if enabled {
|
||||||
|
// Send released TC to centralized TC source.
|
||||||
|
self.send(PacketAsVec::new(sender_id, tc.to_vec()))
|
||||||
|
.expect("sending TC to TC source failed");
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SchedulingService {
|
||||||
|
pub pus_11_handler: PusSchedServiceHandler<
|
||||||
|
MpscTcReceiver,
|
||||||
|
mpsc::Sender<PacketAsVec>,
|
||||||
|
EcssTcInVecConverter,
|
||||||
|
VerificationReporter,
|
||||||
|
PusScheduler,
|
||||||
|
>,
|
||||||
|
pub sched_tc_pool: StaticMemoryPool,
|
||||||
|
pub releaser_buf: [u8; 4096],
|
||||||
|
pub tc_releaser: Box<dyn TcReleaser + Send>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulingService {
|
||||||
|
pub fn release_tcs(&mut self) {
|
||||||
|
let id = self.pus_11_handler.service_helper.id();
|
||||||
|
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
|
||||||
|
self.tc_releaser.release(id, enabled, info, tc)
|
||||||
|
};
|
||||||
|
|
||||||
|
self.pus_11_handler
|
||||||
|
.scheduler_mut()
|
||||||
|
.update_time_from_now()
|
||||||
|
.unwrap();
|
||||||
|
let released_tcs = self
|
||||||
|
.pus_11_handler
|
||||||
|
.scheduler_mut()
|
||||||
|
.release_telecommands_with_buffer(
|
||||||
|
releaser,
|
||||||
|
&mut self.sched_tc_pool,
|
||||||
|
&mut self.releaser_buf,
|
||||||
|
)
|
||||||
|
.expect("releasing TCs failed");
|
||||||
|
if released_tcs > 0 {
|
||||||
|
info!("{released_tcs} TC(s) released from scheduler");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
match self
|
||||||
|
.pus_11_handler
|
||||||
|
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
|
||||||
|
{
|
||||||
|
Ok(result) => match result {
|
||||||
|
PusPacketHandlerResult::RequestHandled => {}
|
||||||
|
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
||||||
|
warn!("PUS11 partial packet handling success: {e:?}")
|
||||||
|
}
|
||||||
|
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
||||||
|
warn!("PUS11 invalid subservice {invalid}");
|
||||||
|
}
|
||||||
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
|
warn!("PUS11: Subservice {subservice} not implemented");
|
||||||
|
}
|
||||||
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
|
},
|
||||||
|
Err(error) => {
|
||||||
|
error!("PUS packet handling error: {error:?}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
HandlingStatus::HandledOne
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_scheduler_service(
|
||||||
|
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||||
|
tc_source_sender: mpsc::Sender<PacketAsVec>,
|
||||||
|
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||||
|
sched_tc_pool: StaticMemoryPool,
|
||||||
|
) -> SchedulingService {
|
||||||
|
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
|
||||||
|
.expect("Creating PUS Scheduler failed");
|
||||||
|
let pus_11_handler = PusSchedServiceHandler::new(
|
||||||
|
PusServiceHelper::new(
|
||||||
|
PUS_SCHEDULER_SERVICE.id(),
|
||||||
|
pus_sched_rx,
|
||||||
|
tm_funnel_tx,
|
||||||
|
create_verification_reporter(PUS_SCHEDULER_SERVICE.id(), PUS_SCHEDULER_SERVICE.apid),
|
||||||
|
EcssTcInVecConverter::default(),
|
||||||
|
),
|
||||||
|
scheduler,
|
||||||
|
);
|
||||||
|
SchedulingService {
|
||||||
|
pus_11_handler,
|
||||||
|
sched_tc_pool,
|
||||||
|
releaser_buf: [0; 4096],
|
||||||
|
tc_releaser: Box::new(tc_source_sender),
|
||||||
|
}
|
||||||
|
}
|
@ -3,7 +3,7 @@ use crate::pus::HandlingStatus;
|
|||||||
use derive_new::new;
|
use derive_new::new;
|
||||||
use satrs::spacepackets::time::{cds, TimeWriter};
|
use satrs::spacepackets::time::{cds, TimeWriter};
|
||||||
|
|
||||||
use super::{action::ActionServiceWrapper, TargetedPusService};
|
use super::{action::ActionServiceWrapper, scheduler::SchedulingService, TargetedPusService};
|
||||||
|
|
||||||
// use super::{
|
// use super::{
|
||||||
// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
|
// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
|
||||||
@ -17,7 +17,7 @@ pub struct PusStack {
|
|||||||
// hk_srv_wrapper: HkServiceWrapper<TmSender, TcInMemConverter>,
|
// hk_srv_wrapper: HkServiceWrapper<TmSender, TcInMemConverter>,
|
||||||
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
|
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
|
||||||
action_srv_wrapper: ActionServiceWrapper,
|
action_srv_wrapper: ActionServiceWrapper,
|
||||||
// schedule_srv: SchedulingServiceWrapper<TmSender, TcInMemConverter>,
|
schedule_srv: SchedulingService,
|
||||||
// mode_srv: ModeServiceWrapper<TmSender, TcInMemConverter>,
|
// mode_srv: ModeServiceWrapper<TmSender, TcInMemConverter>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,7 +25,7 @@ impl PusStack {
|
|||||||
pub fn periodic_operation(&mut self) {
|
pub fn periodic_operation(&mut self) {
|
||||||
// Release all telecommands which reached their release time before calling the service
|
// Release all telecommands which reached their release time before calling the service
|
||||||
// handlers.
|
// handlers.
|
||||||
// self.schedule_srv.release_tcs();
|
self.schedule_srv.release_tcs();
|
||||||
let time_stamp = cds::CdsTime::now_with_u16_days()
|
let time_stamp = cds::CdsTime::now_with_u16_days()
|
||||||
.expect("time stamp generation error")
|
.expect("time stamp generation error")
|
||||||
.to_vec()
|
.to_vec()
|
||||||
@ -48,7 +48,11 @@ impl PusStack {
|
|||||||
self.test_srv.poll_and_handle_next_packet(&time_stamp),
|
self.test_srv.poll_and_handle_next_packet(&time_stamp),
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
// is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None);
|
is_srv_finished(
|
||||||
|
11,
|
||||||
|
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
|
||||||
|
None,
|
||||||
|
);
|
||||||
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
||||||
is_srv_finished(
|
is_srv_finished(
|
||||||
8,
|
8,
|
||||||
|
Loading…
Reference in New Issue
Block a user