2024-04-10 15:37:24 +02:00
|
|
|
use num_enum::TryFromPrimitive;
|
|
|
|
use satrs::{
|
2024-04-25 01:20:54 +02:00
|
|
|
action::{ActionRequest, ActionRequestVariant},
|
|
|
|
params::Params,
|
2024-04-15 12:16:01 +02:00
|
|
|
pus::action::{ActionReplyPus, ActionReplyVariant},
|
2024-04-10 15:37:24 +02:00
|
|
|
request::{GenericMessage, MessageMetadata},
|
2024-04-25 01:20:54 +02:00
|
|
|
res_code::ResultU16,
|
2024-04-10 15:37:24 +02:00
|
|
|
};
|
2024-04-25 16:45:00 +02:00
|
|
|
use serde::{Deserialize, Serialize};
|
2024-04-10 17:03:56 +02:00
|
|
|
use std::{
|
|
|
|
env::temp_dir,
|
|
|
|
path::{Path, PathBuf},
|
2024-04-25 01:20:54 +02:00
|
|
|
process::Command,
|
2024-04-10 17:03:56 +02:00
|
|
|
sync::{atomic::AtomicBool, mpsc, Arc},
|
|
|
|
};
|
2024-04-10 15:37:24 +02:00
|
|
|
|
2024-04-25 01:20:54 +02:00
|
|
|
use ops_sat_rs::config::{
|
|
|
|
action_err::INVALID_ACTION_ID,
|
|
|
|
ctrl_err::{SHELL_CMD_EXECUTION_FAILURE, SHELL_CMD_INVALID_FORMAT, SHELL_CMD_IO_ERROR},
|
|
|
|
HOME_PATH, STOP_FILE_NAME,
|
|
|
|
};
|
2024-04-10 15:37:24 +02:00
|
|
|
|
|
|
|
use crate::requests::CompositeRequest;
|
|
|
|
|
2024-04-25 16:45:00 +02:00
|
|
|
#[derive(Serialize, Deserialize, Debug)]
|
2024-04-25 17:11:52 +02:00
|
|
|
pub struct ShellCmd<'a> {
|
|
|
|
cmd: &'a str,
|
|
|
|
args: Vec<&'a str>,
|
2024-04-25 16:45:00 +02:00
|
|
|
}
|
|
|
|
|
2024-04-10 15:37:24 +02:00
|
|
|
#[derive(Debug, Clone, Copy, TryFromPrimitive)]
|
|
|
|
#[repr(u32)]
|
|
|
|
pub enum ActionId {
|
|
|
|
StopExperiment = 1,
|
2024-04-25 01:20:54 +02:00
|
|
|
ExecuteShellCommandBlocking = 2,
|
2024-04-10 15:37:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
pub struct ExperimentController {
|
|
|
|
pub composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
|
2024-04-15 12:16:01 +02:00
|
|
|
pub action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
|
2024-04-10 15:37:24 +02:00
|
|
|
pub stop_signal: Arc<AtomicBool>,
|
2024-04-10 17:03:56 +02:00
|
|
|
home_path_stop_file: PathBuf,
|
|
|
|
tmp_path_stop_file: PathBuf,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ExperimentController {
|
|
|
|
pub fn new(
|
|
|
|
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
|
2024-04-15 12:16:01 +02:00
|
|
|
action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
|
2024-04-10 17:03:56 +02:00
|
|
|
stop_signal: Arc<AtomicBool>,
|
|
|
|
) -> Self {
|
|
|
|
let mut home_path_stop_file = PathBuf::new();
|
2024-04-10 17:13:29 +02:00
|
|
|
home_path_stop_file.push(HOME_PATH.as_path());
|
2024-04-10 17:03:56 +02:00
|
|
|
home_path_stop_file.push(STOP_FILE_NAME);
|
|
|
|
let mut tmp_path_stop_file = temp_dir();
|
|
|
|
tmp_path_stop_file.push(STOP_FILE_NAME);
|
|
|
|
Self {
|
|
|
|
composite_request_rx,
|
|
|
|
action_reply_tx,
|
|
|
|
stop_signal,
|
|
|
|
home_path_stop_file,
|
|
|
|
tmp_path_stop_file,
|
|
|
|
}
|
|
|
|
}
|
2024-04-10 15:37:24 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl ExperimentController {
|
|
|
|
pub fn perform_operation(&mut self) {
|
|
|
|
match self.composite_request_rx.try_recv() {
|
|
|
|
Ok(msg) => match msg.message {
|
2024-04-10 15:44:39 +02:00
|
|
|
CompositeRequest::Hk(_) => {
|
|
|
|
log::warn!("hk request handling unimplemented")
|
|
|
|
}
|
2024-04-10 15:37:24 +02:00
|
|
|
CompositeRequest::Action(action_req) => {
|
|
|
|
self.handle_action_request(msg.requestor_info, action_req);
|
|
|
|
}
|
|
|
|
},
|
2024-04-10 15:44:39 +02:00
|
|
|
Err(e) => {
|
|
|
|
if e != mpsc::TryRecvError::Empty {
|
|
|
|
log::error!("composite request rx error: {:?}", e);
|
|
|
|
}
|
|
|
|
}
|
2024-04-10 15:37:24 +02:00
|
|
|
}
|
|
|
|
self.check_stop_file();
|
|
|
|
}
|
|
|
|
|
2024-04-25 01:20:54 +02:00
|
|
|
pub fn send_completion_success(&self, requestor: &MessageMetadata, action_req: &ActionRequest) {
|
|
|
|
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
|
|
|
|
*requestor,
|
|
|
|
action_req.action_id,
|
|
|
|
ActionReplyVariant::Completed,
|
|
|
|
));
|
|
|
|
if result.is_err() {
|
|
|
|
log::error!("sending action reply failed");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn send_completion_failure(
|
|
|
|
&self,
|
|
|
|
requestor: &MessageMetadata,
|
|
|
|
action_req: &ActionRequest,
|
|
|
|
error_code: ResultU16,
|
|
|
|
params: Option<Params>,
|
|
|
|
) {
|
|
|
|
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
|
|
|
|
*requestor,
|
|
|
|
action_req.action_id,
|
|
|
|
ActionReplyVariant::CompletionFailed { error_code, params },
|
|
|
|
));
|
|
|
|
if result.is_err() {
|
|
|
|
log::error!("sending action reply failed");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-10 15:37:24 +02:00
|
|
|
pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) {
|
2024-04-25 01:20:54 +02:00
|
|
|
let send_completion_failure = |error_code: ResultU16, params: Option<Params>| {
|
2024-04-10 15:37:24 +02:00
|
|
|
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
|
|
|
|
requestor,
|
|
|
|
action_req.action_id,
|
2024-04-25 01:20:54 +02:00
|
|
|
ActionReplyVariant::CompletionFailed { error_code, params },
|
2024-04-10 15:37:24 +02:00
|
|
|
));
|
|
|
|
if result.is_err() {
|
|
|
|
log::error!("sending action reply failed");
|
|
|
|
}
|
2024-04-25 01:20:54 +02:00
|
|
|
};
|
|
|
|
let action_id = ActionId::try_from(action_req.action_id);
|
|
|
|
if action_id.is_err() {
|
|
|
|
send_completion_failure(INVALID_ACTION_ID, None);
|
2024-04-10 15:37:24 +02:00
|
|
|
return;
|
|
|
|
}
|
2024-04-25 01:20:54 +02:00
|
|
|
match action_id.unwrap() {
|
2024-04-10 15:37:24 +02:00
|
|
|
ActionId::StopExperiment => {
|
|
|
|
self.stop_signal
|
|
|
|
.store(true, std::sync::atomic::Ordering::Relaxed);
|
2024-04-25 01:20:54 +02:00
|
|
|
self.send_completion_success(&requestor, &action_req);
|
|
|
|
}
|
|
|
|
ActionId::ExecuteShellCommandBlocking => {
|
|
|
|
self.handle_shell_command_execution(&requestor, &action_req);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn handle_shell_command_execution(
|
|
|
|
&self,
|
|
|
|
requestor: &MessageMetadata,
|
|
|
|
action_req: &ActionRequest,
|
|
|
|
) {
|
|
|
|
if let ActionRequestVariant::VecData(data) = &action_req.variant {
|
2024-04-25 16:45:00 +02:00
|
|
|
let shell_cmd_result: serde_json::Result<ShellCmd> = serde_json::from_slice(data);
|
|
|
|
match shell_cmd_result {
|
|
|
|
Ok(shell_cmd) => {
|
|
|
|
log::info!("executing shell cmd {:?}", shell_cmd);
|
|
|
|
match Command::new(shell_cmd.cmd).args(shell_cmd.args).status() {
|
2024-04-25 01:20:54 +02:00
|
|
|
Ok(status) => {
|
|
|
|
if status.success() {
|
|
|
|
self.send_completion_success(requestor, action_req);
|
|
|
|
} else {
|
|
|
|
log::warn!("execution of command failed: {}", status);
|
|
|
|
self.send_completion_failure(
|
|
|
|
requestor,
|
|
|
|
action_req,
|
|
|
|
SHELL_CMD_EXECUTION_FAILURE,
|
|
|
|
Some(status.to_string().into()),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
log::warn!("execution of command failed with IO error: {}", e);
|
|
|
|
self.send_completion_failure(
|
|
|
|
requestor,
|
|
|
|
action_req,
|
|
|
|
SHELL_CMD_IO_ERROR,
|
|
|
|
Some(e.to_string().into()),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => {
|
2024-04-25 16:45:00 +02:00
|
|
|
log::warn!("failed to deserialize shell command: {}", e);
|
2024-04-10 15:37:24 +02:00
|
|
|
}
|
|
|
|
}
|
2024-04-25 16:45:00 +02:00
|
|
|
} else {
|
|
|
|
log::warn!("no shell command was supplied for shell command action command");
|
|
|
|
self.send_completion_failure(requestor, action_req, SHELL_CMD_INVALID_FORMAT, None);
|
2024-04-10 15:37:24 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn check_stop_file(&self) {
|
2024-04-10 17:03:56 +02:00
|
|
|
let check_at_path = |path: &Path| {
|
|
|
|
if path.exists() {
|
|
|
|
log::warn!(
|
2024-04-10 17:13:29 +02:00
|
|
|
"Detected stop file name at {:?}. Initiating experiment shutdown",
|
|
|
|
path
|
2024-04-10 17:03:56 +02:00
|
|
|
);
|
2024-04-10 17:13:29 +02:00
|
|
|
// By default, clear the stop file.
|
|
|
|
let result = std::fs::remove_file(path);
|
|
|
|
if result.is_err() {
|
|
|
|
log::error!(
|
|
|
|
"failed to remove stop file at {:?}: {}",
|
|
|
|
path,
|
|
|
|
result.unwrap_err()
|
|
|
|
);
|
|
|
|
}
|
2024-04-10 17:03:56 +02:00
|
|
|
self.stop_signal
|
|
|
|
.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
check_at_path(self.tmp_path_stop_file.as_path());
|
|
|
|
check_at_path(self.home_path_stop_file.as_path());
|
2024-04-10 15:37:24 +02:00
|
|
|
}
|
|
|
|
}
|
2024-04-25 16:45:00 +02:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use std::sync::{mpsc, Arc};
|
|
|
|
|
|
|
|
use tempfile::NamedTempFile;
|
|
|
|
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
fn init() {
|
|
|
|
env_logger::builder().is_test(true).init();
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_shell_cmd_exection() {
|
|
|
|
init();
|
|
|
|
let (composite_req_tx, composite_req_rx) = mpsc::channel();
|
|
|
|
let (action_reply_tx, action_reply_rx) = mpsc::channel();
|
|
|
|
let stop_signal = Arc::default();
|
|
|
|
let mut exp_ctrl =
|
|
|
|
ExperimentController::new(composite_req_rx, action_reply_tx, stop_signal);
|
|
|
|
let named_temp_file = NamedTempFile::new().expect("creating temp file failed");
|
2024-04-25 17:11:52 +02:00
|
|
|
let args = vec![named_temp_file
|
|
|
|
.path()
|
|
|
|
.to_str()
|
|
|
|
.expect("converting path to str failed")];
|
|
|
|
|
|
|
|
let cmd = ShellCmd { cmd: "rm", args };
|
2024-04-25 16:45:00 +02:00
|
|
|
let cmd_serialized = serde_json::to_string(&cmd).expect("serialization failed");
|
|
|
|
let action_req = satrs::action::ActionRequest {
|
|
|
|
action_id: ActionId::ExecuteShellCommandBlocking as u32,
|
|
|
|
variant: satrs::action::ActionRequestVariant::VecData(cmd_serialized.into_bytes()),
|
|
|
|
};
|
|
|
|
composite_req_tx
|
|
|
|
.send(GenericMessage::new(
|
|
|
|
MessageMetadata::new(1, 2),
|
|
|
|
CompositeRequest::Action(action_req),
|
|
|
|
))
|
|
|
|
.expect("sending action request failed");
|
|
|
|
exp_ctrl.perform_operation();
|
|
|
|
assert!(!named_temp_file.path().exists());
|
|
|
|
let action_reply = action_reply_rx
|
|
|
|
.try_recv()
|
|
|
|
.expect("receiving action reply failed");
|
|
|
|
assert_eq!(
|
|
|
|
action_reply.message.action_id,
|
|
|
|
ActionId::ExecuteShellCommandBlocking as u32
|
|
|
|
);
|
|
|
|
match action_reply.message.variant {
|
|
|
|
ActionReplyVariant::Completed => (),
|
|
|
|
_ => {
|
|
|
|
panic!(
|
|
|
|
"unexecpted action reply variant {:?}",
|
|
|
|
action_reply.message.variant
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|