Add action service and controller component #5

Merged
muellerr merged 11 commits from add-action-service-controller-obj into main 2024-04-13 11:19:13 +02:00
4 changed files with 113 additions and 14 deletions
Showing only changes of commit 5d87cab9cc - Show all commits

8
Cargo.lock generated
View File

@ -589,7 +589,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]] [[package]]
name = "satrs" name = "satrs"
version = "0.2.0-rc.0" version = "0.2.0-rc.0"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
dependencies = [ dependencies = [
"bus", "bus",
"cobs", "cobs",
@ -614,7 +614,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-mib" name = "satrs-mib"
version = "0.1.1" version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
dependencies = [ dependencies = [
"csv", "csv",
"satrs-mib-codegen", "satrs-mib-codegen",
@ -626,7 +626,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-mib-codegen" name = "satrs-mib-codegen"
version = "0.1.1" version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -636,7 +636,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-shared" name = "satrs-shared"
version = "0.1.3" version = "0.1.3"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
dependencies = [ dependencies = [
"serde", "serde",
"spacepackets", "spacepackets",

View File

@ -22,6 +22,7 @@ pub enum GroupId {
Tmtc = 0, Tmtc = 0,
Hk = 1, Hk = 1,
Mode = 2, Mode = 2,
Action = 3,
} }
pub mod tmtc_err { pub mod tmtc_err {
@ -60,6 +61,16 @@ pub mod tmtc_err {
]; ];
} }
pub mod action_err {
use super::*;
use satrs::res_code::ResultU16;
#[resultcode]
pub const INVALID_ACTION_ID: ResultU16 = ResultU16::new(GroupId::Action as u8, 0);
pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT];
}
pub mod components { pub mod components {
use satrs::request::UniqueApidTargetId; use satrs::request::UniqueApidTargetId;

84
src/controller.rs Normal file
View File

@ -0,0 +1,84 @@
use derive_new::new;
use num_enum::TryFromPrimitive;
use satrs::{
action::ActionRequest,
pus::action::{ActionReplyVariant, PusActionReply},
request::{GenericMessage, MessageMetadata},
};
use std::sync::{atomic::AtomicBool, mpsc, Arc};
use ops_sat_rs::config::{action_err::INVALID_ACTION_ID, STOP_FILE_NAME};
use crate::requests::CompositeRequest;
#[derive(Debug, Clone, Copy, TryFromPrimitive)]
#[repr(u32)]
pub enum ActionId {
StopExperiment = 1,
}
#[derive(new)]
pub struct ExperimentController {
pub composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
pub action_reply_tx: mpsc::Sender<GenericMessage<PusActionReply>>,
pub stop_signal: Arc<AtomicBool>,
}
impl ExperimentController {
pub fn perform_operation(&mut self) {
match self.composite_request_rx.try_recv() {
Ok(msg) => match msg.message {
CompositeRequest::Hk(_) => todo!(),
CompositeRequest::Action(action_req) => {
self.handle_action_request(msg.requestor_info, action_req);
}
},
Err(_) => todo!(),
}
self.check_stop_file();
}
pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) {
let action_id = ActionId::try_from(action_req.action_id);
if action_id.is_err() {
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
requestor,
action_req.action_id,
ActionReplyVariant::CompletionFailed {
error_code: INVALID_ACTION_ID,
params: None,
},
));
if result.is_err() {
log::error!("sending action reply failed");
}
return;
}
let action_id = action_id.unwrap();
match action_id {
ActionId::StopExperiment => {
self.stop_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
requestor,
action_req.action_id,
ActionReplyVariant::Completed,
));
if result.is_err() {
log::error!("sending action reply failed");
}
}
}
}
pub fn check_stop_file(&self) {
if std::path::Path::new(STOP_FILE_NAME).exists() {
log::warn!(
"Detected stop file name at {}. Initiating experiment shutdown",
STOP_FILE_NAME
);
self.stop_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
}

View File

@ -9,7 +9,7 @@ use log::info;
use ops_sat_rs::config::{ use ops_sat_rs::config::{
components::CONTROLLER_ID, components::CONTROLLER_ID,
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
EXPERIMENT_APID, STOP_FILE_NAME, EXPERIMENT_APID,
}; };
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
use satrs::{ use satrs::{
@ -18,10 +18,10 @@ use satrs::{
tmtc::CcsdsDistributor, tmtc::CcsdsDistributor,
}; };
use crate::pus::test::create_test_service;
use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::tmtc::tm_funnel::TmFunnelDynamic; use crate::tmtc::tm_funnel::TmFunnelDynamic;
use crate::tmtc::TcSourceTaskDynamic; use crate::tmtc::TcSourceTaskDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service};
use crate::{ use crate::{
interface::tcp::{SyncTcpTmSource, TcpTask}, interface::tcp::{SyncTcpTmSource, TcpTask},
interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, interface::udp::{DynamicUdpTmHandler, UdpTmtcServer},
@ -34,6 +34,7 @@ use crate::{
requests::GenericRequestRouter, requests::GenericRequestRouter,
}; };
mod controller;
mod interface; mod interface;
mod logger; mod logger;
mod pus; mod pus;
@ -59,10 +60,11 @@ fn main() {
let (pus_action_tx, pus_action_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel();
// let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
// let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
// let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
let (controller_composite_tx, _controller_composite_rx) = mpsc::channel(); let (controller_composite_tx, controller_composite_rx) = mpsc::channel();
// let (controller_action_reply_tx, controller_action_reply_rx) = mpsc::channel();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID. // Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default(); let mut request_map = GenericRequestRouter::default();
@ -157,17 +159,19 @@ fn main() {
stop_signal.clone(), stop_signal.clone(),
); );
let mut controller = ExperimentController::new(
controller_composite_rx,
pus_action_reply_tx,
stop_signal.clone(),
);
info!("Starting CTRL task"); info!("Starting CTRL task");
let ctrl_stop_signal = stop_signal.clone(); let ctrl_stop_signal = stop_signal.clone();
let jh_ctrl_thread = thread::Builder::new() let jh_ctrl_thread = thread::Builder::new()
.name("ops-sat ctrl".to_string()) .name("ops-sat ctrl".to_string())
.spawn(move || loop { .spawn(move || loop {
if std::path::Path::new(STOP_FILE_NAME).exists() { controller.perform_operation();
log::warn!( if ctrl_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
"Detected stop file name at {}. Initiating experiment shutdown",
STOP_FILE_NAME
);
ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed);
break; break;
} }
thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); thread::sleep(Duration::from_millis(FREQ_MS_CTRL));