diff --git a/asynchronix/src/runtime/executor.rs b/asynchronix/src/runtime/executor.rs index 6d4d5da..da93a89 100644 --- a/asynchronix/src/runtime/executor.rs +++ b/asynchronix/src/runtime/executor.rs @@ -10,7 +10,7 @@ use slab::Slab; mod find_bit; mod injector; -mod pool; +mod pool_manager; mod queue; mod rng; mod task; @@ -19,14 +19,14 @@ mod worker; #[cfg(all(test, not(asynchronix_loom)))] mod tests; -use self::pool::Pool; +use self::pool_manager::PoolManager; use self::rng::Rng; use self::task::{CancelToken, Promise, Runnable}; use self::worker::Worker; use crate::macros::scoped_local_key::scoped_thread_local; type Bucket = injector::Bucket; -type GlobalQueue = injector::Injector; +type Injector = injector::Injector; type LocalQueue = queue::Worker; type Stealer = queue::Stealer; @@ -55,23 +55,31 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); /// The design of the executor is largely influenced by the tokio and go /// schedulers, both of which are optimized for message-passing applications. In /// particular, it uses fast, fixed-size thread-local work-stealing queues with -/// a "fast" non-stealable slot in combination with a global injector queue. The +/// a non-stealable LIFO slot in combination with an injector queue, which /// injector queue is used both to schedule new tasks and to absorb temporary -/// overflow in the local queues. The design of the injector queue is kept very -/// simple by taking advantage of the fact that the injector is not required to -/// be either LIFO or FIFO. +/// overflow in the local queues. /// -/// Probably the largest difference with tokio is the task system, which -/// achieves a higher throughput by reducing the need for synchronization. -/// Another difference is that, at the moment, the complete subset of active -/// worker threads is stored in a single atomic variable. This makes it possible -/// to rapidly identify free worker threads for stealing operations. The -/// downside of this approach is that the maximum number of worker threads is -/// limited to `usize::BITS`, but this is unlikely to constitute a limitation -/// since system simulation is not typically embarrassingly parallel. +/// The design of the injector queue is kept very simple compared to tokio, by +/// taking advantage of the fact that the injector is not required to be either +/// LIFO or FIFO. Moving tasks between a local queue and the injector is fast +/// because tasks are moved in batch and are stored contiguously in memory. +/// +/// Another difference with tokio is that, at the moment, the complete subset of +/// active worker threads is stored in a single atomic variable. This makes it +/// possible to rapidly identify free worker threads for stealing operations, +/// with the downside that the maximum number of worker threads is currently +/// limited to `usize::BITS`. This is unlikely to constitute a limitation in +/// practice though since system simulation is not typically embarrassingly +/// parallel. +/// +/// Probably the largest difference with tokio is the task system, which has +/// better throughput due to less need for synchronization. This mainly results +/// from the use of an atomic notification counter rather than an atomic +/// notification flag, thus alleviating the need to reset the notification flag +/// before polling a future. 
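The activation scheme summarized in the doc comment above — the whole set of active workers lives in a single atomic word, which caps the pool at `usize::BITS` threads — is easy to picture with a short stand-alone sketch. Nothing below is code from this diff; `ActiveSet` and `claim_idle` are illustrative names only:

use std::sync::atomic::{AtomicUsize, Ordering};

/// Bit field of active workers: bit `i` is set when worker `i` is active.
struct ActiveSet {
    bits: AtomicUsize,
    pool_size: usize,
}

impl ActiveSet {
    fn new(pool_size: usize) -> Self {
        assert!(pool_size >= 1 && pool_size <= usize::BITS as usize);
        Self { bits: AtomicUsize::new(0), pool_size }
    }

    /// Tries to claim an idle worker, returning its index if successful.
    fn claim_idle(&self) -> Option<usize> {
        let mut bits = self.bits.load(Ordering::Relaxed);
        loop {
            // The first zero bit gives the index of the first idle worker.
            let idle = bits.trailing_ones() as usize;
            if idle >= self.pool_size {
                return None; // All workers are already active.
            }
            // Attempt to set the bit; the previous value tells us whether
            // this thread actually performed the activation.
            bits = self.bits.fetch_or(1 << idle, Ordering::Relaxed);
            if bits & (1 << idle) == 0 {
                return Some(idle);
            }
        }
    }
}

fn main() {
    let set = ActiveSet::new(4);
    assert_eq!(set.claim_idle(), Some(0));
    assert_eq!(set.claim_idle(), Some(1));
    println!("active bits: {:#06b}", set.bits.load(Ordering::Relaxed)); // 0b0011
}

The `PoolManager` introduced later in this diff claims workers with the same `fetch_or` pattern, adding Release/Acquire ordering only where the hand-off of queued tasks to the last active worker requires it.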
#[derive(Debug)] pub(crate) struct Executor { - pool: Arc, + context: Arc, active_tasks: Arc>>, parker: parking::Parker, join_handles: Vec>, @@ -103,13 +111,17 @@ impl Executor { usize::MAX / 2 ); - let pool = Arc::new(Pool::new(executor_id, unparker, shared_data.into_iter())); + let context = Arc::new(ExecutorContext::new( + executor_id, + unparker, + shared_data.into_iter(), + )); let active_tasks = Arc::new(Mutex::new(Slab::new())); // All workers must be marked as active _before_ spawning the threads to // make sure that the count of active workers does not fall to zero // before all workers are blocked on the signal barrier. - pool.set_all_workers_active(); + context.pool_manager.set_all_workers_active(); // Spawn all worker threads. let join_handles: Vec<_> = local_data @@ -121,10 +133,10 @@ impl Executor { thread_builder .spawn({ - let pool = pool.clone(); + let context = context.clone(); let active_tasks = active_tasks.clone(); move || { - let worker = Worker::new(local_queue, pool); + let worker = Worker::new(local_queue, context); ACTIVE_TASKS.set(&active_tasks, || { LOCAL_WORKER .set(&worker, || run_local_worker(&worker, id, worker_parker)) @@ -137,10 +149,10 @@ impl Executor { // Wait until all workers are blocked on the signal barrier. parker.park(); - assert!(pool.is_pool_idle()); + assert!(context.pool_manager.is_pool_idle()); Self { - pool, + context, active_tasks, parker, join_handles, @@ -163,12 +175,12 @@ impl Executor { let future = CancellableFuture::new(future, task_entry.key()); let (promise, runnable, cancel_token) = - task::spawn(future, schedule_task, self.pool.executor_id); + task::spawn(future, schedule_task, self.context.executor_id); task_entry.insert(cancel_token); - self.pool.global_queue.insert_task(runnable); + self.context.injector.insert_task(runnable); - self.pool.activate_worker(); + self.context.pool_manager.activate_worker(); promise } @@ -191,22 +203,22 @@ impl Executor { let future = CancellableFuture::new(future, task_entry.key()); let (runnable, cancel_token) = - task::spawn_and_forget(future, schedule_task, self.pool.executor_id); + task::spawn_and_forget(future, schedule_task, self.context.executor_id); task_entry.insert(cancel_token); - self.pool.global_queue.insert_task(runnable); + self.context.injector.insert_task(runnable); - self.pool.activate_worker(); + self.context.pool_manager.activate_worker(); } /// Let the executor run, blocking until all futures have completed or until /// the executor deadlocks. pub(crate) fn run(&mut self) { loop { - if let Some(worker_panic) = self.pool.take_panic() { + if let Some(worker_panic) = self.context.pool_manager.take_panic() { panic::resume_unwind(worker_panic); } - if self.pool.is_pool_idle() { + if self.context.pool_manager.is_pool_idle() { return; } @@ -218,7 +230,7 @@ impl Executor { impl Drop for Executor { fn drop(&mut self) { // Force all threads to return. - self.pool.trigger_termination(); + self.context.pool_manager.trigger_termination(); for join_handle in self.join_handles.drain(0..) { join_handle.join().unwrap(); } @@ -227,7 +239,7 @@ impl Drop for Executor { // // A local worker must be set because some tasks may schedule other // tasks when dropped, which requires that a local worker be available. - let worker = Worker::new(LocalQueue::new(), self.pool.clone()); + let worker = Worker::new(LocalQueue::new(), self.context.clone()); LOCAL_WORKER.set(&worker, || { // Cancel all pending futures. 
// @@ -246,7 +258,7 @@ impl Drop for Executor { // Some of the dropped tasks may have scheduled other tasks that // were not yet cancelled, preventing them from being dropped // upon cancellation. This is OK: the scheduled tasks will be - // dropped when the local and global queues are dropped, and + // dropped when the local and injector queues are dropped, and // they cannot re-schedule one another since all tasks were // cancelled. }); @@ -254,6 +266,38 @@ impl Drop for Executor { } } +/// Shared executor context. +#[derive(Debug)] +struct ExecutorContext { + injector: Injector, + executor_id: usize, + executor_unparker: parking::Unparker, + pool_manager: PoolManager, +} + +impl ExecutorContext { + /// Creates a new shared executor context. + pub(super) fn new( + executor_id: usize, + executor_unparker: parking::Unparker, + shared_data: impl Iterator, + ) -> Self { + let (stealers, worker_unparkers): (Vec<_>, Vec<_>) = shared_data.into_iter().unzip(); + let worker_unparkers = worker_unparkers.into_boxed_slice(); + + Self { + injector: Injector::new(), + executor_id, + executor_unparker, + pool_manager: PoolManager::new( + worker_unparkers.len(), + stealers.into_boxed_slice(), + worker_unparkers, + ), + } + } +} + // A `Future` wrapper that removes its cancellation token from the executor's // list of active tasks when dropped. struct CancellableFuture { @@ -301,15 +345,20 @@ impl Drop for CancellableFuture { fn schedule_task(task: Runnable, executor_id: usize) { LOCAL_WORKER .map(|worker| { + let pool_manager = &worker.executor_context.pool_manager; + let injector = &worker.executor_context.injector; + let local_queue = &worker.local_queue; + let fast_slot = &worker.fast_slot; + // Check that this task was indeed spawned on this executor. assert_eq!( - executor_id, worker.pool.executor_id, + executor_id, worker.executor_context.executor_id, "Tasks must be awaken on the same executor they are spawned on" ); // Store the task in the fast slot and retrieve the one that was // formerly stored, if any. - let prev_task = match worker.fast_slot.replace(Some(task)) { + let prev_task = match fast_slot.replace(Some(task)) { // If there already was a task in the slot, proceed so it can be // moved to a task queue. Some(t) => t, @@ -319,26 +368,24 @@ fn schedule_task(task: Runnable, executor_id: usize) { }; // Push the previous task to the local queue if possible or on the - // global queue otherwise. - if let Err(prev_task) = worker.local_queue.push(prev_task) { - // The local queue is full. Try to move half of it to the global - // queue; if this fails, just push one task to the global queue. - if let Ok(drain) = worker.local_queue.drain(|_| Bucket::capacity()) { - worker - .pool - .global_queue - .push_bucket(Bucket::from_iter(drain)); - worker.local_queue.push(prev_task).unwrap(); + // injector queue otherwise. + if let Err(prev_task) = local_queue.push(prev_task) { + // The local queue is full. Try to move half of it to the + // injector queue; if this fails, just push one task to the + // injector queue. + if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) { + injector.push_bucket(Bucket::from_iter(drain)); + local_queue.push(prev_task).unwrap(); } else { - worker.pool.global_queue.insert_task(prev_task); + injector.insert_task(prev_task); } } - // A task has been pushed to the local or global queue: try to + // A task has been pushed to the local or injector queue: try to // activate another worker if no worker is currently searching for a // task. 
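The policy implemented by `schedule_task` above — replace the LIFO fast slot, push the displaced task to the bounded local queue, and spill a batch to the injector on overflow — can be mimicked with plain std types. The following is only an illustrative sketch: `Scheduler`, the `Vec`-backed injector and the capacity of 4 are stand-ins for the crate's lock-free queue and bucket types.

use std::cell::{Cell, RefCell};

const LOCAL_CAPACITY: usize = 4;

struct Scheduler {
    fast_slot: Cell<Option<u32>>,
    local_queue: RefCell<Vec<u32>>, // kept below LOCAL_CAPACITY by `schedule`
    injector: RefCell<Vec<u32>>,    // unbounded injection/overflow queue
}

impl Scheduler {
    fn schedule(&self, task: u32) {
        // Store the new task in the fast slot and retrieve the task that was
        // formerly stored, if any.
        let Some(prev) = self.fast_slot.replace(Some(task)) else {
            return;
        };
        let mut local = self.local_queue.borrow_mut();
        if local.len() < LOCAL_CAPACITY {
            local.push(prev);
        } else {
            // The local queue is full: move half of it to the injector in one
            // batch, then retry the local push.
            let keep = local.len() / 2;
            self.injector.borrow_mut().extend(local.drain(keep..));
            local.push(prev);
        }
    }
}

fn main() {
    let s = Scheduler {
        fast_slot: Cell::new(None),
        local_queue: RefCell::new(Vec::new()),
        injector: RefCell::new(Vec::new()),
    };
    for task in 0..8 {
        s.schedule(task);
    }
    println!("fast slot: {:?}", s.fast_slot.get());
    println!("local:     {:?}", s.local_queue.borrow());
    println!("injector:  {:?}", s.injector.borrow());
}

The real implementation moves the overflow as one fixed-size `Bucket`, which is what keeps the transferred tasks contiguous in memory, as noted in the module documentation.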
- if worker.pool.searching_worker_count() == 0 { - worker.pool.activate_worker_relaxed(); + if pool_manager.searching_worker_count() == 0 { + pool_manager.activate_worker_relaxed(); } }) .expect("Tasks may not be awaken outside executor threads"); @@ -347,6 +394,12 @@ fn schedule_task(task: Runnable, executor_id: usize) { /// Processes all incoming tasks on a worker thread until the `Terminate` signal /// is received or until it panics. fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { + let pool_manager = &worker.executor_context.pool_manager; + let injector = &worker.executor_context.injector; + let executor_unparker = &worker.executor_context.executor_unparker; + let local_queue = &worker.local_queue; + let fast_slot = &worker.fast_slot; + let result = panic::catch_unwind(AssertUnwindSafe(|| { // Set how long to spin when searching for a task. const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000); @@ -356,22 +409,30 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { loop { // Signal barrier: park until notified to continue or terminate. - if worker.pool.try_set_worker_inactive(id) { + + // Try to deactivate the worker. + if pool_manager.try_set_worker_inactive(id) { parker.park(); + // No need to call `begin_worker_search()`: this was done by the + // thread that unparked the worker. + } else if injector.is_empty() { + // This worker could not be deactivated because it was the last + // active worker. In such case, the call to + // `try_set_worker_inactive` establishes a synchronization with + // all threads that pushed tasks to the injector queue but could + // not activate a new worker, which is why some tasks may now be + // visible in the injector queue. + pool_manager.set_all_workers_inactive(); + executor_unparker.unpark(); + parker.park(); + // No need to call `begin_worker_search()`: this was done by the + // thread that unparked the worker. } else { - // This worker is the last active worker so it is necessary to - // check if the global queue is populated. This could happen if - // the executor thread pushed a task to the global queue but - // could not activate a new worker because all workers were then - // activated. - if worker.pool.global_queue.is_empty() { - worker.pool.set_all_workers_inactive(); - worker.pool.executor_unparker.unpark(); - parker.park(); - } + // There are tasks in the injector: resume the search. + pool_manager.begin_worker_search(); } - if worker.pool.termination_is_triggered() { + if pool_manager.termination_is_triggered() { return; } @@ -379,8 +440,8 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // Process the tasks one by one. loop { - // Check the global queue first. - if let Some(bucket) = worker.pool.global_queue.pop_bucket() { + // Check the injector queue first. + if let Some(bucket) = injector.pop_bucket() { let bucket_iter = bucket.into_iter(); // There is a _very_ remote possibility that, even though @@ -390,42 +451,43 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // it took to pop and process the remaining tasks and it // hasn't released the stolen capacity yet. 
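The parking protocol above is the subtlest part of the worker loop, so here is the same three-way decision as a tiny self-contained function. `Action` and `parking_decision` are illustrative names, not crate items, and the two booleans stand in for the results of `try_set_worker_inactive` and `Injector::is_empty`:

#[derive(Debug)]
enum Action {
    /// Another worker remains active: simply park.
    Park,
    /// Last active worker and the injector is empty: declare the pool idle,
    /// unpark the executor thread, then park.
    DeclareIdleAndPark,
    /// Last active worker but tasks are visible in the injector: resume the
    /// search instead of parking.
    ResumeSearch,
}

fn parking_decision(could_deactivate: bool, injector_is_empty: bool) -> Action {
    if could_deactivate {
        Action::Park
    } else if injector_is_empty {
        Action::DeclareIdleAndPark
    } else {
        Action::ResumeSearch
    }
}

fn main() {
    // Last active worker, but tasks were pushed to the injector in the
    // meantime: the worker must keep searching rather than park.
    println!("{:?}", parking_decision(false, false));
}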
// - // Unfortunately, we cannot just skip checking the global + // Unfortunately, we cannot just skip checking the injector // queue altogether when there isn't enough spare capacity // in the local queue because this could lead to a race: // suppose that (1) this thread has earlier pushed tasks - // onto the global queue, and (2) the stealer has processed - // all stolen tasks before this thread sees the capacity - // restored and at the same time (3) the stealer does not - // yet see the tasks this thread pushed to the global queue; - // in such scenario, both this thread and the stealer thread - // may park and leave unprocessed tasks in the global queue. + // onto the injector queue, and (2) the stealer has + // processed all stolen tasks before this thread sees the + // capacity restored and at the same time (3) the stealer + // does not yet see the tasks this thread pushed to the + // injector queue; in such scenario, both this thread and + // the stealer thread may park and leave unprocessed tasks + // in the injector queue. // // This is the only instance where spinning is used, as the // probability of this happening is close to zero and the // complexity of a signaling mechanism (condvar & friends) // wouldn't carry its weight. - while worker.local_queue.spare_capacity() < bucket_iter.len() {} + while local_queue.spare_capacity() < bucket_iter.len() {} - // Since empty buckets are never pushed onto the global + // Since empty buckets are never pushed onto the injector // queue, we should now have at least one task to process. - worker.local_queue.extend(bucket_iter); + local_queue.extend(bucket_iter); } else { - // The global queue is empty. Try to steal from active + // The injector queue is empty. Try to steal from active // siblings. - let mut stealers = worker.pool.shuffled_stealers(Some(id), &rng); + let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng); if stealers.all(|stealer| { stealer - .steal_and_pop(&worker.local_queue, |n| n - n / 2) + .steal_and_pop(local_queue, |n| n - n / 2) .map(|task| { - let prev_task = worker.fast_slot.replace(Some(task)); + let prev_task = fast_slot.replace(Some(task)); assert!(prev_task.is_none()); }) .is_err() }) { // Give up if unsuccessful for too long. if (Instant::now() - search_start) > MAX_SEARCH_DURATION { - worker.pool.end_worker_search(); + pool_manager.end_worker_search(); break; } @@ -436,19 +498,18 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // Signal the end of the search so that another worker can be // activated when a new task is scheduled. - worker.pool.end_worker_search(); + pool_manager.end_worker_search(); // Pop tasks from the fast slot or the local queue. - while let Some(task) = worker.fast_slot.take().or_else(|| worker.local_queue.pop()) - { - if worker.pool.termination_is_triggered() { + while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) { + if pool_manager.termination_is_triggered() { return; } task.run(); } // Resume the search for tasks. - worker.pool.begin_worker_search(); + pool_manager.begin_worker_search(); search_start = Instant::now(); } } @@ -456,8 +517,8 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // Propagate the panic, if any. 
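Two details of the stealing path above deserve a worked illustration: the closure passed to `steal_and_pop` requests `n - n / 2` of the victim's `n` items (half, rounded up), and the search is abandoned once `MAX_SEARCH_DURATION` has elapsed. The std-only sketch below uses `VecDeque`s as stand-ins for the work-stealing queues:

use std::collections::VecDeque;
use std::time::{Duration, Instant};

const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000);

/// Moves `n - n / 2` items (half, rounded up) from `victim` to `thief`.
fn steal_half(victim: &mut VecDeque<u32>, thief: &mut VecDeque<u32>) -> usize {
    let n = victim.len();
    let count = n - n / 2;
    thief.extend(victim.drain(..count));
    count
}

fn main() {
    let mut victim: VecDeque<u32> = (0..5).collect();
    let mut thief = VecDeque::new();
    assert_eq!(steal_half(&mut victim, &mut thief), 3); // 5 - 5 / 2 = 3

    // Bounded search: give up once the spinning budget is exhausted.
    let search_start = Instant::now();
    while Instant::now() - search_start <= MAX_SEARCH_DURATION {
        // ... try to steal from shuffled siblings ...
    }
    println!("stolen: {thief:?}, left: {victim:?}");
}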
if let Err(panic) = result { - worker.pool.register_panic(panic); - worker.pool.trigger_termination(); - worker.pool.executor_unparker.unpark(); + pool_manager.register_panic(panic); + pool_manager.trigger_termination(); + executor_unparker.unpark(); } } diff --git a/asynchronix/src/runtime/executor/injector.rs b/asynchronix/src/runtime/executor/injector.rs index 18c53fe..4349681 100644 --- a/asynchronix/src/runtime/executor/injector.rs +++ b/asynchronix/src/runtime/executor/injector.rs @@ -77,7 +77,7 @@ impl Injector { inner.push(new_bucket); // Ordering: this flag is only used as a hint so Relaxed ordering is - // enough. + // sufficient. self.is_empty.store(false, Ordering::Relaxed); } @@ -91,7 +91,7 @@ impl Injector { // If the queue was empty before, update the flag. if was_empty { // Ordering: this flag is only used as a hint so Relaxed ordering is - // enough. + // sufficient. self.is_empty.store(false, Ordering::Relaxed); } } @@ -106,15 +106,15 @@ impl Injector { /// This is not an issue in practice because it cannot lead to executor /// deadlock. Indeed, if the last task/bucket was inserted by a worker /// thread, this worker thread will always see that the injector queue is - /// populated (unless the bucket was already popped) so it will never exit - /// before all tasks in the injector are processed. Likewise, if the last - /// task/bucket was inserted by the main executor thread before - /// `Executor::run()` is called, the synchronization established when the - /// executor unparks worker threads ensures that the task is visible to all - /// unparked workers. + /// populated (unless the bucket was already popped) so if all workers exit, + /// then all tasks they have re-injected will necessarily have been + /// processed. Likewise, if the last task/bucket was inserted by the main + /// executor thread before `Executor::run()` is called, the synchronization + /// established when the executor unparks worker threads ensures that the + /// task is visible to all unparked workers. pub(crate) fn pop_bucket(&self) -> Option> { // Ordering: this flag is only used as a hint so Relaxed ordering is - // enough. + // sufficient. if self.is_empty.load(Ordering::Relaxed) { return None; } @@ -125,7 +125,7 @@ impl Injector { if inner.is_empty() { // Ordering: this flag is only used as a hint so Relaxed ordering is - // enough. + // sufficient. self.is_empty.store(true, Ordering::Relaxed); } diff --git a/asynchronix/src/runtime/executor/pool.rs b/asynchronix/src/runtime/executor/pool_manager.rs similarity index 76% rename from asynchronix/src/runtime/executor/pool.rs rename to asynchronix/src/runtime/executor/pool_manager.rs index 8b8d278..e83b415 100644 --- a/asynchronix/src/runtime/executor/pool.rs +++ b/asynchronix/src/runtime/executor/pool_manager.rs @@ -3,73 +3,60 @@ use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; use std::sync::Mutex; use super::find_bit; -use super::injector::Injector; use super::rng; -use super::{GlobalQueue, Stealer}; +use super::Stealer; -/// A view of the thread pool shared between the executor and all workers. +/// Manager of worker threads. +/// +/// The manager currently only supports up to `usize::BITS` threads. 
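The `is_empty` flag discussed in the injector hunk above is only a hint: a stale `true` merely causes an early `None` from `pop_bucket`, which the worker protocol tolerates. Below is a minimal sketch of that pattern, with a plain `Mutex<Vec<T>>` standing in for the injector's internal storage and `HintedQueue` as an illustrative name:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct HintedQueue<T> {
    // Hint only: readers may observe a stale value, so a wrong `true` can at
    // worst cause a spurious `None` from `pop`.
    is_empty: AtomicBool,
    inner: Mutex<Vec<T>>,
}

impl<T> HintedQueue<T> {
    fn new() -> Self {
        Self {
            is_empty: AtomicBool::new(true),
            inner: Mutex::new(Vec::new()),
        }
    }

    fn push(&self, value: T) {
        self.inner.lock().unwrap().push(value);
        // Relaxed is sufficient: the flag is only a hint.
        self.is_empty.store(false, Ordering::Relaxed);
    }

    fn pop(&self) -> Option<T> {
        // Fast path: skip the lock if the queue looks empty.
        if self.is_empty.load(Ordering::Relaxed) {
            return None;
        }
        let mut inner = self.inner.lock().unwrap();
        let value = inner.pop();
        if inner.is_empty() {
            self.is_empty.store(true, Ordering::Relaxed);
        }
        value
    }
}

fn main() {
    let q = HintedQueue::new();
    q.push(42);
    assert_eq!(q.pop(), Some(42));
    assert_eq!(q.pop(), None);
}

A stale `false` only costs one extra lock acquisition, while a stale `true` is recovered from by the last-active-worker re-check described in `pop_bucket`'s documentation.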
#[derive(Debug)] -pub(super) struct Pool { - pub(super) global_queue: GlobalQueue, - pub(super) executor_id: usize, - pub(super) executor_unparker: parking::Unparker, - registry: PoolRegistry, +pub(super) struct PoolManager { + pool_size: usize, stealers: Box<[Stealer]>, worker_unparkers: Box<[parking::Unparker]>, + active_workers: AtomicUsize, searching_workers: AtomicUsize, terminate_signal: AtomicBool, worker_panic: Mutex>>, + #[cfg(feature = "dev-logs")] + record: Record, } -impl Pool { - /// Creates a new pool. +impl PoolManager { + /// Creates a new pool manager. + /// + /// #Panic + /// + /// This will panic if the specified pool size is zero or is more than + /// `usize::BITS`. pub(super) fn new( - executor_id: usize, - executor_unparker: parking::Unparker, - shared_data: impl Iterator, + pool_size: usize, + stealers: Box<[Stealer]>, + worker_unparkers: Box<[parking::Unparker]>, ) -> Self { - let (stealers, worker_unparkers): (Vec<_>, Vec<_>) = shared_data.into_iter().unzip(); - let worker_unparkers = worker_unparkers.into_boxed_slice(); + assert!( + pool_size >= 1, + "the executor pool size should be at least one" + ); + assert!( + pool_size <= usize::BITS as usize, + "the executor pool size should be at most {}", + usize::BITS + ); Self { - global_queue: Injector::new(), - executor_id, - executor_unparker, - registry: PoolRegistry::new(worker_unparkers.len()), - stealers: stealers.into_boxed_slice(), + pool_size, + stealers, worker_unparkers, + active_workers: AtomicUsize::new(0), searching_workers: AtomicUsize::new(0), terminate_signal: AtomicBool::new(false), worker_panic: Mutex::new(None), + #[cfg(feature = "dev-logs")] + record: Record::new(pool_size), } } - /// Marks all pool workers as active. - /// - /// Unparking the worker threads is the responsibility of the caller. - pub(super) fn set_all_workers_active(&self) { - self.registry.set_all_active(); - } - - /// Marks all pool workers as inactive. - /// - /// Unparking the executor threads is the responsibility of the caller. - pub(super) fn set_all_workers_inactive(&self) { - self.registry.set_all_inactive(); - } - - /// Marks the specified worker as inactive unless it is the last active - /// worker. - /// - /// Parking the worker thread is the responsibility of the caller. - /// - /// If this was the last active worker, `false` is returned and it is - /// guaranteed that all memory operations performed by threads that called - /// `activate_worker` will be visible. - pub(super) fn try_set_worker_inactive(&self, worker_id: usize) -> bool { - self.registry.try_set_inactive(worker_id) - } - /// Unparks an idle worker if any is found and mark it as active, or do /// nothing otherwise. /// @@ -79,9 +66,21 @@ impl Pool { /// is not tolerable (for instance if this method is called from a /// non-worker thread), use the more expensive `activate_worker`. 
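`PoolManager::new` above rejects pool sizes of zero or above `usize::BITS` because the whole pool must fit in one machine word: `set_all_workers_active`, further down in this file's diff, computes its all-active mask as `!0 >> (usize::BITS - pool_size)`. Here is a small worked example of that expression and of why both bounds are required; `all_active_mask` is an illustrative free function, not part of the crate:

fn all_active_mask(pool_size: usize) -> usize {
    // Same bounds as `PoolManager::new`: a zero size would make the shift
    // below overflow (shift by `usize::BITS`), and a size above `usize::BITS`
    // would make the subtraction underflow.
    assert!(pool_size >= 1, "the pool size should be at least one");
    assert!(
        pool_size <= usize::BITS as usize,
        "the pool size should be at most {}",
        usize::BITS
    );
    // One bit per worker, workers `0..pool_size` all marked active.
    !0 >> (usize::BITS - pool_size as u32)
}

fn main() {
    assert_eq!(all_active_mask(1), 0b1);
    assert_eq!(all_active_mask(4), 0b1111);
    assert_eq!(all_active_mask(usize::BITS as usize), usize::MAX);
    println!("all-active mask for 4 workers: {:#b}", all_active_mask(4));
}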
     pub(super) fn activate_worker_relaxed(&self) {
-        if let Some(worker_id) = self.registry.set_one_active_relaxed() {
-            self.searching_workers.fetch_add(1, Ordering::Relaxed);
-            self.worker_unparkers[worker_id].unpark();
+        let mut active_workers = self.active_workers.load(Ordering::Relaxed);
+        loop {
+            let first_idle_worker = active_workers.trailing_ones() as usize;
+            if first_idle_worker >= self.pool_size {
+                return;
+            };
+            active_workers = self
+                .active_workers
+                .fetch_or(1 << first_idle_worker, Ordering::Relaxed);
+            if active_workers & (1 << first_idle_worker) == 0 {
+                #[cfg(feature = "dev-logs")]
+                self.record.increment(first_idle_worker);
+                self.begin_worker_search();
+                self.worker_unparkers[first_idle_worker].unpark();
+            }
         }
     }

@@ -89,18 +88,109 @@ impl Pool {
     /// that at least the last active worker will observe all memory operations
     /// performed before this call when calling `try_set_worker_inactive`.
     pub(super) fn activate_worker(&self) {
-        if let Some(worker_id) = self.registry.set_one_active() {
-            self.searching_workers.fetch_add(1, Ordering::Relaxed);
-            self.worker_unparkers[worker_id].unpark();
+        let mut active_workers = self.active_workers.load(Ordering::Relaxed);
+        loop {
+            let first_idle_worker = active_workers.trailing_ones() as usize;
+
+            if first_idle_worker >= self.pool_size {
+                // There is apparently no free worker, so a dummy RMW with
+                // Release ordering is performed with the sole purpose of
+                // synchronizing with the Acquire fence in `try_set_worker_inactive`
+                // so that the last worker sees the tasks that were queued prior to
+                // this call when calling (unsuccessfully) `try_set_worker_inactive`.
+                let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release);
+                if new_active_workers == active_workers {
+                    return;
+                }
+                active_workers = new_active_workers;
+            } else {
+                active_workers = self
+                    .active_workers
+                    .fetch_or(1 << first_idle_worker, Ordering::Relaxed);
+                if active_workers & (1 << first_idle_worker) == 0 {
+                    #[cfg(feature = "dev-logs")]
+                    self.record.increment(first_idle_worker);
+                    self.begin_worker_search();
+                    self.worker_unparkers[first_idle_worker].unpark();
+                }
+            }
         }
     }

+    /// Marks the specified worker as inactive unless it is the last active
+    /// worker.
+    ///
+    /// Parking the worker thread is the responsibility of the caller.
+    ///
+    /// If this was the last active worker, `false` is returned and it is
+    /// guaranteed that all memory operations performed by threads that called
+    /// `activate_worker` will be visible. The worker is in such a case expected
+    /// to check again the injector queue before explicitly calling
+    /// `set_all_workers_inactive` to confirm that the pool is indeed idle.
+    pub(super) fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
+        // Ordering: this Release operation synchronizes with the Acquire fence
+        // in the below conditional if this is the last active worker, and/or
+        // with the Acquire load in the `is_pool_idle` method.
+        let active_workers = self
+            .active_workers
+            .fetch_update(Ordering::Release, Ordering::Relaxed, |active_workers| {
+                if active_workers == (1 << worker_id) {
+                    // It looks like this is the last worker, but the value
+                    // could be stale so it is necessary to make sure of this by
+                    // enforcing the CAS rather than returning `None`.
+                    Some(active_workers)
+                } else {
+                    Some(active_workers & !(1 << worker_id))
+                }
+            })
+            .unwrap();
+
+        assert_ne!(active_workers & (1 << worker_id), 0);
+
+        if active_workers == (1 << worker_id) {
+            // This is the last worker so we need to ensure that after this
+            // call, all tasks pushed on the injector queue before
+            // `activate_worker` was called unsuccessfully are visible.
+            //
+            // Ordering: this Acquire fence synchronizes with all Release RMWs
+            // in this and in the previous calls to `try_set_worker_inactive`
+            // via a release sequence.
+            atomic::fence(Ordering::Acquire);
+
+            false
+        } else {
+            true
+        }
+    }
+
+    /// Marks all pool workers as active.
+    ///
+    /// Unparking the worker threads is the responsibility of the caller.
+    pub(super) fn set_all_workers_active(&self) {
+        // Mark all workers as busy.
+        self.active_workers.store(
+            !0 >> (usize::BITS - self.pool_size as u32),
+            Ordering::Relaxed,
+        );
+    }
+
+    /// Marks all pool workers as inactive.
+    ///
+    /// Unparking the executor threads is the responsibility of the caller.
+    pub(super) fn set_all_workers_inactive(&self) {
+        // Ordering: this Release store synchronizes with the Acquire load in
+        // `is_pool_idle`.
+        self.active_workers.store(0, Ordering::Release);
+    }
+
     /// Check if the pool is idle, i.e. if no worker is currently active.
     ///
     /// If `true` is returned, it is guaranteed that all operations performed by
     /// the now-inactive workers become visible in this thread.
     pub(super) fn is_pool_idle(&self) -> bool {
-        self.registry.is_idle()
+        // Ordering: this Acquire operation synchronizes with all Release
+        // RMWs in the `try_set_worker_inactive` method via a release sequence.
+        self.active_workers.load(Ordering::Acquire) == 0
     }

     /// Increments the count of workers actively searching for tasks.
@@ -123,7 +213,7 @@ impl Pool {
     pub(super) fn trigger_termination(&self) {
         self.terminate_signal.store(true, Ordering::Relaxed);

-        self.registry.set_all_active();
+        self.set_all_workers_active();
         for unparker in &*self.worker_unparkers {
             unparker.unpark();
         }
@@ -162,7 +252,7 @@ impl Pool {
         rng: &'_ rng::Rng,
     ) -> ShuffledStealers<'a> {
         // All active workers except the specified one are candidate for stealing.
-        let mut candidates = self.registry.get_active();
+        let mut candidates = self.active_workers.load(Ordering::Relaxed);
         if let Some(excluded_worker_id) = excluded_worker_id {
             candidates &= !(1 << excluded_worker_id);
         }
@@ -171,8 +261,15 @@ impl Pool {
     }
 }

+#[cfg(feature = "dev-logs")]
+impl Drop for PoolManager {
+    fn drop(&mut self) {
+        println!("Thread launch count: {:?}", self.record.get());
+    }
+}
+
 /// An iterator over active workers that yields their associated stealer,
-/// starting from a randomly selected worker.
+/// starting from a randomly selected active worker.
 pub(super) struct ShuffledStealers<'a> {
     stealers: &'a [Stealer],
     // A bit-rotated bit field of the remaining candidate workers to steal from.
@@ -236,180 +333,6 @@ impl<'a> Iterator for ShuffledStealers<'a> {
     }
 }

-/// Registry of active/idle worker threads.
-///
-/// The registry only supports up to `usize::BITS` threads.
-#[derive(Debug)]
-struct PoolRegistry {
-    active_workers: AtomicUsize,
-    pool_size: usize,
-    #[cfg(feature = "dev-logs")]
-    record: Record,
-}
-
-impl PoolRegistry {
-    /// Creates a new pool registry.
-    ///
-    /// #Panic
-    ///
-    /// This will panic if the specified pool size is zero or is more than
-    /// `usize::BITS`.
- fn new(pool_size: usize) -> Self { - assert!( - pool_size >= 1, - "the executor pool size should be at least one" - ); - assert!( - pool_size <= usize::BITS as usize, - "the executor pool size should be at most {}", - usize::BITS - ); - - Self { - active_workers: AtomicUsize::new(0), - pool_size, - #[cfg(feature = "dev-logs")] - record: Record::new(pool_size), - } - } - - /// Returns whether the pool is idle, i.e. whether there are no active - /// workers. - /// - /// It is guaranteed that if `false` is returned, then all operations - /// performed by the now-inactive workers are visible. - fn is_idle(&self) -> bool { - // Ordering: this Acquire operation synchronizes with all Release - // RMWs in the `set_inactive` method via a release sequence. - self.active_workers.load(Ordering::Acquire) == 0 - } - - /// Marks the specified worker as inactive, unless this is the last worker. - /// - /// The specified worker must currently be marked as active. This method - /// will always fail and return `false` if this was the last active worker, - /// because the worker is then expected to check again the global queue - /// before explicitly calling `set_all_inactive` to confirm that the pool is - /// indeed idle. - fn try_set_inactive(&self, worker_id: usize) -> bool { - // Ordering: this Release operation synchronizes with the Acquire fence - // in the below conditional if this is is the last active worker, and/or - // with the Acquire state load in the `pool_state` method. - let active_workers = self - .active_workers - .fetch_update(Ordering::Release, Ordering::Relaxed, |active_workers| { - if active_workers == (1 << worker_id) { - // It looks like this is the last worker, but the value - // could be stale so it is necessary to make sure of this by - // enforcing the CAS rather than returning `None`. - Some(active_workers) - } else { - Some(active_workers & !(1 << worker_id)) - } - }) - .unwrap(); - - assert_ne!(active_workers & (1 << worker_id), 0); - - if active_workers == (1 << worker_id) { - // This is the last worker so we need to ensures that after this - // call, all tasks pushed on the global queue before - // `set_one_active` was called unsuccessfully are visible. - // - // Ordering: this Acquire fence synchronizes with all Release RMWs - // in this and in the previous calls to `set_inactive` via a release - // sequence. - atomic::fence(Ordering::Acquire); - - false - } else { - true - } - } - - /// Marks all workers as inactive. - fn set_all_inactive(&self) { - // Ordering: this Release store synchronizes with the Acquire load in - // `is_idle`. - self.active_workers.store(0, Ordering::Release); - } - - /// Marks all workers as active. - fn set_all_active(&self) { - // Mark all workers as busy. - self.active_workers.store( - !0 >> (usize::BITS - self.pool_size as u32), - Ordering::Relaxed, - ); - } - - /// Marks a worker as active if any is found, otherwise do nothing. - /// - /// The worker ID is returned if successful. 
- fn set_one_active_relaxed(&self) -> Option { - let mut active_workers = self.active_workers.load(Ordering::Relaxed); - loop { - let first_idle_worker = active_workers.trailing_ones() as usize; - if first_idle_worker >= self.pool_size { - return None; - }; - active_workers = self - .active_workers - .fetch_or(1 << first_idle_worker, Ordering::Relaxed); - if active_workers & (1 << first_idle_worker) == 0 { - #[cfg(feature = "dev-logs")] - self.record.increment(first_idle_worker); - return Some(first_idle_worker); - } - } - } - - /// Marks a worker as active if any is found, otherwise ensure that all - /// memory operations made by the caller prior to this call are visible by - /// the last worker transitioning to idle state. - /// - /// The worker ID is returned if successful. - fn set_one_active(&self) -> Option { - let mut active_workers = self.active_workers.load(Ordering::Relaxed); - loop { - let first_idle_worker = active_workers.trailing_ones() as usize; - - if first_idle_worker >= self.pool_size { - // There is apparently no free worker, so a dummy RMW with - // Release ordering is performed with the sole purpose of - // synchronizing with the Acquire fence in `set_inactive` so - // that the last worker see the tasks that were queued prior to - // this call when calling (unsuccessfully) `set_inactive`.. - let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release); - if new_active_workers == active_workers { - return None; - } - active_workers = new_active_workers; - } else { - active_workers = self - .active_workers - .fetch_or(1 << first_idle_worker, Ordering::Relaxed); - if active_workers & (1 << first_idle_worker) == 0 { - #[cfg(feature = "dev-logs")] - self.record.increment(first_idle_worker); - return Some(first_idle_worker); - } - } - } - } - - /// Returns a bit field that indicates all active workers. - fn get_active(&self) -> usize { - self.active_workers.load(Ordering::Relaxed) - } -} - -#[cfg(feature = "dev-logs")] -impl Drop for PoolRegistry { - fn drop(&mut self) { - println!("Thread launch count: {:?}", self.record.get()); - } -} - #[cfg(feature = "dev-logs")] #[derive(Debug)] struct Record { diff --git a/asynchronix/src/runtime/executor/rng.rs b/asynchronix/src/runtime/executor/rng.rs index 6a3604d..22f54b9 100644 --- a/asynchronix/src/runtime/executor/rng.rs +++ b/asynchronix/src/runtime/executor/rng.rs @@ -2,7 +2,7 @@ use std::cell::Cell; /// A pseudo-random number generator based on Wang Yi's Wyrand. /// -/// See: https://github.com/wangyi-fudan/wyhash +/// See: #[derive(Clone, Debug)] pub(crate) struct Rng { seed: Cell, diff --git a/asynchronix/src/runtime/executor/worker.rs b/asynchronix/src/runtime/executor/worker.rs index c45dc29..6f13b87 100644 --- a/asynchronix/src/runtime/executor/worker.rs +++ b/asynchronix/src/runtime/executor/worker.rs @@ -3,23 +3,23 @@ use std::sync::Arc; use super::task::Runnable; -use super::pool::Pool; +use super::ExecutorContext; use super::LocalQueue; /// A local worker with access to global executor resources. pub(crate) struct Worker { pub(super) local_queue: LocalQueue, pub(super) fast_slot: Cell>, - pub(super) pool: Arc, + pub(super) executor_context: Arc, } impl Worker { /// Creates a new worker. - pub(super) fn new(local_queue: LocalQueue, pool: Arc) -> Self { + pub(super) fn new(local_queue: LocalQueue, executor_context: Arc) -> Self { Self { local_queue, fast_slot: Cell::new(None), - pool, + executor_context: executor_context, } } }
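One last note on the `rng.rs` hunk above: the executor shuffles its steal candidates with a Wyrand-based generator whose seed lives in a `Cell`, so it can be advanced through a shared reference without atomics. The sketch below uses the constants published in the wyhash repository for illustration; `gen_u64` is an assumed name and the crate's actual implementation may differ in detail:

use std::cell::Cell;

/// A Wyrand-style generator with interior mutability.
struct Rng {
    seed: Cell<u64>,
}

impl Rng {
    fn new(seed: u64) -> Self {
        Self { seed: Cell::new(seed) }
    }

    fn gen_u64(&self) -> u64 {
        // Wyrand step: bump the seed, then mix it with a 64x64 -> 128-bit
        // multiply and fold the two halves together.
        let seed = self.seed.get().wrapping_add(0xa076_1d64_78bd_642f);
        self.seed.set(seed);
        let t = (seed as u128).wrapping_mul((seed ^ 0xe703_7ed1_a0b4_28db) as u128);
        (t >> 64) as u64 ^ t as u64
    }
}

fn main() {
    let rng = Rng::new(0xdead_beef);
    println!("{:#018x}", rng.gen_u64());
    println!("{:#018x}", rng.gen_u64());
}

Interior mutability through `Cell` is what lets a worker advance the generator from `&self` inside its stealing loop, matching the `Cell`-based `seed` field shown above.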