diff --git a/.idea/runConfigurations/Docs.xml b/.idea/runConfigurations/Docs.xml new file mode 100644 index 0000000..c4efeaa --- /dev/null +++ b/.idea/runConfigurations/Docs.xml @@ -0,0 +1,18 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Test_All.xml b/.idea/runConfigurations/Test_All.xml new file mode 100644 index 0000000..6f884df --- /dev/null +++ b/.idea/runConfigurations/Test_All.xml @@ -0,0 +1,18 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 12c6fba..b0b631b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,13 +2,158 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "atomic-option" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bus" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e66e1779f5b1440f1a58220ba3b3ded4427175f0a9fb8d7066521f8b4e8f2b" +dependencies = [ + "atomic-option", + "crossbeam-channel 0.4.4", + "num_cpus", + "parking_lot_core", +] + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils 0.7.2", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.8", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +dependencies = [ + "cfg-if 1.0.0", + "lazy_static", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "launchpad" version = "0.1.0" dependencies = [ + "bus", + "crossbeam-channel 0.5.4", "thiserror", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.126" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" + +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "parking_lot_core" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + [[package]] name = "proc-macro2" version = "1.0.38" @@ -27,6 +172,18 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + [[package]] name = "syn" version = "1.0.94" @@ -63,3 +220,25 @@ name = "unicode-xid" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index 717fb73..116c94f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -thiserror = "1.0" \ No newline at end of file +thiserror = "1.0" +bus = "2.2.3" +crossbeam-channel = "0.5.4" \ No newline at end of file diff --git a/src/core.rs b/src/core.rs index 61031db..6ee7941 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,2 +1,3 @@ pub mod events; +pub mod executable; pub mod objects; diff --git a/src/core/executable.rs b/src/core/executable.rs new file mode 100644 index 0000000..68935c1 --- /dev/null +++ b/src/core/executable.rs @@ -0,0 +1,496 @@ +use bus::BusReader; +use std::error::Error; +use std::sync::mpsc::TryRecvError; +use std::thread; +use std::thread::JoinHandle; +use std::time::Duration; + +#[derive(Debug, PartialEq)] +pub enum OpResult { + Ok, + TerminationRequested, +} + +pub enum ExecutionType { + Infinite, + Cycles(u32), + OneShot, +} + +pub trait Executable: Send { + type Error; + + fn exec_type(&self) -> ExecutionType; + fn task_name(&self) -> &'static str; + fn periodic_op(&mut self, op_code: i32) -> Result; +} + +/// This function allows executing one task which implements the [Executable][Executable] trait +/// +/// # Arguments +/// +/// * `executable`: Executable task +/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks +/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op] +/// * `termination`: Optional termination handler which can cancel threads with a broadcast +pub fn exec_sched_single< + T: Executable + Send + 'static + ?Sized, + E: Error + Send + 'static, +>( + mut executable: Box, + task_freq: Option, + op_code: i32, + mut termination: Option>, +) -> JoinHandle> { + let mut cycle_count = 0; + thread::spawn(move || loop { + if let Some(ref mut terminator) = termination { + match terminator.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => { + return Ok(OpResult::Ok); + } + Err(TryRecvError::Empty) => (), + } + } + match executable.exec_type() { + ExecutionType::OneShot => { + executable.periodic_op(op_code)?; + return Ok(OpResult::Ok); + } + ExecutionType::Infinite => { + executable.periodic_op(op_code)?; + } + ExecutionType::Cycles(cycles) => { + executable.periodic_op(op_code)?; + cycle_count += 1; + if cycle_count == cycles { + return Ok(OpResult::Ok); + } + } + } + let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); + thread::sleep(freq); + }) +} + +/// This function allows executing multiple tasks as long as the tasks implement the +/// [Executable][Executable] trait +/// +/// # Arguments +/// +/// * `executable_vec`: Vector of executable objects +/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks +/// * `op_code`: Operation code which is passed to the executable task periodic_op call +/// * `termination`: Optional termination handler which can cancel threads with a broadcast +pub fn exec_sched_multi< + T: Executable + Send + 'static + ?Sized, + E: Error + Send + 'static, +>( + mut executable_vec: Vec>, + task_freq: Option, + op_code: i32, + mut termination: Option>, +) -> JoinHandle> { + let mut cycle_counts = vec![0; executable_vec.len()]; + let mut removal_flags = vec![false; executable_vec.len()]; + thread::spawn(move || loop { + if let Some(ref mut terminator) = termination { + match terminator.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => { + removal_flags.iter_mut().for_each(|x| *x = true); + } + Err(TryRecvError::Empty) => (), + } + } + for (idx, executable) in executable_vec.iter_mut().enumerate() { + match executable.exec_type() { + ExecutionType::OneShot => { + executable.periodic_op(op_code)?; + removal_flags[idx] = true; + } + ExecutionType::Infinite => { + executable.periodic_op(op_code)?; + } + ExecutionType::Cycles(cycles) => { + executable.periodic_op(op_code)?; + cycle_counts[idx] += 1; + if cycle_counts[idx] == cycles { + removal_flags[idx] = true; + } + } + } + } + let mut removal_iter = removal_flags.iter(); + executable_vec.retain(|_| !*removal_iter.next().unwrap()); + removal_iter = removal_flags.iter(); + cycle_counts.retain(|_| !*removal_iter.next().unwrap()); + removal_flags.retain(|&i| !i); + if executable_vec.is_empty() { + return Ok(OpResult::Ok); + } + let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); + thread::sleep(freq); + }) +} + +#[cfg(test)] +mod tests { + use super::{exec_sched_multi, exec_sched_single, Executable, ExecutionType, OpResult}; + use bus::Bus; + use std::error::Error; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + use std::{fmt, thread}; + + struct TestInfo { + exec_num: u32, + op_code: i32, + } + struct OneShotTask { + exec_num: Arc>, + } + struct FixedCyclesTask { + cycles: u32, + exec_num: Arc>, + } + struct PeriodicTask { + exec_num: Arc>, + } + + #[derive(Clone, Debug)] + struct ExampleError { + kind: ErrorKind, + } + + /// The kind of an error that can occur. + #[derive(Clone, Debug)] + pub enum ErrorKind { + Generic(String, i32), + } + + impl ExampleError { + fn new(msg: &str, code: i32) -> ExampleError { + ExampleError { + kind: ErrorKind::Generic(msg.to_string(), code), + } + } + + /// Return the kind of this error. + pub fn kind(&self) -> &ErrorKind { + &self.kind + } + } + + impl fmt::Display for ExampleError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.kind() { + ErrorKind::Generic(str, code) => { + write!(f, "{str} with code {code}") + } + } + } + } + + impl Error for ExampleError {} + + const ONE_SHOT_TASK_NAME: &str = "One Shot Task"; + + impl Executable for OneShotTask { + type Error = ExampleError; + + fn exec_type(&self) -> ExecutionType { + ExecutionType::OneShot + } + + fn task_name(&self) -> &'static str { + ONE_SHOT_TASK_NAME + } + + fn periodic_op(&mut self, op_code: i32) -> Result { + let mut data = self.exec_num.lock().expect("Locking Mutex failed"); + data.exec_num += 1; + data.op_code = op_code; + std::mem::drop(data); + if op_code >= 0 { + Ok(OpResult::Ok) + } else { + Err(ExampleError::new("One Shot Task Failure", op_code)) + } + } + } + + const CYCLE_TASK_NAME: &str = "Fixed Cycles Task"; + + impl Executable for FixedCyclesTask { + type Error = ExampleError; + + fn exec_type(&self) -> ExecutionType { + ExecutionType::Cycles(self.cycles) + } + + fn task_name(&self) -> &'static str { + CYCLE_TASK_NAME + } + + fn periodic_op(&mut self, op_code: i32) -> Result { + let mut data = self.exec_num.lock().expect("Locking Mutex failed"); + data.exec_num += 1; + data.op_code = op_code; + std::mem::drop(data); + if op_code >= 0 { + Ok(OpResult::Ok) + } else { + Err(ExampleError::new("Fixed Cycle Task Failure", op_code)) + } + } + } + + const PERIODIC_TASK_NAME: &str = "Periodic Task"; + + impl Executable for PeriodicTask { + type Error = ExampleError; + + fn exec_type(&self) -> ExecutionType { + ExecutionType::Infinite + } + + fn task_name(&self) -> &'static str { + PERIODIC_TASK_NAME + } + + fn periodic_op(&mut self, op_code: i32) -> Result { + let mut data = self.exec_num.lock().expect("Locking Mutex failed"); + data.exec_num += 1; + data.op_code = op_code; + std::mem::drop(data); + if op_code >= 0 { + Ok(OpResult::Ok) + } else { + Err(ExampleError::new("Example Task Failure", op_code)) + } + } + } + + #[test] + fn test_simple_one_shot() { + let expected_op_code = 42; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let exec_task = OneShotTask { + exec_num: shared.clone(), + }; + let task = Box::new(exec_task); + let jhandle = exec_sched_single( + task, + Some(Duration::from_millis(100)), + expected_op_code, + None, + ); + let thread_res = jhandle.join().expect("One Shot Task failed"); + assert!(thread_res.is_ok()); + assert_eq!(thread_res.unwrap(), OpResult::Ok); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(data.exec_num, 1); + assert_eq!(data.op_code, expected_op_code); + } + + #[test] + fn test_failed_one_shot() { + let op_code_inducing_failure = -1; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let exec_task = OneShotTask { + exec_num: shared.clone(), + }; + let task = Box::new(exec_task); + let jhandle = exec_sched_single( + task, + Some(Duration::from_millis(100)), + op_code_inducing_failure, + None, + ); + let thread_res = jhandle.join().expect("One Shot Task failed"); + assert!(thread_res.is_err()); + let error = thread_res.unwrap_err(); + let err = error.kind(); + assert!(matches!(err, &ErrorKind::Generic { .. })); + match err { + ErrorKind::Generic(str, op_code) => { + assert_eq!(str, &String::from("One Shot Task Failure")); + assert_eq!(op_code, &op_code_inducing_failure); + } + } + let error_display = error.to_string(); + assert_eq!(error_display, "One Shot Task Failure with code -1"); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(data.exec_num, 1); + assert_eq!(data.op_code, op_code_inducing_failure); + } + + #[test] + fn test_simple_multi_one_shot() { + let expected_op_code = 43; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let exec_task_0 = OneShotTask { + exec_num: shared.clone(), + }; + let exec_task_1 = OneShotTask { + exec_num: shared.clone(), + }; + let task_vec = vec![Box::new(exec_task_0), Box::new(exec_task_1)]; + for task in task_vec.iter() { + assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME); + } + let jhandle = exec_sched_multi( + task_vec, + Some(Duration::from_millis(100)), + expected_op_code, + None, + ); + let thread_res = jhandle.join().expect("One Shot Task failed"); + assert!(thread_res.is_ok()); + assert_eq!(thread_res.unwrap(), OpResult::Ok); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(data.exec_num, 2); + assert_eq!(data.op_code, expected_op_code); + } + + #[test] + fn test_cycles_single() { + let expected_op_code = 44; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let cycled_task = Box::new(FixedCyclesTask { + exec_num: shared.clone(), + cycles: 1, + }); + assert_eq!(cycled_task.task_name(), CYCLE_TASK_NAME); + let jh = exec_sched_single( + cycled_task, + Some(Duration::from_millis(100)), + expected_op_code, + None, + ); + let thread_res = jh.join().expect("Cycles Task failed"); + assert!(thread_res.is_ok()); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(thread_res.unwrap(), OpResult::Ok); + assert_eq!(data.exec_num, 1); + assert_eq!(data.op_code, expected_op_code); + } + + #[test] + fn test_single_and_cycles() { + let expected_op_code = 50; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let one_shot_task = Box::new(OneShotTask { + exec_num: shared.clone(), + }); + let cycled_task_0 = Box::new(FixedCyclesTask { + exec_num: shared.clone(), + cycles: 1, + }); + let cycled_task_1 = Box::new(FixedCyclesTask { + exec_num: shared.clone(), + cycles: 1, + }); + assert_eq!(cycled_task_0.task_name(), CYCLE_TASK_NAME); + assert_eq!(one_shot_task.task_name(), ONE_SHOT_TASK_NAME); + let task_vec: Vec>> = + vec![one_shot_task, cycled_task_0, cycled_task_1]; + let jh = exec_sched_multi( + task_vec, + Some(Duration::from_millis(100)), + expected_op_code, + None, + ); + let thread_res = jh.join().expect("Cycles Task failed"); + assert!(thread_res.is_ok()); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(thread_res.unwrap(), OpResult::Ok); + assert_eq!(data.exec_num, 3); + assert_eq!(data.op_code, expected_op_code); + } + + #[test] + #[ignore] + fn test_periodic_single() { + let mut terminator = Bus::new(5); + let expected_op_code = 45; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let periodic_task = Box::new(PeriodicTask { + exec_num: shared.clone(), + }); + assert_eq!(periodic_task.task_name(), PERIODIC_TASK_NAME); + let jh = exec_sched_single( + periodic_task, + Some(Duration::from_millis(20)), + expected_op_code, + Some(terminator.add_rx()), + ); + thread::sleep(Duration::from_millis(40)); + terminator.broadcast(()); + let thread_res = jh.join().expect("Periodic Task failed"); + assert!(thread_res.is_ok()); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(thread_res.unwrap(), OpResult::Ok); + let range = 2..4; + assert!(range.contains(&data.exec_num)); + assert_eq!(data.op_code, expected_op_code); + } + + #[test] + #[ignore] + fn test_periodic_multi() { + let mut terminator = Bus::new(5); + let expected_op_code = 46; + let shared = Arc::new(Mutex::new(TestInfo { + exec_num: 0, + op_code: 0, + })); + let cycled_task = Box::new(FixedCyclesTask { + exec_num: shared.clone(), + cycles: 1, + }); + let periodic_task_0 = Box::new(PeriodicTask { + exec_num: shared.clone(), + }); + let periodic_task_1 = Box::new(PeriodicTask { + exec_num: shared.clone(), + }); + assert_eq!(periodic_task_0.task_name(), PERIODIC_TASK_NAME); + assert_eq!(periodic_task_1.task_name(), PERIODIC_TASK_NAME); + let task_vec: Vec>> = + vec![cycled_task, periodic_task_0, periodic_task_1]; + let jh = exec_sched_multi( + task_vec, + Some(Duration::from_millis(20)), + expected_op_code, + Some(terminator.add_rx()), + ); + thread::sleep(Duration::from_millis(60)); + terminator.broadcast(()); + let thread_res = jh.join().expect("Periodic Task failed"); + assert!(thread_res.is_ok()); + let data = shared.lock().expect("Locking Mutex failed"); + assert_eq!(thread_res.unwrap(), OpResult::Ok); + let range = 7..11; + assert!(range.contains(&data.exec_num)); + assert_eq!(data.op_code, expected_op_code); + } +} diff --git a/src/core/objects.rs b/src/core/objects.rs index dd51ef2..0da10e4 100644 --- a/src/core/objects.rs +++ b/src/core/objects.rs @@ -124,7 +124,7 @@ mod tests { #[test] fn test_obj_manager_simple() { - let mut obj_manager = ObjectManager::new(); + let mut obj_manager = ObjectManager::default(); let expl_obj_id = ObjectId { id: 0, name: "Example 0", @@ -159,5 +159,17 @@ mod tests { let expl_obj_back_casted = obj_back_casted.unwrap(); assert_eq!(expl_obj_back_casted.string, String::from("Hello Test")); assert!(expl_obj_back_casted.was_initialized); + + let existing_obj_id = ObjectId { + id: 12, + name: "Example 1", + }; + let invalid_obj = OtherExampleObject { + id: existing_obj_id, + string: String::from("Hello Test"), + was_initialized: false, + }; + + assert!(!obj_manager.insert(Box::new(invalid_obj))); } } diff --git a/src/main.rs b/src/main.rs index 63f5441..7527576 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -// use launchpad::ObjectManager; - fn main() { - println!("Hello World"); + println!("hello"); }