use crate::logger::LOGFILE_PATH; use num_enum::TryFromPrimitive; use ops_sat_rs::config::{action_err::INVALID_ACTION_ID, HOME_FOLDER_EXPERIMENT}; use ops_sat_rs::config::{ HOME_PATH, STOP_FILE_NAME, TO_GROUND_FOLDER_DIR, TO_GROUND_LP_FOLDER_DIR, }; use satrs::action::ActionRequestVariant; use satrs::{ action::ActionRequest, params::Params, pus::action::{ActionReplyPus, ActionReplyVariant}, request::{GenericMessage, MessageMetadata}, res_code::ResultU16, }; use serde::{Deserialize, Serialize}; use std::env::temp_dir; use std::io; use std::{ path::{Path, PathBuf}, process::Command, sync::{atomic::AtomicBool, mpsc, Arc}, }; use ops_sat_rs::config::ctrl_err::{ FILESYSTEM_COPY_ERROR, INVALID_LOGFILE_PATH, IO_ERROR, SHELL_CMD_EXECUTION_FAILURE, SHELL_CMD_INVALID_FORMAT, SHELL_CMD_IO_ERROR, }; use crate::requests::CompositeRequest; #[derive(Serialize, Deserialize, Debug)] pub struct ShellCmd<'a> { cmd: &'a str, args: Vec<&'a str>, } #[derive(Debug, Clone, Copy, TryFromPrimitive)] #[repr(u32)] pub enum ActionId { StopExperiment = 1, DownlinkLogfile = 2, /// Standard command to download the images made by the camera. It moves all image related /// files inside the home folder into the toGroundLP (low priority to ground download) folder. DownlinkImagesByMoving = 3, ExecuteShellCommandBlocking = 4, } #[derive(Debug)] pub struct ControllerPathCollection { pub stop_file_home_path: PathBuf, pub stop_file_tmp_path: PathBuf, pub to_ground_dir: PathBuf, pub to_ground_low_prio_dir: PathBuf, } impl Default for ControllerPathCollection { fn default() -> Self { let mut home_path_stop_file = PathBuf::new(); home_path_stop_file.push(HOME_PATH.as_path()); home_path_stop_file.push(STOP_FILE_NAME); let mut tmp_path_stop_file = temp_dir(); tmp_path_stop_file.push(STOP_FILE_NAME); Self { stop_file_home_path: home_path_stop_file, stop_file_tmp_path: tmp_path_stop_file, to_ground_dir: TO_GROUND_FOLDER_DIR .get() .expect("to ground directory not set") .clone(), to_ground_low_prio_dir: TO_GROUND_LP_FOLDER_DIR .get() .expect("to ground low prio directory not set") .clone(), } } } pub struct ExperimentController { pub composite_request_rx: mpsc::Receiver>, pub action_reply_tx: mpsc::Sender>, pub stop_signal: Arc, pub paths: ControllerPathCollection, } impl ExperimentController { pub fn new( composite_request_rx: mpsc::Receiver>, action_reply_tx: mpsc::Sender>, stop_signal: Arc, paths: ControllerPathCollection, ) -> Self { Self { composite_request_rx, action_reply_tx, stop_signal, paths, } } } impl ExperimentController { pub fn perform_operation(&mut self) { match self.composite_request_rx.try_recv() { Ok(msg) => match msg.message { CompositeRequest::Hk(_) => { log::warn!("hk request handling unimplemented") } CompositeRequest::Action(action_req) => { self.handle_action_request(msg.requestor_info, action_req); } }, Err(e) => { if e != mpsc::TryRecvError::Empty { log::error!("composite request rx error: {:?}", e); } } } self.check_stop_file(); } pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) { let send_completion_failure = |error_code: ResultU16, params: Option| { let result = self.action_reply_tx.send(GenericMessage::new_action_reply( requestor, action_req.action_id, ActionReplyVariant::CompletionFailed { error_code, params }, )); if result.is_err() { log::error!("sending action reply failed"); } }; let action_id = ActionId::try_from(action_req.action_id); if action_id.is_err() { send_completion_failure(INVALID_ACTION_ID, None); return; } match action_id.unwrap() { ActionId::StopExperiment => { self.stop_signal .store(true, std::sync::atomic::Ordering::Relaxed); self.send_completion_success(&requestor, &action_req); } ActionId::ExecuteShellCommandBlocking => { self.handle_shell_command_execution(&requestor, &action_req); } ActionId::DownlinkLogfile => self.handle_downlink_logfile(&requestor, &action_req), ActionId::DownlinkImagesByMoving => { let result = self.handle_downlink_cam_image_by_moving(&requestor, &action_req); if let Err(e) = result { send_completion_failure(IO_ERROR, Some(e.to_string().into())); } } } } pub fn handle_downlink_cam_image_by_moving( &self, requestor: &MessageMetadata, action_req: &ActionRequest, ) -> io::Result<()> { log::info!("moving images into low priority downlink folder"); let num_moved_files = move_images_inside_home_dir_to_low_prio_ground_dir( &HOME_PATH, &self.paths.to_ground_low_prio_dir, )?; log::info!("moved {} image files", num_moved_files); // TODO: Trigger event containing the number of moved files? self.send_completion_success(requestor, action_req); Ok(()) } pub fn handle_downlink_logfile(&self, requestor: &MessageMetadata, action_req: &ActionRequest) { log::info!("copying logfile into {:?}", self.paths.to_ground_dir); if let Some(logfile_path) = LOGFILE_PATH.get() { self.handle_file_copy( requestor, action_req, logfile_path, &self.paths.to_ground_dir, ) } else { log::error!("downlink path emtpy"); self.send_completion_failure(requestor, action_req, INVALID_LOGFILE_PATH, None); } } pub fn handle_file_copy( &self, requestor: &MessageMetadata, action_req: &ActionRequest, source_path: &Path, target_path: &Path, ) { if let Err(e) = std::fs::copy(source_path, target_path) { log::warn!("copying logfile into downlink path failed: {}", e); self.send_completion_failure( requestor, action_req, FILESYSTEM_COPY_ERROR, Some(e.to_string().into()), ); return; } self.send_completion_success(requestor, action_req) } pub fn send_completion_success(&self, requestor: &MessageMetadata, action_req: &ActionRequest) { let result = self.action_reply_tx.send(GenericMessage::new_action_reply( *requestor, action_req.action_id, ActionReplyVariant::Completed, )); if result.is_err() { log::error!("sending action reply failed"); } } pub fn send_completion_failure( &self, requestor: &MessageMetadata, action_req: &ActionRequest, error_code: ResultU16, params: Option, ) { let result = self.action_reply_tx.send(GenericMessage::new_action_reply( *requestor, action_req.action_id, ActionReplyVariant::CompletionFailed { error_code, params }, )); if result.is_err() { log::error!("sending action reply failed"); } } pub fn handle_shell_command_execution( &self, requestor: &MessageMetadata, action_req: &ActionRequest, ) { if let ActionRequestVariant::VecData(data) = &action_req.variant { let shell_cmd_result: serde_json::Result = serde_json::from_slice(data); match shell_cmd_result { Ok(shell_cmd) => { log::info!("executing shell cmd {:?}", shell_cmd); match Command::new(shell_cmd.cmd).args(shell_cmd.args).status() { Ok(status) => { if status.success() { self.send_completion_success(requestor, action_req); } else { log::warn!("execution of command failed: {}", status); self.send_completion_failure( requestor, action_req, SHELL_CMD_EXECUTION_FAILURE, Some(status.to_string().into()), ); } } Err(e) => { log::warn!("execution of command failed with IO error: {}", e); self.send_completion_failure( requestor, action_req, SHELL_CMD_IO_ERROR, Some(e.to_string().into()), ); } } } Err(e) => { log::warn!("failed to deserialize shell command: {}", e); let result = self.action_reply_tx.send(GenericMessage::new_action_reply( *requestor, action_req.action_id, ActionReplyVariant::Completed, )); if result.is_err() { log::error!("Sending action reply failed"); } } } } else { log::warn!("no shell command was supplied for shell command action command"); self.send_completion_failure(requestor, action_req, SHELL_CMD_INVALID_FORMAT, None); } } pub fn check_stop_file(&self) { let check_at_path = |path: &Path| { if path.exists() { log::warn!( "Detected stop file name at {:?}. Initiating experiment shutdown", path ); // By default, clear the stop file. let result = std::fs::remove_file(path); if result.is_err() { log::error!( "failed to remove stop file at {:?}: {}", path, result.unwrap_err() ); } self.stop_signal .store(true, std::sync::atomic::Ordering::Relaxed); } }; check_at_path(self.paths.stop_file_tmp_path.as_path()); check_at_path(self.paths.stop_file_home_path.as_path()); } } pub fn move_images_inside_home_dir_to_low_prio_ground_dir( home_dir: &Path, low_prio_target_dir: &Path, ) -> io::Result { let mut moved_files = 0; for dir_entry_result in std::fs::read_dir(home_dir)? { if let Ok(dir_entry) = &dir_entry_result { if let Ok(file_type) = dir_entry.file_type() { if file_type.is_file() { let path_name = dir_entry.file_name(); let path_name_str = path_name.to_string_lossy(); if path_name_str.contains("img_msec_") { let mut target_path = PathBuf::new(); target_path.push(low_prio_target_dir); target_path.push(&path_name); log::info!("moving file {}", &path_name_str); std::fs::rename(dir_entry.path(), target_path)?; moved_files += 1; } } } } } Ok(moved_files) } // TODO no idea if this works in any way shape or form #[allow(dead_code)] pub fn get_latest_image(index: usize) -> Result { // Get the most recently modified file let mut png_files = std::fs::read_dir(HOME_FOLDER_EXPERIMENT)? .flatten() .filter(|f| match f.metadata() { Ok(metadata) => metadata.is_file(), Err(_) => false, }) .filter(|f| match f.file_name().into_string() { Ok(name) => name.ends_with(".png"), Err(_) => false, }) .collect::>(); png_files.sort_by_key(|x| match x.metadata() { Ok(metadata) => { if let Ok(time) = metadata.modified() { time } else { std::time::SystemTime::UNIX_EPOCH } } Err(_) => std::time::SystemTime::UNIX_EPOCH, }); png_files.reverse(); if let Some(png) = png_files.into_iter().nth(index) { return Ok(png.path()); } Err(std::io::Error::other("No latest image found")) } #[cfg(test)] mod tests { use std::sync::{mpsc, Arc}; use tempfile::NamedTempFile; use super::*; fn init() { env_logger::builder().is_test(true).init(); } pub struct ControllerTestbench { pub composite_req_tx: mpsc::Sender>, pub action_reply_rx: mpsc::Receiver>, pub stop_signal: Arc, pub ctrl: ExperimentController, } impl ControllerTestbench { pub fn new() -> Self { init(); let (composite_req_tx, composite_req_rx) = mpsc::channel(); let (action_reply_tx, action_reply_rx) = mpsc::channel(); let stop_signal = Arc::new(AtomicBool::new(false)); let test_tmp_dir = tempfile::tempdir().expect("creating tmpdir failed"); let base_dir = PathBuf::from(test_tmp_dir.path()); let mut stop_file_tmp_path = base_dir.clone(); stop_file_tmp_path.push(STOP_FILE_NAME); let mut stop_file_home_path = base_dir.clone(); stop_file_home_path.push("home"); stop_file_home_path.push(STOP_FILE_NAME); let mut to_ground_dir = base_dir.clone(); to_ground_dir.push("toGround"); let mut to_ground_low_prio_dir = base_dir.clone(); to_ground_low_prio_dir.push("toGroundLP"); let test_paths = ControllerPathCollection { stop_file_home_path, stop_file_tmp_path, to_ground_dir, to_ground_low_prio_dir, }; ControllerTestbench { composite_req_tx, action_reply_rx, stop_signal: stop_signal.clone(), ctrl: ExperimentController::new( composite_req_rx, action_reply_tx, stop_signal, test_paths, ), } } } #[test] fn test_shell_cmd_exection() { let mut testbench = ControllerTestbench::new(); let named_temp_file = NamedTempFile::new().expect("creating temp file failed"); let args = vec![named_temp_file .path() .to_str() .expect("converting path to str failed")]; let cmd = ShellCmd { cmd: "rm", args }; let cmd_serialized = serde_json::to_string(&cmd).expect("serialization failed"); let action_req = satrs::action::ActionRequest { action_id: ActionId::ExecuteShellCommandBlocking as u32, variant: satrs::action::ActionRequestVariant::VecData(cmd_serialized.into_bytes()), }; testbench .composite_req_tx .send(GenericMessage::new( MessageMetadata::new(1, 2), CompositeRequest::Action(action_req), )) .expect("sending action request failed"); testbench.ctrl.perform_operation(); assert!(!named_temp_file.path().exists()); let action_reply = testbench .action_reply_rx .try_recv() .expect("receiving action reply failed"); assert_eq!( action_reply.message.action_id, ActionId::ExecuteShellCommandBlocking as u32 ); match action_reply.message.variant { ActionReplyVariant::Completed => (), _ => { panic!( "unexecpted action reply variant {:?}", action_reply.message.variant ) } } } } // Need to think about the value of this again. This is not easy to do in Rust.. /* pub trait ActionHelperHook { fn is_valid_action_id(&self, action_id: satrs::action::ActionId) -> bool; fn send_reply(&self, action_reply: GenericActionReplyPus) -> Result<(), GenericSendError>; } pub struct ActionHelper { pub requestor: MessageMetadata, pub action_id: satrs::action::ActionId, pub user_hook: Hook, } impl ActionHelper { fn new( &mut self, requestor: MessageMetadata, action_id: satrs::action::ActionId, ) -> Result, GenericSendError> { if !self.user_hook.is_valid_action_id(action_id) { self.report_completion_failed(INVALID_ACTION_ID, None)?; return Ok(None); } Ok(Some(Self { requestor, action_id })) } fn report_completion_success(&self) -> Result<(), GenericSendError> { self.user_hook.send_reply(GenericMessage::new_action_reply( self.requestor, self.action_id, ActionReplyVariant::Completed, ))?; Ok(()) } fn report_completion_failed( &self, error_code: ResultU16, params: Option, ) -> Result<(), GenericSendError> { self.user_hook.send_reply(GenericMessage::new_action_reply( self.requestor, self.action_id, ActionReplyVariant::CompletionFailed { error_code, params }, ))?; Ok(()) } } */