ops-sat-rs/src/controller.rs

129 lines
4.2 KiB
Rust
Raw Normal View History

2024-04-10 15:37:24 +02:00
use num_enum::TryFromPrimitive;
use satrs::{
action::ActionRequest,
2024-04-15 12:16:01 +02:00
pus::action::{ActionReplyPus, ActionReplyVariant},
2024-04-10 15:37:24 +02:00
request::{GenericMessage, MessageMetadata},
};
2024-04-10 17:03:56 +02:00
use std::{
env::temp_dir,
path::{Path, PathBuf},
sync::{atomic::AtomicBool, mpsc, Arc},
};
2024-04-10 15:37:24 +02:00
2024-04-10 17:13:29 +02:00
use ops_sat_rs::config::{action_err::INVALID_ACTION_ID, HOME_PATH, STOP_FILE_NAME};
2024-04-10 15:37:24 +02:00
use crate::requests::CompositeRequest;
#[derive(Debug, Clone, Copy, TryFromPrimitive)]
#[repr(u32)]
pub enum ActionId {
StopExperiment = 1,
}
pub struct ExperimentController {
pub composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
2024-04-15 12:16:01 +02:00
pub action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
2024-04-10 15:37:24 +02:00
pub stop_signal: Arc<AtomicBool>,
2024-04-10 17:03:56 +02:00
home_path_stop_file: PathBuf,
tmp_path_stop_file: PathBuf,
}
impl ExperimentController {
pub fn new(
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
2024-04-15 12:16:01 +02:00
action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
2024-04-10 17:03:56 +02:00
stop_signal: Arc<AtomicBool>,
) -> Self {
let mut home_path_stop_file = PathBuf::new();
2024-04-10 17:13:29 +02:00
home_path_stop_file.push(HOME_PATH.as_path());
2024-04-10 17:03:56 +02:00
home_path_stop_file.push(STOP_FILE_NAME);
let mut tmp_path_stop_file = temp_dir();
tmp_path_stop_file.push(STOP_FILE_NAME);
Self {
composite_request_rx,
action_reply_tx,
stop_signal,
home_path_stop_file,
tmp_path_stop_file,
}
}
2024-04-10 15:37:24 +02:00
}
impl ExperimentController {
pub fn perform_operation(&mut self) {
match self.composite_request_rx.try_recv() {
Ok(msg) => match msg.message {
2024-04-10 15:44:39 +02:00
CompositeRequest::Hk(_) => {
log::warn!("hk request handling unimplemented")
}
2024-04-10 15:37:24 +02:00
CompositeRequest::Action(action_req) => {
self.handle_action_request(msg.requestor_info, action_req);
}
},
2024-04-10 15:44:39 +02:00
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::error!("composite request rx error: {:?}", e);
}
}
2024-04-10 15:37:24 +02:00
}
self.check_stop_file();
}
pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) {
let action_id = ActionId::try_from(action_req.action_id);
if action_id.is_err() {
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
requestor,
action_req.action_id,
ActionReplyVariant::CompletionFailed {
error_code: INVALID_ACTION_ID,
params: None,
},
));
if result.is_err() {
log::error!("sending action reply failed");
}
return;
}
let action_id = action_id.unwrap();
match action_id {
ActionId::StopExperiment => {
self.stop_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
requestor,
action_req.action_id,
ActionReplyVariant::Completed,
));
if result.is_err() {
log::error!("sending action reply failed");
}
}
}
}
pub fn check_stop_file(&self) {
2024-04-10 17:03:56 +02:00
let check_at_path = |path: &Path| {
if path.exists() {
log::warn!(
2024-04-10 17:13:29 +02:00
"Detected stop file name at {:?}. Initiating experiment shutdown",
path
2024-04-10 17:03:56 +02:00
);
2024-04-10 17:13:29 +02:00
// By default, clear the stop file.
let result = std::fs::remove_file(path);
if result.is_err() {
log::error!(
"failed to remove stop file at {:?}: {}",
path,
result.unwrap_err()
);
}
2024-04-10 17:03:56 +02:00
self.stop_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
}
};
check_at_path(self.tmp_path_stop_file.as_path());
check_at_path(self.home_path_stop_file.as_path());
2024-04-10 15:37:24 +02:00
}
}