forked from ROMEO/nexosim

Refactor code

Serge Barral 2022-10-18 09:50:22 +02:00
parent abab030b4a
commit eba882b4b1
5 changed files with 315 additions and 331 deletions


@@ -10,7 +10,7 @@ use slab::Slab;
mod find_bit;
mod injector;
-mod pool;
+mod pool_manager;
mod queue;
mod rng;
mod task;
@@ -19,14 +19,14 @@ mod worker;
#[cfg(all(test, not(asynchronix_loom)))]
mod tests;
-use self::pool::Pool;
+use self::pool_manager::PoolManager;
use self::rng::Rng;
use self::task::{CancelToken, Promise, Runnable};
use self::worker::Worker;
use crate::macros::scoped_local_key::scoped_thread_local;
type Bucket = injector::Bucket<Runnable, 128>;
-type GlobalQueue = injector::Injector<Runnable, 128>;
+type Injector = injector::Injector<Runnable, 128>;
type LocalQueue = queue::Worker<Runnable, queue::B256>;
type Stealer = queue::Stealer<Runnable, queue::B256>;
@@ -55,23 +55,31 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
/// The design of the executor is largely influenced by the tokio and go
/// schedulers, both of which are optimized for message-passing applications. In
/// particular, it uses fast, fixed-size thread-local work-stealing queues with
-/// a "fast" non-stealable slot in combination with a global injector queue. The
+/// a non-stealable LIFO slot in combination with an injector queue, which
/// injector queue is used both to schedule new tasks and to absorb temporary
-/// overflow in the local queues. The design of the injector queue is kept very
-/// simple by taking advantage of the fact that the injector is not required to
-/// be either LIFO or FIFO.
+/// overflow in the local queues.
///
-/// Probably the largest difference with tokio is the task system, which
-/// achieves a higher throughput by reducing the need for synchronization.
-/// Another difference is that, at the moment, the complete subset of active
-/// worker threads is stored in a single atomic variable. This makes it possible
-/// to rapidly identify free worker threads for stealing operations. The
-/// downside of this approach is that the maximum number of worker threads is
-/// limited to `usize::BITS`, but this is unlikely to constitute a limitation
-/// since system simulation is not typically embarrassingly parallel.
+/// The design of the injector queue is kept very simple compared to tokio, by
+/// taking advantage of the fact that the injector is not required to be either
+/// LIFO or FIFO. Moving tasks between a local queue and the injector is fast
+/// because tasks are moved in batch and are stored contiguously in memory.
+///
+/// Another difference with tokio is that, at the moment, the complete subset of
+/// active worker threads is stored in a single atomic variable. This makes it
+/// possible to rapidly identify free worker threads for stealing operations,
+/// with the downside that the maximum number of worker threads is currently
+/// limited to `usize::BITS`. This is unlikely to constitute a limitation in
+/// practice though since system simulation is not typically embarrassingly
+/// parallel.
+///
+/// Probably the largest difference with tokio is the task system, which has
+/// better throughput due to less need for synchronization. This mainly results
+/// from the use of an atomic notification counter rather than an atomic
+/// notification flag, thus alleviating the need to reset the notification flag
+/// before polling a future.
#[derive(Debug)]
pub(crate) struct Executor {
-pool: Arc<Pool>,
+context: Arc<ExecutorContext>,
active_tasks: Arc<Mutex<Slab<CancelToken>>>,
parker: parking::Parker,
join_handles: Vec<JoinHandle<()>>,
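A minimal sketch of the notification-counter idea mentioned in the doc comment above (hypothetical names, not the crate's actual task primitives): the poller snapshots the counter before polling and compares it afterwards, so no notification flag ever has to be reset before a poll.

use std::sync::atomic::{AtomicUsize, Ordering};

// Every wake-up bumps the counter; nothing ever needs to be cleared.
struct WakeCounter(AtomicUsize);

impl WakeCounter {
    fn new() -> Self {
        WakeCounter(AtomicUsize::new(0))
    }

    fn notify(&self) {
        self.0.fetch_add(1, Ordering::Release);
    }

    /// Runs one poll and reports whether a wake-up raced with it, in which
    /// case the caller should reschedule the task.
    fn poll_once(&self, mut poll: impl FnMut()) -> bool {
        let before = self.0.load(Ordering::Acquire);
        poll();
        self.0.load(Ordering::Acquire) != before
    }
}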
@@ -103,13 +111,17 @@ impl Executor {
usize::MAX / 2
);
-let pool = Arc::new(Pool::new(executor_id, unparker, shared_data.into_iter()));
+let context = Arc::new(ExecutorContext::new(
+executor_id,
+unparker,
+shared_data.into_iter(),
+));
let active_tasks = Arc::new(Mutex::new(Slab::new()));
// All workers must be marked as active _before_ spawning the threads to
// make sure that the count of active workers does not fall to zero
// before all workers are blocked on the signal barrier.
-pool.set_all_workers_active();
+context.pool_manager.set_all_workers_active();
// Spawn all worker threads.
let join_handles: Vec<_> = local_data
@@ -121,10 +133,10 @@ impl Executor {
thread_builder
.spawn({
-let pool = pool.clone();
+let context = context.clone();
let active_tasks = active_tasks.clone();
move || {
-let worker = Worker::new(local_queue, pool);
+let worker = Worker::new(local_queue, context);
ACTIVE_TASKS.set(&active_tasks, || {
LOCAL_WORKER
.set(&worker, || run_local_worker(&worker, id, worker_parker))
@@ -137,10 +149,10 @@ impl Executor {
// Wait until all workers are blocked on the signal barrier.
parker.park();
-assert!(pool.is_pool_idle());
+assert!(context.pool_manager.is_pool_idle());
Self {
-pool,
+context,
active_tasks,
parker,
join_handles,
@@ -163,12 +175,12 @@ impl Executor {
let future = CancellableFuture::new(future, task_entry.key());
let (promise, runnable, cancel_token) =
-task::spawn(future, schedule_task, self.pool.executor_id);
+task::spawn(future, schedule_task, self.context.executor_id);
task_entry.insert(cancel_token);
-self.pool.global_queue.insert_task(runnable);
-self.pool.activate_worker();
+self.context.injector.insert_task(runnable);
+self.context.pool_manager.activate_worker();
promise
}
@@ -191,22 +203,22 @@ impl Executor {
let future = CancellableFuture::new(future, task_entry.key());
let (runnable, cancel_token) =
-task::spawn_and_forget(future, schedule_task, self.pool.executor_id);
+task::spawn_and_forget(future, schedule_task, self.context.executor_id);
task_entry.insert(cancel_token);
-self.pool.global_queue.insert_task(runnable);
-self.pool.activate_worker();
+self.context.injector.insert_task(runnable);
+self.context.pool_manager.activate_worker();
}
/// Let the executor run, blocking until all futures have completed or until
/// the executor deadlocks.
pub(crate) fn run(&mut self) {
loop {
-if let Some(worker_panic) = self.pool.take_panic() {
+if let Some(worker_panic) = self.context.pool_manager.take_panic() {
panic::resume_unwind(worker_panic);
}
-if self.pool.is_pool_idle() {
+if self.context.pool_manager.is_pool_idle() {
return;
}
@@ -218,7 +230,7 @@ impl Executor {
impl Drop for Executor {
fn drop(&mut self) {
// Force all threads to return.
-self.pool.trigger_termination();
+self.context.pool_manager.trigger_termination();
for join_handle in self.join_handles.drain(0..) {
join_handle.join().unwrap();
}
@@ -227,7 +239,7 @@ impl Drop for Executor {
//
// A local worker must be set because some tasks may schedule other
// tasks when dropped, which requires that a local worker be available.
-let worker = Worker::new(LocalQueue::new(), self.pool.clone());
+let worker = Worker::new(LocalQueue::new(), self.context.clone());
LOCAL_WORKER.set(&worker, || {
// Cancel all pending futures.
//
@@ -246,7 +258,7 @@ impl Drop for Executor {
// Some of the dropped tasks may have scheduled other tasks that
// were not yet cancelled, preventing them from being dropped
// upon cancellation. This is OK: the scheduled tasks will be
-// dropped when the local and global queues are dropped, and
+// dropped when the local and injector queues are dropped, and
// they cannot re-schedule one another since all tasks were
// cancelled.
});
@@ -254,6 +266,38 @@ impl Drop for Executor {
}
}
/// Shared executor context.
#[derive(Debug)]
struct ExecutorContext {
injector: Injector,
executor_id: usize,
executor_unparker: parking::Unparker,
pool_manager: PoolManager,
}
impl ExecutorContext {
/// Creates a new shared executor context.
pub(super) fn new(
executor_id: usize,
executor_unparker: parking::Unparker,
shared_data: impl Iterator<Item = (Stealer, parking::Unparker)>,
) -> Self {
let (stealers, worker_unparkers): (Vec<_>, Vec<_>) = shared_data.into_iter().unzip();
let worker_unparkers = worker_unparkers.into_boxed_slice();
Self {
injector: Injector::new(),
executor_id,
executor_unparker,
pool_manager: PoolManager::new(
worker_unparkers.len(),
stealers.into_boxed_slice(),
worker_unparkers,
),
}
}
}
// A `Future` wrapper that removes its cancellation token from the executor's
// list of active tasks when dropped.
struct CancellableFuture<T: Future> {
@@ -301,15 +345,20 @@ impl<T: Future> Drop for CancellableFuture<T> {
fn schedule_task(task: Runnable, executor_id: usize) {
LOCAL_WORKER
.map(|worker| {
let pool_manager = &worker.executor_context.pool_manager;
let injector = &worker.executor_context.injector;
let local_queue = &worker.local_queue;
let fast_slot = &worker.fast_slot;
// Check that this task was indeed spawned on this executor.
assert_eq!(
-executor_id, worker.pool.executor_id,
+executor_id, worker.executor_context.executor_id,
"Tasks must be awaken on the same executor they are spawned on"
);
// Store the task in the fast slot and retrieve the one that was
// formerly stored, if any.
-let prev_task = match worker.fast_slot.replace(Some(task)) {
+let prev_task = match fast_slot.replace(Some(task)) {
// If there already was a task in the slot, proceed so it can be
// moved to a task queue.
Some(t) => t,
@@ -319,26 +368,24 @@ fn schedule_task(task: Runnable, executor_id: usize) {
};
// Push the previous task to the local queue if possible or on the
-// global queue otherwise.
-if let Err(prev_task) = worker.local_queue.push(prev_task) {
-// The local queue is full. Try to move half of it to the global
-// queue; if this fails, just push one task to the global queue.
-if let Ok(drain) = worker.local_queue.drain(|_| Bucket::capacity()) {
-worker
-.pool
-.global_queue
-.push_bucket(Bucket::from_iter(drain));
-worker.local_queue.push(prev_task).unwrap();
+// injector queue otherwise.
+if let Err(prev_task) = local_queue.push(prev_task) {
+// The local queue is full. Try to move half of it to the
+// injector queue; if this fails, just push one task to the
+// injector queue.
+if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) {
+injector.push_bucket(Bucket::from_iter(drain));
+local_queue.push(prev_task).unwrap();
} else {
-worker.pool.global_queue.insert_task(prev_task);
+injector.insert_task(prev_task);
}
}
-// A task has been pushed to the local or global queue: try to
+// A task has been pushed to the local or injector queue: try to
// activate another worker if no worker is currently searching for a
// task.
-if worker.pool.searching_worker_count() == 0 {
-worker.pool.activate_worker_relaxed();
+if pool_manager.searching_worker_count() == 0 {
+pool_manager.activate_worker_relaxed();
}
})
.expect("Tasks may not be awaken outside executor threads");
@@ -347,6 +394,12 @@ fn schedule_task(task: Runnable, executor_id: usize) {
/// Processes all incoming tasks on a worker thread until the `Terminate` signal
/// is received or until it panics.
fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
let pool_manager = &worker.executor_context.pool_manager;
let injector = &worker.executor_context.injector;
let executor_unparker = &worker.executor_context.executor_unparker;
let local_queue = &worker.local_queue;
let fast_slot = &worker.fast_slot;
let result = panic::catch_unwind(AssertUnwindSafe(|| {
// Set how long to spin when searching for a task.
const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000);
@@ -356,22 +409,30 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
loop {
// Signal barrier: park until notified to continue or terminate.
-if worker.pool.try_set_worker_inactive(id) {
-parker.park();
-} else {
-// This worker is the last active worker so it is necessary to
-// check if the global queue is populated. This could happen if
-// the executor thread pushed a task to the global queue but
-// could not activate a new worker because all workers were then
-// activated.
-if worker.pool.global_queue.is_empty() {
-worker.pool.set_all_workers_inactive();
-worker.pool.executor_unparker.unpark();
-parker.park();
-}
-}
+// Try to deactivate the worker.
+if pool_manager.try_set_worker_inactive(id) {
+parker.park();
+// No need to call `begin_worker_search()`: this was done by the
+// thread that unparked the worker.
+} else if injector.is_empty() {
+// This worker could not be deactivated because it was the last
+// active worker. In such case, the call to
+// `try_set_worker_inactive` establishes a synchronization with
+// all threads that pushed tasks to the injector queue but could
+// not activate a new worker, which is why some tasks may now be
+// visible in the injector queue.
+pool_manager.set_all_workers_inactive();
+executor_unparker.unpark();
+parker.park();
+// No need to call `begin_worker_search()`: this was done by the
+// thread that unparked the worker.
+} else {
+// There are tasks in the injector: resume the search.
+pool_manager.begin_worker_search();
+}
-if worker.pool.termination_is_triggered() {
+if pool_manager.termination_is_triggered() {
return;
}
@@ -379,8 +440,8 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Process the tasks one by one.
loop {
-// Check the global queue first.
-if let Some(bucket) = worker.pool.global_queue.pop_bucket() {
+// Check the injector queue first.
+if let Some(bucket) = injector.pop_bucket() {
let bucket_iter = bucket.into_iter();
// There is a _very_ remote possibility that, even though
@@ -390,42 +451,43 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// it took to pop and process the remaining tasks and it
// hasn't released the stolen capacity yet.
//
-// Unfortunately, we cannot just skip checking the global
+// Unfortunately, we cannot just skip checking the injector
// queue altogether when there isn't enough spare capacity
// in the local queue because this could lead to a race:
// suppose that (1) this thread has earlier pushed tasks
-// onto the global queue, and (2) the stealer has processed
-// all stolen tasks before this thread sees the capacity
-// restored and at the same time (3) the stealer does not
-// yet see the tasks this thread pushed to the global queue;
-// in such scenario, both this thread and the stealer thread
-// may park and leave unprocessed tasks in the global queue.
+// onto the injector queue, and (2) the stealer has
+// processed all stolen tasks before this thread sees the
+// capacity restored and at the same time (3) the stealer
+// does not yet see the tasks this thread pushed to the
+// injector queue; in such scenario, both this thread and
+// the stealer thread may park and leave unprocessed tasks
+// in the injector queue.
//
// This is the only instance where spinning is used, as the
// probability of this happening is close to zero and the
// complexity of a signaling mechanism (condvar & friends)
// wouldn't carry its weight.
-while worker.local_queue.spare_capacity() < bucket_iter.len() {}
+while local_queue.spare_capacity() < bucket_iter.len() {}
-// Since empty buckets are never pushed onto the global
+// Since empty buckets are never pushed onto the injector
// queue, we should now have at least one task to process.
-worker.local_queue.extend(bucket_iter);
+local_queue.extend(bucket_iter);
} else {
-// The global queue is empty. Try to steal from active
+// The injector queue is empty. Try to steal from active
// siblings.
-let mut stealers = worker.pool.shuffled_stealers(Some(id), &rng);
+let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng);
if stealers.all(|stealer| {
stealer
-.steal_and_pop(&worker.local_queue, |n| n - n / 2)
+.steal_and_pop(local_queue, |n| n - n / 2)
.map(|task| {
-let prev_task = worker.fast_slot.replace(Some(task));
+let prev_task = fast_slot.replace(Some(task));
assert!(prev_task.is_none());
})
.is_err()
}) {
// Give up if unsuccessful for too long.
if (Instant::now() - search_start) > MAX_SEARCH_DURATION {
-worker.pool.end_worker_search();
+pool_manager.end_worker_search();
break;
}
@@ -436,19 +498,18 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Signal the end of the search so that another worker can be
// activated when a new task is scheduled.
-worker.pool.end_worker_search();
+pool_manager.end_worker_search();
// Pop tasks from the fast slot or the local queue.
-while let Some(task) = worker.fast_slot.take().or_else(|| worker.local_queue.pop())
-{
-if worker.pool.termination_is_triggered() {
+while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) {
+if pool_manager.termination_is_triggered() {
return;
}
task.run();
}
// Resume the search for tasks.
-worker.pool.begin_worker_search();
+pool_manager.begin_worker_search();
search_start = Instant::now();
}
}
@@ -456,8 +517,8 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Propagate the panic, if any.
if let Err(panic) = result {
-worker.pool.register_panic(panic);
-worker.pool.trigger_termination();
-worker.pool.executor_unparker.unpark();
+pool_manager.register_panic(panic);
+pool_manager.trigger_termination();
+executor_unparker.unpark();
}
}


@@ -77,7 +77,7 @@ impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
inner.push(new_bucket);
// Ordering: this flag is only used as a hint so Relaxed ordering is
-// enough.
+// sufficient.
self.is_empty.store(false, Ordering::Relaxed);
}
@@ -91,7 +91,7 @@ impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
// If the queue was empty before, update the flag.
if was_empty {
// Ordering: this flag is only used as a hint so Relaxed ordering is
-// enough.
+// sufficient.
self.is_empty.store(false, Ordering::Relaxed);
}
}
@@ -106,15 +106,15 @@ impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
/// This is not an issue in practice because it cannot lead to executor
/// deadlock. Indeed, if the last task/bucket was inserted by a worker
/// thread, this worker thread will always see that the injector queue is
-/// populated (unless the bucket was already popped) so it will never exit
-/// before all tasks in the injector are processed. Likewise, if the last
-/// task/bucket was inserted by the main executor thread before
-/// `Executor::run()` is called, the synchronization established when the
-/// executor unparks worker threads ensures that the task is visible to all
-/// unparked workers.
+/// populated (unless the bucket was already popped) so if all workers exit,
+/// then all tasks they have re-injected will necessarily have been
+/// processed. Likewise, if the last task/bucket was inserted by the main
+/// executor thread before `Executor::run()` is called, the synchronization
+/// established when the executor unparks worker threads ensures that the
+/// task is visible to all unparked workers.
pub(crate) fn pop_bucket(&self) -> Option<Bucket<T, BUCKET_CAPACITY>> {
// Ordering: this flag is only used as a hint so Relaxed ordering is
-// enough.
+// sufficient.
if self.is_empty.load(Ordering::Relaxed) {
return None;
}
@@ -125,7 +125,7 @@ impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
if inner.is_empty() {
// Ordering: this flag is only used as a hint so Relaxed ordering is
-// enough.
+// sufficient.
self.is_empty.store(true, Ordering::Relaxed);
}
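A self-contained sketch of the "emptiness hint" pattern used by the injector (hypothetical type, not the actual `Injector`): a Relaxed flag lets consumers skip the lock when the queue is most likely empty, and a stale hint can at worst make a consumer skip a non-empty queue, which the executor tolerates for the reasons given above.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct HintedQueue<T> {
    items: Mutex<VecDeque<T>>,
    // Hint only: may lag behind the real state of `items`.
    is_empty: AtomicBool,
}

impl<T> HintedQueue<T> {
    fn new() -> Self {
        Self {
            items: Mutex::new(VecDeque::new()),
            is_empty: AtomicBool::new(true),
        }
    }

    fn push(&self, item: T) {
        let mut items = self.items.lock().unwrap();
        items.push_back(item);
        // Relaxed is sufficient since the flag is only a hint.
        self.is_empty.store(false, Ordering::Relaxed);
    }

    fn pop(&self) -> Option<T> {
        // Fast path: skip the lock if the queue looks empty.
        if self.is_empty.load(Ordering::Relaxed) {
            return None;
        }
        let mut items = self.items.lock().unwrap();
        let item = items.pop_front();
        if items.is_empty() {
            self.is_empty.store(true, Ordering::Relaxed);
        }
        item
    }
}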


@@ -3,73 +3,60 @@ use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use super::find_bit;
-use super::injector::Injector;
use super::rng;
-use super::{GlobalQueue, Stealer};
+use super::Stealer;
-/// A view of the thread pool shared between the executor and all workers.
+/// Manager of worker threads.
+///
+/// The manager currently only supports up to `usize::BITS` threads.
#[derive(Debug)]
-pub(super) struct Pool {
-pub(super) global_queue: GlobalQueue,
-pub(super) executor_id: usize,
-pub(super) executor_unparker: parking::Unparker,
-registry: PoolRegistry,
+pub(super) struct PoolManager {
+pool_size: usize,
stealers: Box<[Stealer]>,
worker_unparkers: Box<[parking::Unparker]>,
+active_workers: AtomicUsize,
searching_workers: AtomicUsize,
terminate_signal: AtomicBool,
worker_panic: Mutex<Option<Box<dyn Any + Send + 'static>>>,
+#[cfg(feature = "dev-logs")]
+record: Record,
}
-impl Pool {
-/// Creates a new pool.
+impl PoolManager {
+/// Creates a new pool manager.
+///
+/// # Panics
+///
+/// This will panic if the specified pool size is zero or is more than
+/// `usize::BITS`.
pub(super) fn new(
-executor_id: usize,
-executor_unparker: parking::Unparker,
-shared_data: impl Iterator<Item = (Stealer, parking::Unparker)>,
+pool_size: usize,
+stealers: Box<[Stealer]>,
+worker_unparkers: Box<[parking::Unparker]>,
) -> Self {
-let (stealers, worker_unparkers): (Vec<_>, Vec<_>) = shared_data.into_iter().unzip();
-let worker_unparkers = worker_unparkers.into_boxed_slice();
+assert!(
+pool_size >= 1,
+"the executor pool size should be at least one"
+);
+assert!(
+pool_size <= usize::BITS as usize,
+"the executor pool size should be at most {}",
+usize::BITS
+);
Self {
-global_queue: Injector::new(),
-executor_id,
-executor_unparker,
-registry: PoolRegistry::new(worker_unparkers.len()),
-stealers: stealers.into_boxed_slice(),
+pool_size,
+stealers,
worker_unparkers,
+active_workers: AtomicUsize::new(0),
searching_workers: AtomicUsize::new(0),
terminate_signal: AtomicBool::new(false),
worker_panic: Mutex::new(None),
+#[cfg(feature = "dev-logs")]
+record: Record::new(pool_size),
}
}
-/// Marks all pool workers as active.
-///
-/// Unparking the worker threads is the responsibility of the caller.
-pub(super) fn set_all_workers_active(&self) {
-self.registry.set_all_active();
-}
-/// Marks all pool workers as inactive.
-///
-/// Unparking the executor threads is the responsibility of the caller.
-pub(super) fn set_all_workers_inactive(&self) {
-self.registry.set_all_inactive();
-}
-/// Marks the specified worker as inactive unless it is the last active
-/// worker.
-///
-/// Parking the worker thread is the responsibility of the caller.
-///
-/// If this was the last active worker, `false` is returned and it is
-/// guaranteed that all memory operations performed by threads that called
-/// `activate_worker` will be visible.
-pub(super) fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
-self.registry.try_set_inactive(worker_id)
-}
/// Unparks an idle worker if any is found and mark it as active, or do
/// nothing otherwise.
///
@@ -79,9 +66,21 @@ impl Pool {
/// is not tolerable (for instance if this method is called from a
/// non-worker thread), use the more expensive `activate_worker`.
pub(super) fn activate_worker_relaxed(&self) {
-if let Some(worker_id) = self.registry.set_one_active_relaxed() {
-self.searching_workers.fetch_add(1, Ordering::Relaxed);
-self.worker_unparkers[worker_id].unpark();
-}
+let mut active_workers = self.active_workers.load(Ordering::Relaxed);
+loop {
+let first_idle_worker = active_workers.trailing_ones() as usize;
+if first_idle_worker >= self.pool_size {
+return;
+};
+active_workers = self
+.active_workers
+.fetch_or(1 << first_idle_worker, Ordering::Relaxed);
+if active_workers & (1 << first_idle_worker) == 0 {
+#[cfg(feature = "dev-logs")]
+self.record.increment(first_idle_worker);
+self.begin_worker_search();
+self.worker_unparkers[first_idle_worker].unpark();
+}
+}
}
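A standalone sketch of the bit-field bookkeeping used by `activate_worker_relaxed` above (helper names are hypothetical): bit `i` set means worker `i` is active, `trailing_ones` locates the first idle worker and `fetch_or` claims its bit, retrying when another thread set the bit first.

use std::sync::atomic::{AtomicUsize, Ordering};

/// Mask with one bit set per worker, e.g. `0b1111` for a pool of four.
fn all_active_mask(pool_size: usize) -> usize {
    assert!(pool_size >= 1 && pool_size <= usize::BITS as usize);
    !0 >> (usize::BITS as usize - pool_size)
}

/// Tries to claim the first idle worker; returns its index on success.
fn try_claim_idle_worker(active_workers: &AtomicUsize, pool_size: usize) -> Option<usize> {
    let mut active = active_workers.load(Ordering::Relaxed);
    loop {
        let first_idle = active.trailing_ones() as usize;
        if first_idle >= pool_size {
            return None; // every worker is already active
        }
        // `fetch_or` returns the previous value: if the bit was still clear,
        // this thread won the race and owns the formerly idle worker.
        active = active_workers.fetch_or(1 << first_idle, Ordering::Relaxed);
        if active & (1 << first_idle) == 0 {
            return Some(first_idle);
        }
        // Otherwise another thread claimed it first; retry with the updated
        // snapshot, in which that bit is now set.
    }
}

Marking the whole pool active with `all_active_mask` is then a single Relaxed store, which is what `set_all_workers_active` does further down.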
@@ -89,10 +88,99 @@ impl Pool {
/// that at least the last active worker will observe all memory operations
/// performed before this call when calling `try_set_worker_inactive`.
pub(super) fn activate_worker(&self) {
-if let Some(worker_id) = self.registry.set_one_active() {
-self.searching_workers.fetch_add(1, Ordering::Relaxed);
-self.worker_unparkers[worker_id].unpark();
-}
+let mut active_workers = self.active_workers.load(Ordering::Relaxed);
+loop {
+let first_idle_worker = active_workers.trailing_ones() as usize;
+if first_idle_worker >= self.pool_size {
+// There is apparently no free worker, so a dummy RMW with
+// Release ordering is performed with the sole purpose of
+// synchronizing with the Acquire fence in `set_inactive` so
+// that the last worker sees the tasks that were queued prior to
+// this call when calling (unsuccessfully) `set_inactive`.
+let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release);
+if new_active_workers == active_workers {
+return;
+}
+active_workers = new_active_workers;
+} else {
+active_workers = self
+.active_workers
+.fetch_or(1 << first_idle_worker, Ordering::Relaxed);
+if active_workers & (1 << first_idle_worker) == 0 {
+#[cfg(feature = "dev-logs")]
+self.record.increment(first_idle_worker);
+self.begin_worker_search();
+self.worker_unparkers[first_idle_worker].unpark();
+}
+}
+}
+}
/// Marks the specified worker as inactive unless it is the last active
/// worker.
///
/// Parking the worker thread is the responsibility of the caller.
///
/// If this was the last active worker, `false` is returned and it is
/// guaranteed that all memory operations performed by threads that called
/// `activate_worker` will be visible. The worker is in such case expected
/// to check again the injector queue before explicitly calling
/// `set_all_workers_inactive` to confirm that the pool is indeed idle.
pub(super) fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
// Ordering: this Release operation synchronizes with the Acquire fence
// in the below conditional if this is the last active worker, and/or
// with the Acquire state load in the `pool_state` method.
let active_workers = self
.active_workers
.fetch_update(Ordering::Release, Ordering::Relaxed, |active_workers| {
if active_workers == (1 << worker_id) {
// It looks like this is the last worker, but the value
// could be stale so it is necessary to make sure of this by
// enforcing the CAS rather than returning `None`.
Some(active_workers)
} else {
Some(active_workers & !(1 << worker_id))
}
})
.unwrap();
assert_ne!(active_workers & (1 << worker_id), 0);
if active_workers == (1 << worker_id) {
// This is the last worker so we need to ensure that after this
// call, all tasks pushed on the injector queue before
// `set_one_active` was called unsuccessfully are visible.
//
// Ordering: this Acquire fence synchronizes with all Release RMWs
// in this and in the previous calls to `set_inactive` via a release
// sequence.
atomic::fence(Ordering::Acquire);
false
} else {
true
}
}
/// Marks all pool workers as active.
///
/// Unparking the worker threads is the responsibility of the caller.
pub(super) fn set_all_workers_active(&self) {
// Mark all workers as busy.
self.active_workers.store(
!0 >> (usize::BITS - self.pool_size as u32),
Ordering::Relaxed,
);
}
/// Marks all pool workers as inactive.
///
/// Unparking the executor threads is the responsibility of the caller.
pub(super) fn set_all_workers_inactive(&self) {
// Ordering: this Release store synchronizes with the Acquire load in
// `is_idle`.
self.active_workers.store(0, Ordering::Release);
}
/// Check if the pool is idle, i.e. if no worker is currently active.
@@ -100,7 +188,9 @@ impl Pool {
/// If `true` is returned, it is guaranteed that all operations performed by
/// the now-inactive workers become visible in this thread.
pub(super) fn is_pool_idle(&self) -> bool {
-self.registry.is_idle()
+// Ordering: this Acquire operation synchronizes with all Release
+// RMWs in the `set_worker_inactive` method via a release sequence.
+self.active_workers.load(Ordering::Acquire) == 0
}
/// Increments the count of workers actively searching for tasks.
@@ -123,7 +213,7 @@ impl Pool {
pub(super) fn trigger_termination(&self) {
self.terminate_signal.store(true, Ordering::Relaxed);
-self.registry.set_all_active();
+self.set_all_workers_active();
for unparker in &*self.worker_unparkers {
unparker.unpark();
}
@@ -162,7 +252,7 @@ impl Pool {
rng: &'_ rng::Rng,
) -> ShuffledStealers<'a> {
// All active workers except the specified one are candidate for stealing.
-let mut candidates = self.registry.get_active();
+let mut candidates = self.active_workers.load(Ordering::Relaxed);
if let Some(excluded_worker_id) = excluded_worker_id {
candidates &= !(1 << excluded_worker_id);
}
@@ -171,8 +261,15 @@ impl Pool {
}
}
+#[cfg(feature = "dev-logs")]
+impl Drop for PoolManager {
+fn drop(&mut self) {
+println!("Thread launch count: {:?}", self.record.get());
+}
+}
/// An iterator over active workers that yields their associated stealer,
-/// starting from a randomly selected worker.
+/// starting from a randomly selected active worker.
pub(super) struct ShuffledStealers<'a> {
stealers: &'a [Stealer],
// A bit-rotated bit field of the remaining candidate workers to steal from.
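For intuition, a small sketch (hypothetical helper, not the actual iterator) of walking a candidate bit field starting from a caller-chosen position and wrapping around, which is essentially what the bit rotation in `ShuffledStealers` achieves:

/// Visits the set bits of `candidates` starting at or after bit `start`
/// (wrapping around), passing the corresponding worker index to `visit`.
fn visit_candidates(candidates: usize, start: u32, mut visit: impl FnMut(usize)) {
    let start = start % usize::BITS;
    // Rotate so that the starting position becomes bit 0, then walk upward.
    let mut remaining = candidates.rotate_right(start);
    while remaining != 0 {
        let bit = remaining.trailing_zeros();
        // Map the rotated bit position back to the original worker index.
        visit(((bit + start) % usize::BITS) as usize);
        remaining &= remaining - 1; // clear the lowest set bit
    }
}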
@@ -236,180 +333,6 @@ impl<'a> Iterator for ShuffledStealers<'a> {
}
}
/// Registry of active/idle worker threads.
///
/// The registry only supports up to `usize::BITS` threads.
#[derive(Debug)]
struct PoolRegistry {
active_workers: AtomicUsize,
pool_size: usize,
#[cfg(feature = "dev-logs")]
record: Record,
}
impl PoolRegistry {
/// Creates a new pool registry.
///
/// #Panic
///
/// This will panic if the specified pool size is zero or is more than
/// `usize::BITS`.
fn new(pool_size: usize) -> Self {
assert!(
pool_size >= 1,
"the executor pool size should be at least one"
);
assert!(
pool_size <= usize::BITS as usize,
"the executor pool size should be at most {}",
usize::BITS
);
Self {
active_workers: AtomicUsize::new(0),
pool_size,
#[cfg(feature = "dev-logs")]
record: Record::new(pool_size),
}
}
/// Returns whether the pool is idle, i.e. whether there are no active
/// workers.
///
/// It is guaranteed that if `false` is returned, then all operations
/// performed by the now-inactive workers are visible.
fn is_idle(&self) -> bool {
// Ordering: this Acquire operation synchronizes with all Release
// RMWs in the `set_inactive` method via a release sequence.
self.active_workers.load(Ordering::Acquire) == 0
}
/// Marks the specified worker as inactive, unless this is the last worker.
///
/// The specified worker must currently be marked as active. This method
/// will always fail and return `false` if this was the last active worker,
/// because the worker is then expected to check again the global queue
/// before explicitly calling `set_all_inactive` to confirm that the pool is
/// indeed idle.
fn try_set_inactive(&self, worker_id: usize) -> bool {
// Ordering: this Release operation synchronizes with the Acquire fence
// in the below conditional if this is is the last active worker, and/or
// with the Acquire state load in the `pool_state` method.
let active_workers = self
.active_workers
.fetch_update(Ordering::Release, Ordering::Relaxed, |active_workers| {
if active_workers == (1 << worker_id) {
// It looks like this is the last worker, but the value
// could be stale so it is necessary to make sure of this by
// enforcing the CAS rather than returning `None`.
Some(active_workers)
} else {
Some(active_workers & !(1 << worker_id))
}
})
.unwrap();
assert_ne!(active_workers & (1 << worker_id), 0);
if active_workers == (1 << worker_id) {
// This is the last worker so we need to ensures that after this
// call, all tasks pushed on the global queue before
// `set_one_active` was called unsuccessfully are visible.
//
// Ordering: this Acquire fence synchronizes with all Release RMWs
// in this and in the previous calls to `set_inactive` via a release
// sequence.
atomic::fence(Ordering::Acquire);
false
} else {
true
}
}
/// Marks all workers as inactive.
fn set_all_inactive(&self) {
// Ordering: this Release store synchronizes with the Acquire load in
// `is_idle`.
self.active_workers.store(0, Ordering::Release);
}
/// Marks all workers as active.
fn set_all_active(&self) {
// Mark all workers as busy.
self.active_workers.store(
!0 >> (usize::BITS - self.pool_size as u32),
Ordering::Relaxed,
);
}
/// Marks a worker as active if any is found, otherwise do nothing.
///
/// The worker ID is returned if successful.
fn set_one_active_relaxed(&self) -> Option<usize> {
let mut active_workers = self.active_workers.load(Ordering::Relaxed);
loop {
let first_idle_worker = active_workers.trailing_ones() as usize;
if first_idle_worker >= self.pool_size {
return None;
};
active_workers = self
.active_workers
.fetch_or(1 << first_idle_worker, Ordering::Relaxed);
if active_workers & (1 << first_idle_worker) == 0 {
#[cfg(feature = "dev-logs")]
self.record.increment(first_idle_worker);
return Some(first_idle_worker);
}
}
}
/// Marks a worker as active if any is found, otherwise ensure that all
/// memory operations made by the caller prior to this call are visible by
/// the last worker transitioning to idle state.
///
/// The worker ID is returned if successful.
fn set_one_active(&self) -> Option<usize> {
let mut active_workers = self.active_workers.load(Ordering::Relaxed);
loop {
let first_idle_worker = active_workers.trailing_ones() as usize;
if first_idle_worker >= self.pool_size {
// There is apparently no free worker, so a dummy RMW with
// Release ordering is performed with the sole purpose of
// synchronizing with the Acquire fence in `set_inactive` so
// that the last worker see the tasks that were queued prior to
// this call when calling (unsuccessfully) `set_inactive`..
let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release);
if new_active_workers == active_workers {
return None;
}
active_workers = new_active_workers;
} else {
active_workers = self
.active_workers
.fetch_or(1 << first_idle_worker, Ordering::Relaxed);
if active_workers & (1 << first_idle_worker) == 0 {
#[cfg(feature = "dev-logs")]
self.record.increment(first_idle_worker);
return Some(first_idle_worker);
}
}
}
}
/// Returns a bit field that indicates all active workers.
fn get_active(&self) -> usize {
self.active_workers.load(Ordering::Relaxed)
}
}
#[cfg(feature = "dev-logs")]
impl Drop for PoolRegistry {
fn drop(&mut self) {
println!("Thread launch count: {:?}", self.record.get());
}
}
#[cfg(feature = "dev-logs")]
#[derive(Debug)]
struct Record {


@@ -2,7 +2,7 @@ use std::cell::Cell;
/// A pseudo-random number generator based on Wang Yi's Wyrand.
///
-/// See: https://github.com/wangyi-fudan/wyhash
+/// See: <https://github.com/wangyi-fudan/wyhash>
#[derive(Clone, Debug)]
pub(crate) struct Rng {
seed: Cell<u64>,


@@ -3,23 +3,23 @@ use std::sync::Arc;
use super::task::Runnable;
-use super::pool::Pool;
+use super::ExecutorContext;
use super::LocalQueue;
/// A local worker with access to global executor resources.
pub(crate) struct Worker {
pub(super) local_queue: LocalQueue,
pub(super) fast_slot: Cell<Option<Runnable>>,
-pub(super) pool: Arc<Pool>,
+pub(super) executor_context: Arc<ExecutorContext>,
}
impl Worker {
/// Creates a new worker.
-pub(super) fn new(local_queue: LocalQueue, pool: Arc<Pool>) -> Self {
+pub(super) fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
Self {
local_queue,
fast_slot: Cell::new(None),
-pool,
+executor_context: executor_context,
}
}
}