use crate::logger::LOGFILE_PATH; use num_enum::TryFromPrimitive; use ops_sat_rs::config::{ action_err::INVALID_ACTION_ID, HOME_FOLDER_EXPERIMENT, HOME_PATH, STOP_FILE_NAME, TO_GROUND_FOLDER_EXPERIMENT, }; use satrs::action::ActionRequestVariant; use satrs::{ action::ActionRequest, params::Params, pus::action::{ActionReplyPus, ActionReplyVariant}, request::{GenericMessage, MessageMetadata}, res_code::ResultU16, }; use serde::{Deserialize, Serialize}; use std::{ env::temp_dir, path::{Path, PathBuf}, process::Command, sync::{atomic::AtomicBool, mpsc, Arc}, }; use ops_sat_rs::config::ctrl_err::{ SHELL_CMD_EXECUTION_FAILURE, SHELL_CMD_INVALID_FORMAT, SHELL_CMD_IO_ERROR, }; use crate::requests::CompositeRequest; #[derive(Serialize, Deserialize, Debug)] pub struct ShellCmd<'a> { cmd: &'a str, args: Vec<&'a str>, } #[derive(Debug, Clone, Copy, TryFromPrimitive)] #[repr(u32)] pub enum ActionId { StopExperiment = 1, DownlinkLogfile = 2, DownlinkImages = 3, ExecuteShellCommandBlocking = 4, } pub struct ExperimentController { pub composite_request_rx: mpsc::Receiver>, pub action_reply_tx: mpsc::Sender>, pub stop_signal: Arc, home_path_stop_file: PathBuf, tmp_path_stop_file: PathBuf, } impl ExperimentController { pub fn new( composite_request_rx: mpsc::Receiver>, action_reply_tx: mpsc::Sender>, stop_signal: Arc, ) -> Self { let mut home_path_stop_file = PathBuf::new(); home_path_stop_file.push(HOME_PATH.as_path()); home_path_stop_file.push(STOP_FILE_NAME); let mut tmp_path_stop_file = temp_dir(); tmp_path_stop_file.push(STOP_FILE_NAME); Self { composite_request_rx, action_reply_tx, stop_signal, home_path_stop_file, tmp_path_stop_file, } } } impl ExperimentController { pub fn perform_operation(&mut self) { match self.composite_request_rx.try_recv() { Ok(msg) => match msg.message { CompositeRequest::Hk(_) => { log::warn!("hk request handling unimplemented") } CompositeRequest::Action(action_req) => { self.handle_action_request(msg.requestor_info, action_req); } }, Err(e) => { if e != mpsc::TryRecvError::Empty { log::error!("composite request rx error: {:?}", e); } } } self.check_stop_file(); } pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) { let send_completion_failure = |error_code: ResultU16, params: Option| { let result = self.action_reply_tx.send(GenericMessage::new_action_reply( requestor, action_req.action_id, ActionReplyVariant::CompletionFailed { error_code, params }, )); if result.is_err() { log::error!("sending action reply failed"); } }; let action_id = ActionId::try_from(action_req.action_id); if action_id.is_err() { send_completion_failure(INVALID_ACTION_ID, None); return; } match action_id.unwrap() { ActionId::StopExperiment => { self.stop_signal .store(true, std::sync::atomic::Ordering::Relaxed); self.send_completion_success(&requestor, &action_req); } ActionId::ExecuteShellCommandBlocking => { self.handle_shell_command_execution(&requestor, &action_req); } ActionId::DownlinkLogfile => self.handle_downlink_logfile(&requestor, &action_req), // downlink images, default will be the last image, otherwise specified counting down (2 = second to last image, etc.) ActionId::DownlinkImages => { log::info!("Copying images into low priority downlink folder"); if let Ok(image_path) = match action_req.variant { ActionRequestVariant::VecData(data) => { let index = data[0]; get_latest_image(index as usize) } _ => get_latest_image(0), } { if let Ok(image_path) = ::clone(&image_path) .into_os_string() .into_string() { if std::fs::copy(image_path, TO_GROUND_FOLDER_EXPERIMENT).is_err() { log::error!("Copying logfile into downlink path failed") } } } } } } pub fn handle_downlink_logfile(&self, requestor: &MessageMetadata, action_req: &ActionRequest) { log::info!("copying logfile into downlink folder"); if let Some(logfile_path) = LOGFILE_PATH.get() { if let Ok(logfile_path) = ::clone(logfile_path) .into_os_string() .into_string() { if std::fs::copy(logfile_path.as_str(), TO_GROUND_FOLDER_EXPERIMENT).is_err() { log::warn!("copying logfile into downlink path failed") } self.send_completion_success(requestor, action_req) } } else { log::warn!("downlink path emtpy") } } pub fn send_completion_success(&self, requestor: &MessageMetadata, action_req: &ActionRequest) { let result = self.action_reply_tx.send(GenericMessage::new_action_reply( *requestor, action_req.action_id, ActionReplyVariant::Completed, )); if result.is_err() { log::error!("sending action reply failed"); } } pub fn send_completion_failure( &self, requestor: &MessageMetadata, action_req: &ActionRequest, error_code: ResultU16, params: Option, ) { let result = self.action_reply_tx.send(GenericMessage::new_action_reply( *requestor, action_req.action_id, ActionReplyVariant::CompletionFailed { error_code, params }, )); if result.is_err() { log::error!("sending action reply failed"); } } pub fn handle_shell_command_execution( &self, requestor: &MessageMetadata, action_req: &ActionRequest, ) { if let ActionRequestVariant::VecData(data) = &action_req.variant { let shell_cmd_result: serde_json::Result = serde_json::from_slice(data); match shell_cmd_result { Ok(shell_cmd) => { log::info!("executing shell cmd {:?}", shell_cmd); match Command::new(shell_cmd.cmd).args(shell_cmd.args).status() { Ok(status) => { if status.success() { self.send_completion_success(requestor, action_req); } else { log::warn!("execution of command failed: {}", status); self.send_completion_failure( requestor, action_req, SHELL_CMD_EXECUTION_FAILURE, Some(status.to_string().into()), ); } } Err(e) => { log::warn!("execution of command failed with IO error: {}", e); self.send_completion_failure( requestor, action_req, SHELL_CMD_IO_ERROR, Some(e.to_string().into()), ); } } } Err(e) => { log::warn!("failed to deserialize shell command: {}", e); let result = self.action_reply_tx.send(GenericMessage::new_action_reply( *requestor, action_req.action_id, ActionReplyVariant::Completed, )); if result.is_err() { log::error!("Sending action reply failed"); } } } } else { log::warn!("no shell command was supplied for shell command action command"); self.send_completion_failure(requestor, action_req, SHELL_CMD_INVALID_FORMAT, None); } } pub fn check_stop_file(&self) { let check_at_path = |path: &Path| { if path.exists() { log::warn!( "Detected stop file name at {:?}. Initiating experiment shutdown", path ); // By default, clear the stop file. let result = std::fs::remove_file(path); if result.is_err() { log::error!( "failed to remove stop file at {:?}: {}", path, result.unwrap_err() ); } self.stop_signal .store(true, std::sync::atomic::Ordering::Relaxed); } }; check_at_path(self.tmp_path_stop_file.as_path()); check_at_path(self.home_path_stop_file.as_path()); } } // TODO no idea if this works in any way shape or form pub fn get_latest_image(index: usize) -> Result { // Get the most recently modified file let mut png_files = std::fs::read_dir(HOME_FOLDER_EXPERIMENT)? .flatten() .filter(|f| match f.metadata() { Ok(metadata) => metadata.is_file(), Err(_) => false, }) .filter(|f| match f.file_name().into_string() { Ok(name) => name.ends_with(".png"), Err(_) => false, }) .collect::>(); png_files.sort_by_key(|x| match x.metadata() { Ok(metadata) => { if let Ok(time) = metadata.modified() { time } else { std::time::SystemTime::UNIX_EPOCH } } Err(_) => std::time::SystemTime::UNIX_EPOCH, }); png_files.reverse(); if let Some(png) = png_files.into_iter().nth(index) { return Ok(png.path()); } Err(std::io::Error::other("No latest image found")) } #[cfg(test)] mod tests { use std::sync::{mpsc, Arc}; use tempfile::NamedTempFile; use super::*; fn init() { env_logger::builder().is_test(true).init(); } #[test] fn test_shell_cmd_exection() { init(); let (composite_req_tx, composite_req_rx) = mpsc::channel(); let (action_reply_tx, action_reply_rx) = mpsc::channel(); let stop_signal = Arc::default(); let mut exp_ctrl = ExperimentController::new(composite_req_rx, action_reply_tx, stop_signal); let named_temp_file = NamedTempFile::new().expect("creating temp file failed"); let args = vec![named_temp_file .path() .to_str() .expect("converting path to str failed")]; let cmd = ShellCmd { cmd: "rm", args }; let cmd_serialized = serde_json::to_string(&cmd).expect("serialization failed"); let action_req = satrs::action::ActionRequest { action_id: ActionId::ExecuteShellCommandBlocking as u32, variant: satrs::action::ActionRequestVariant::VecData(cmd_serialized.into_bytes()), }; composite_req_tx .send(GenericMessage::new( MessageMetadata::new(1, 2), CompositeRequest::Action(action_req), )) .expect("sending action request failed"); exp_ctrl.perform_operation(); assert!(!named_temp_file.path().exists()); let action_reply = action_reply_rx .try_recv() .expect("receiving action reply failed"); assert_eq!( action_reply.message.action_id, ActionId::ExecuteShellCommandBlocking as u32 ); match action_reply.message.variant { ActionReplyVariant::Completed => (), _ => { panic!( "unexecpted action reply variant {:?}", action_reply.message.variant ) } } } }