improvements for executable module
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2024-02-07 16:17:07 +01:00
parent de4e261835
commit 66c594b3d2
Signed by: muellerr
GPG Key ID: A649FB78196E3849

View File

@ -1,13 +1,13 @@
//! Task scheduling module //! Task scheduling module
use alloc::string::String;
use bus::BusReader; use bus::BusReader;
use std::boxed::Box; use std::boxed::Box;
use std::error::Error;
use std::sync::mpsc::TryRecvError; use std::sync::mpsc::TryRecvError;
use std::thread;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use std::vec; use std::vec;
use std::vec::Vec; use std::vec::Vec;
use std::{io, thread};
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum OpResult { pub enum OpResult {
@ -34,47 +34,49 @@ pub trait Executable: Send {
/// # Arguments /// # Arguments
/// ///
/// * `executable`: Executable task /// * `executable`: Executable task
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks /// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks.
/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op] /// If [None] is passed, no sleeping will be performed.
/// * `op_code`: Operation code which is passed to the executable task
/// [operation call][Executable::periodic_op]
/// * `termination`: Optional termination handler which can cancel threads with a broadcast /// * `termination`: Optional termination handler which can cancel threads with a broadcast
pub fn exec_sched_single< pub fn exec_sched_single<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
T: Executable<Error = E> + Send + 'static + ?Sized,
E: Error + Send + 'static,
>(
mut executable: Box<T>, mut executable: Box<T>,
task_freq: Option<Duration>, task_freq: Option<Duration>,
op_code: i32, op_code: i32,
mut termination: Option<BusReader<()>>, mut termination: Option<BusReader<()>>,
) -> JoinHandle<Result<OpResult, E>> { ) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
let mut cycle_count = 0; let mut cycle_count = 0;
thread::spawn(move || loop { thread::Builder::new()
if let Some(ref mut terminator) = termination { .name(String::from(executable.task_name()))
match terminator.try_recv() { .spawn(move || loop {
Ok(_) | Err(TryRecvError::Disconnected) => { if let Some(ref mut terminator) = termination {
return Ok(OpResult::Ok); match terminator.try_recv() {
} Ok(_) | Err(TryRecvError::Disconnected) => {
Err(TryRecvError::Empty) => (), return Ok(OpResult::Ok);
} }
} Err(TryRecvError::Empty) => (),
match executable.exec_type() {
ExecutionType::OneShot => {
executable.periodic_op(op_code)?;
return Ok(OpResult::Ok);
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_count += 1;
if cycle_count == cycles {
return Ok(OpResult::Ok);
} }
} }
} match executable.exec_type() {
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); ExecutionType::OneShot => {
thread::sleep(freq); executable.periodic_op(op_code)?;
}) return Ok(OpResult::Ok);
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_count += 1;
if cycle_count == cycles {
return Ok(OpResult::Ok);
}
}
}
if let Some(freq) = task_freq {
thread::sleep(freq);
}
})
} }
/// This function allows executing multiple tasks as long as the tasks implement the /// This function allows executing multiple tasks as long as the tasks implement the
@ -86,55 +88,56 @@ pub fn exec_sched_single<
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks /// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks
/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op] /// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op]
/// * `termination`: Optional termination handler which can cancel threads with a broadcast /// * `termination`: Optional termination handler which can cancel threads with a broadcast
pub fn exec_sched_multi< pub fn exec_sched_multi<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
T: Executable<Error = E> + Send + 'static + ?Sized, task_name: &'static str,
E: Error + Send + 'static,
>(
mut executable_vec: Vec<Box<T>>, mut executable_vec: Vec<Box<T>>,
task_freq: Option<Duration>, task_freq: Option<Duration>,
op_code: i32, op_code: i32,
mut termination: Option<BusReader<()>>, mut termination: Option<BusReader<()>>,
) -> JoinHandle<Result<OpResult, E>> { ) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
let mut cycle_counts = vec![0; executable_vec.len()]; let mut cycle_counts = vec![0; executable_vec.len()];
let mut removal_flags = vec![false; executable_vec.len()]; let mut removal_flags = vec![false; executable_vec.len()];
thread::spawn(move || loop {
if let Some(ref mut terminator) = termination { thread::Builder::new()
match terminator.try_recv() { .name(String::from(task_name))
Ok(_) | Err(TryRecvError::Disconnected) => { .spawn(move || loop {
removal_flags.iter_mut().for_each(|x| *x = true); if let Some(ref mut terminator) = termination {
match terminator.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
removal_flags.iter_mut().for_each(|x| *x = true);
}
Err(TryRecvError::Empty) => (),
} }
Err(TryRecvError::Empty) => (),
} }
} for (idx, executable) in executable_vec.iter_mut().enumerate() {
for (idx, executable) in executable_vec.iter_mut().enumerate() { match executable.exec_type() {
match executable.exec_type() { ExecutionType::OneShot => {
ExecutionType::OneShot => { executable.periodic_op(op_code)?;
executable.periodic_op(op_code)?;
removal_flags[idx] = true;
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_counts[idx] += 1;
if cycle_counts[idx] == cycles {
removal_flags[idx] = true; removal_flags[idx] = true;
} }
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_counts[idx] += 1;
if cycle_counts[idx] == cycles {
removal_flags[idx] = true;
}
}
} }
} }
} let mut removal_iter = removal_flags.iter();
let mut removal_iter = removal_flags.iter(); executable_vec.retain(|_| !*removal_iter.next().unwrap());
executable_vec.retain(|_| !*removal_iter.next().unwrap()); removal_iter = removal_flags.iter();
removal_iter = removal_flags.iter(); cycle_counts.retain(|_| !*removal_iter.next().unwrap());
cycle_counts.retain(|_| !*removal_iter.next().unwrap()); removal_flags.retain(|&i| !i);
removal_flags.retain(|&i| !i); if executable_vec.is_empty() {
if executable_vec.is_empty() { return Ok(OpResult::Ok);
return Ok(OpResult::Ok); }
} let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); thread::sleep(freq);
thread::sleep(freq); })
})
} }
#[cfg(test)] #[cfg(test)]
@ -294,7 +297,8 @@ mod tests {
Some(Duration::from_millis(100)), Some(Duration::from_millis(100)),
expected_op_code, expected_op_code,
None, None,
); )
.expect("thread creation failed");
let thread_res = jhandle.join().expect("One Shot Task failed"); let thread_res = jhandle.join().expect("One Shot Task failed");
assert!(thread_res.is_ok()); assert!(thread_res.is_ok());
assert_eq!(thread_res.unwrap(), OpResult::Ok); assert_eq!(thread_res.unwrap(), OpResult::Ok);
@ -319,7 +323,8 @@ mod tests {
Some(Duration::from_millis(100)), Some(Duration::from_millis(100)),
op_code_inducing_failure, op_code_inducing_failure,
None, None,
); )
.expect("thread creation failed");
let thread_res = jhandle.join().expect("One Shot Task failed"); let thread_res = jhandle.join().expect("One Shot Task failed");
assert!(thread_res.is_err()); assert!(thread_res.is_err());
let error = thread_res.unwrap_err(); let error = thread_res.unwrap_err();
@ -356,11 +361,13 @@ mod tests {
assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME); assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME);
} }
let jhandle = exec_sched_multi( let jhandle = exec_sched_multi(
"multi-task-name",
task_vec, task_vec,
Some(Duration::from_millis(100)), Some(Duration::from_millis(100)),
expected_op_code, expected_op_code,
None, None,
); )
.expect("thread creation failed");
let thread_res = jhandle.join().expect("One Shot Task failed"); let thread_res = jhandle.join().expect("One Shot Task failed");
assert!(thread_res.is_ok()); assert!(thread_res.is_ok());
assert_eq!(thread_res.unwrap(), OpResult::Ok); assert_eq!(thread_res.unwrap(), OpResult::Ok);
@ -386,7 +393,8 @@ mod tests {
Some(Duration::from_millis(100)), Some(Duration::from_millis(100)),
expected_op_code, expected_op_code,
None, None,
); )
.expect("thread creation failed");
let thread_res = jh.join().expect("Cycles Task failed"); let thread_res = jh.join().expect("Cycles Task failed");
assert!(thread_res.is_ok()); assert!(thread_res.is_ok());
let data = shared.lock().expect("Locking Mutex failed"); let data = shared.lock().expect("Locking Mutex failed");
@ -418,11 +426,13 @@ mod tests {
let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> = let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
vec![one_shot_task, cycled_task_0, cycled_task_1]; vec![one_shot_task, cycled_task_0, cycled_task_1];
let jh = exec_sched_multi( let jh = exec_sched_multi(
"multi-task-name",
task_vec, task_vec,
Some(Duration::from_millis(100)), Some(Duration::from_millis(100)),
expected_op_code, expected_op_code,
None, None,
); )
.expect("thread creation failed");
let thread_res = jh.join().expect("Cycles Task failed"); let thread_res = jh.join().expect("Cycles Task failed");
assert!(thread_res.is_ok()); assert!(thread_res.is_ok());
let data = shared.lock().expect("Locking Mutex failed"); let data = shared.lock().expect("Locking Mutex failed");
@ -449,7 +459,8 @@ mod tests {
Some(Duration::from_millis(20)), Some(Duration::from_millis(20)),
expected_op_code, expected_op_code,
Some(terminator.add_rx()), Some(terminator.add_rx()),
); )
.expect("thread creation failed");
thread::sleep(Duration::from_millis(40)); thread::sleep(Duration::from_millis(40));
terminator.broadcast(()); terminator.broadcast(());
let thread_res = jh.join().expect("Periodic Task failed"); let thread_res = jh.join().expect("Periodic Task failed");
@ -485,11 +496,13 @@ mod tests {
let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> = let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
vec![cycled_task, periodic_task_0, periodic_task_1]; vec![cycled_task, periodic_task_0, periodic_task_1];
let jh = exec_sched_multi( let jh = exec_sched_multi(
"multi-task-name",
task_vec, task_vec,
Some(Duration::from_millis(20)), Some(Duration::from_millis(20)),
expected_op_code, expected_op_code,
Some(terminator.add_rx()), Some(terminator.add_rx()),
); )
.expect("thread creation failed");
thread::sleep(Duration::from_millis(60)); thread::sleep(Duration::from_millis(60));
terminator.broadcast(()); terminator.broadcast(());
let thread_res = jh.join().expect("Periodic Task failed"); let thread_res = jh.join().expect("Periodic Task failed");