forked from ROMEO/nexosim

Deactivate worker only after global queue re-check

Serge Barral 2022-10-16 12:16:01 +02:00
parent 77e992da4a
commit b009f4481e
2 changed files with 111 additions and 102 deletions
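In short, the change below makes a parking worker keep its "active" bit until the global queue has been re-checked: `try_set_worker_inactive` refuses to deactivate the last active worker, which then inspects the global queue and only calls `set_all_workers_inactive` once the queue is confirmed empty. This closes the window in which the executor pushes a task to the global queue, cannot activate any worker because all of them still appear active, and the last worker then parks without ever seeing the task. The sketch below is illustrative only: `MiniPool` and its members are invented names, not the asynchronix API, and the memory-ordering subtleties of the real change (notably forcing a CAS for the last worker) are glossed over.

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    struct MiniPool {
        active_workers: AtomicUsize,        // one bit per worker
        global_queue: Mutex<VecDeque<u64>>, // stand-in for the injector queue
    }

    impl MiniPool {
        /// Clears the worker's bit unless it is the last active worker, in
        /// which case the bit is left set and `false` is returned.
        fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
            self.active_workers
                .fetch_update(Ordering::Release, Ordering::Relaxed, |active| {
                    if active == 1 << worker_id {
                        None // refuse to deactivate the last active worker
                    } else {
                        Some(active & !(1 << worker_id))
                    }
                })
                .is_ok()
        }

        fn park_worker(&self, worker_id: usize) {
            if self.try_set_worker_inactive(worker_id) {
                // Other workers remain active: this worker may park right away.
            } else if self.global_queue.lock().unwrap().is_empty() {
                // Last active worker and the queue is confirmed empty: only now
                // is the whole pool marked inactive (and the executor unparked).
                self.active_workers.store(0, Ordering::Release);
            }
            // Otherwise the worker stays active and goes back to processing
            // the queued task instead of parking.
        }
    }

    fn main() {
        let pool = MiniPool {
            active_workers: AtomicUsize::new(0b11), // workers 0 and 1 active
            global_queue: Mutex::new(VecDeque::from([42])),
        };
        assert!(pool.try_set_worker_inactive(1)); // worker 1 can park normally
        assert!(!pool.try_set_worker_inactive(0)); // worker 0 is the last one: refused
        pool.park_worker(0); // queue is non-empty, so the pool must stay active
        assert_eq!(pool.active_workers.load(Ordering::Relaxed), 0b01);
    }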


@@ -19,7 +19,7 @@ mod worker;
 #[cfg(all(test, not(asynchronix_loom)))]
 mod tests;
 
-use self::pool::{Pool, PoolState};
+use self::pool::Pool;
 use self::rng::Rng;
 use self::task::{CancelToken, Promise, Runnable};
 use self::worker::Worker;
@@ -41,8 +41,8 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 /// fairness as a goal in itself. While it does use fair local queues inasmuch
 /// as these tend to perform better in message-passing applications, it uses an
 /// unfair injection queue and a LIFO slot without attempt to mitigate the
-/// effect of badly behaving code (e.g. futures that use spin-locks and hope for
-/// the best by yielding to the executor with something like tokio's
+/// effect of badly behaving code (e.g. futures that spin-lock by yielding to
+/// the executor; there is for this reason no support for something like tokio's
 /// `yield_now`).
 ///
 /// Another way in which it differs from other `async` executors is that it
@@ -50,8 +50,7 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 /// discrete-time simulator, the simulation of a system at a given time step
 /// will make as much progress as possible until it technically reaches a
 /// deadlock. Only then does the simulator advance the simulated time until the
-/// next "event" extracted from a time-sorted priority queue, sending it to
-/// enable further progress in the computation.
+/// next "event" extracted from a time-sorted priority queue.
 ///
 /// The design of the executor is largely influenced by the tokio and go
 /// schedulers, both of which are optimized for message-passing applications. In
@@ -62,15 +61,14 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 /// simple by taking advantage of the fact that the injector is not required to
 /// be either LIFO or FIFO.
 ///
-/// Probably the largest difference with tokio is the task system, which boasts
-/// a higher throughput achieved by reducing the need for synchronization.
+/// Probably the largest difference with tokio is the task system, which
+/// achieves a higher throughput by reducing the need for synchronization.
 /// Another difference is that, at the moment, the complete subset of active
-/// worker threads is stored in a single atomic variable. This makes it in
-/// particular possible to rapidly identify free worker threads for stealing
-/// operations. The downside of this approach is that the maximum number of
-/// worker threads is limited to `usize::BITS`, but this is unlikely to
-/// constitute a limitation since system simulation is not typically an
-/// embarrassingly parallel problem.
+/// worker threads is stored in a single atomic variable. This makes it possible
+/// to rapidly identify free worker threads for stealing operations. The
+/// downside of this approach is that the maximum number of worker threads is
+/// limited to `usize::BITS`, but this is unlikely to constitute a limitation
+/// since system simulation is not typically embarrassingly parallel.
 #[derive(Debug)]
 pub(crate) struct Executor {
     pool: Arc<Pool>,
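A side note on the doc comment above: keeping the complete set of active workers in one atomic word is what lets a free worker be located and activated with a single compare-and-swap on a bit mask, at the cost of capping the pool at `usize::BITS` workers. The minimal illustration below uses names loosely modelled on the `PoolRegistry` methods that appear later in this diff, but it is not the crate's implementation and its orderings are chosen for the example only.

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct ActiveSet {
        bits: AtomicUsize, // bit i set <=> worker i is active
        pool_size: usize,  // assumed to be < usize::BITS
    }

    impl ActiveSet {
        /// Marks one currently idle worker as active and returns its index,
        /// or returns `None` if every worker is already active.
        fn set_one_active(&self) -> Option<usize> {
            let mut bits = self.bits.load(Ordering::Relaxed);
            loop {
                // Bits of the idle workers within the pool.
                let idle = !bits & ((1 << self.pool_size) - 1);
                if idle == 0 {
                    return None;
                }
                let id = idle.trailing_zeros() as usize;
                match self.bits.compare_exchange_weak(
                    bits,
                    bits | (1 << id),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return Some(id),
                    Err(observed) => bits = observed,
                }
            }
        }
    }

    fn main() {
        let set = ActiveSet { bits: AtomicUsize::new(0b0101), pool_size: 4 };
        // Workers 0 and 2 are active; the first idle worker is worker 1.
        assert_eq!(set.set_one_active(), Some(1));
        assert_eq!(set.bits.load(Ordering::Relaxed), 0b0111);
    }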
@@ -139,7 +137,7 @@ impl Executor {
         // Wait until all workers are blocked on the signal barrier.
         parker.park();
-        assert!(pool.is_idle());
+        assert!(pool.is_pool_idle());
 
         Self {
             pool,
@@ -208,7 +206,7 @@ impl Executor {
             if let Some(worker_panic) = self.pool.take_panic() {
                 panic::resume_unwind(worker_panic);
             }
-            if self.pool.is_idle() {
+            if self.pool.is_pool_idle() {
                 return;
             }
@@ -358,26 +356,25 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
         loop {
             // Signal barrier: park until notified to continue or terminate.
-            if worker.pool.set_worker_inactive(id) == PoolState::Idle {
-                // If this worker was the last active worker, it is necessary to
-                // check again whether the global queue is not populated. This
-                // could happen if the executor thread pushed a task to the
-                // global queue but could not activate a new worker because all
-                // workers were then activated.
-                if !worker.pool.global_queue.is_empty() {
-                    worker.pool.set_worker_active(id);
-                } else {
-                    worker.pool.executor_unparker.unpark();
-                    parker.park();
-                }
-            } else {
-                parker.park();
-            }
+            if worker.pool.try_set_worker_inactive(id) {
+                parker.park();
+            } else {
+                // This worker is the last active worker so it is necessary to
+                // check if the global queue is populated. This could happen if
+                // the executor thread pushed a task to the global queue but
+                // could not activate a new worker because all workers were then
+                // activated.
+                if worker.pool.global_queue.is_empty() {
+                    worker.pool.set_all_workers_inactive();
+                    worker.pool.executor_unparker.unpark();
+                    parker.park();
+                }
+            }
 
             if worker.pool.termination_is_triggered() {
                 return;
             }
 
             // We may spin for a little while: start counting.
             let mut search_start = Instant::now();
 
             // Process the tasks one by one.
@@ -388,17 +385,17 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
                     // There is a _very_ remote possibility that, even though
                     // the local queue is empty, it has temporarily too little
-                    // spare capacity for the bucket. This could happen because
-                    // a concurrent steal operation could be preempted for all
-                    // the time it took to pop and process the remaining tasks
-                    // and hasn't released the stolen capacity yet.
+                    // spare capacity for the bucket. This could happen if a
+                    // concurrent steal operation was preempted for all the time
+                    // it took to pop and process the remaining tasks and it
+                    // hasn't released the stolen capacity yet.
                     //
                     // Unfortunately, we cannot just skip checking the global
                     // queue altogether when there isn't enough spare capacity
-                    // in the local queue, as this could lead to a race: suppose
-                    // that (1) this thread has earlier pushed tasks onto the
-                    // global queue, and (2) the stealer has processed all
-                    // stolen tasks before this thread sees the capacity
+                    // in the local queue because this could lead to a race:
+                    // suppose that (1) this thread has earlier pushed tasks
+                    // onto the global queue, and (2) the stealer has processed
+                    // all stolen tasks before this thread sees the capacity
                     // restored and at the same time (3) the stealer does not
                     // yet see the tasks this thread pushed to the global queue;
                     // in such scenario, both this thread and the stealer thread


@@ -7,12 +7,13 @@ use super::injector::Injector;
 use super::rng;
 use super::{GlobalQueue, Stealer};
 
+/// A view of the thread pool shared between the executor and all workers.
 #[derive(Debug)]
 pub(crate) struct Pool {
     pub(crate) global_queue: GlobalQueue,
     pub(crate) executor_id: usize,
     pub(crate) executor_unparker: parking::Unparker,
-    state: PoolRegistry,
+    registry: PoolRegistry,
     stealers: Box<[Stealer]>,
     worker_unparkers: Box<[parking::Unparker]>,
     searching_workers: AtomicUsize,
@@ -34,7 +35,7 @@ impl Pool {
             global_queue: Injector::new(),
             executor_id,
             executor_unparker,
-            state: PoolRegistry::new(worker_unparkers.len()),
+            registry: PoolRegistry::new(worker_unparkers.len()),
             stealers: stealers.into_boxed_slice(),
             worker_unparkers,
             searching_workers: AtomicUsize::new(0),
@@ -47,45 +48,48 @@ impl Pool {
     ///
     /// Unparking the worker threads is the responsibility of the caller.
     pub(crate) fn set_all_workers_active(&self) {
-        self.state.set_all_active();
+        self.registry.set_all_active();
     }
 
-    /// Marks the specified worker as active.
+    /// Marks all pool workers as inactive.
     ///
-    /// Unparking the worker thread is the responsibility of the caller.
-    pub(crate) fn set_worker_active(&self, worker_id: usize) {
-        self.state.set_active(worker_id);
+    /// Unparking the executor threads is the responsibility of the caller.
+    pub(crate) fn set_all_workers_inactive(&self) {
+        self.registry.set_all_inactive();
     }
 
-    /// Marks the specified worker as idle.
+    /// Marks the specified worker as inactive unless it is the last active
+    /// worker.
     ///
     /// Parking the worker thread is the responsibility of the caller.
     ///
-    /// If this was the last active worker, the main executor thread is
-    /// unparked.
-    pub(crate) fn set_worker_inactive(&self, worker_id: usize) -> PoolState {
-        self.state.set_inactive(worker_id)
+    /// If this was the last active worker, `false` is returned and it is
+    /// guaranteed that all memory operations performed by threads that called
+    /// `activate_worker` will be visible.
+    pub(crate) fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
+        self.registry.try_set_inactive(worker_id)
    }
 
-    /// Unparks an idle worker if any is found, or do nothing otherwise.
+    /// Unparks an idle worker if any is found and mark it as active, or do
+    /// nothing otherwise.
     ///
     /// For performance reasons, no synchronization is established if no worker
     /// is found, meaning that workers in other threads may later transition to
-    /// idle state without observing the tasks scheduled by the caller to this
-    /// method. If this is not tolerable (for instance if this method is called
-    /// from a non-worker thread), use the more expensive `activate_worker`.
+    /// idle state without observing the tasks scheduled by this caller. If this
+    /// is not tolerable (for instance if this method is called from a
+    /// non-worker thread), use the more expensive `activate_worker`.
     pub(crate) fn activate_worker_relaxed(&self) {
-        if let Some(worker_id) = self.state.set_one_active_relaxed() {
+        if let Some(worker_id) = self.registry.set_one_active_relaxed() {
             self.searching_workers.fetch_add(1, Ordering::Relaxed);
             self.worker_unparkers[worker_id].unpark();
         }
     }
 
-    /// Unparks an idle worker if any is found, or ensure that at least the last
-    /// worker to transition to idle state will observe all tasks previously
-    /// scheduled by the caller to this method.
+    /// Unparks an idle worker if any is found and mark it as active, or ensure
+    /// that at least the last active worker will observe all memory operations
+    /// performed before this call when calling `try_set_worker_inactive`.
     pub(crate) fn activate_worker(&self) {
-        if let Some(worker_id) = self.state.set_one_active() {
+        if let Some(worker_id) = self.registry.set_one_active() {
             self.searching_workers.fetch_add(1, Ordering::Relaxed);
             self.worker_unparkers[worker_id].unpark();
         }
@@ -95,8 +99,8 @@ impl Pool {
     ///
     /// If `true` is returned, it is guaranteed that all operations performed by
     /// the now-inactive workers become visible in this thread.
-    pub(crate) fn is_idle(&self) -> bool {
-        self.state.pool_state() == PoolState::Idle
+    pub(crate) fn is_pool_idle(&self) -> bool {
+        self.registry.is_idle()
     }
 
     /// Increments the count of workers actively searching for tasks.
@@ -119,7 +123,7 @@ impl Pool {
     pub(crate) fn trigger_termination(&self) {
         self.terminate_signal.store(true, Ordering::Relaxed);
-        self.state.set_all_active();
+        self.registry.set_all_active();
 
         for unparker in &*self.worker_unparkers {
             unparker.unpark();
         }
@@ -158,7 +162,7 @@ impl Pool {
         rng: &'_ rng::Rng,
     ) -> ShuffledStealers<'a> {
         // All active workers except the specified one are candidate for stealing.
-        let mut candidates = self.state.get_active();
+        let mut candidates = self.registry.get_active();
         if let Some(excluded_worker_id) = excluded_worker_id {
             candidates &= !(1 << excluded_worker_id);
         }
@@ -167,6 +171,8 @@ impl Pool {
     }
 }
 
+/// An iterator over active workers that yields their associated stealer,
+/// starting from a randomly selected worker.
 pub(crate) struct ShuffledStealers<'a> {
     stealers: &'a [Stealer],
     // A bit-rotated bit field of the remaining candidate workers to steal from.
@@ -186,8 +192,7 @@ impl<'a> ShuffledStealers<'a> {
         // Right-rotate the candidates so that the bit corresponding to the
         // randomly selected worker becomes the LSB.
         let candidate_count = stealers.len();
-        let lower_mask = (1 << next_candidate) - 1;
-        let lower_bits = candidates & lower_mask;
+        let lower_bits = candidates & ((1 << next_candidate) - 1);
         let candidates =
             (candidates >> next_candidate) | (lower_bits << (candidate_count - next_candidate));
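For reference, the rotation above can be checked on a small example: with 8 candidate slots, `candidates = 0b1010_0110` and `next_candidate = 2`, the expression right-rotates the bit field so that the randomly selected worker's bit lands on the LSB while the other candidates keep their relative order. A standalone snippet (illustrative only) reproducing the arithmetic of the added line:

    fn rotate_candidates(candidates: usize, next_candidate: usize, candidate_count: usize) -> usize {
        // Same expression as in the diff above.
        let lower_bits = candidates & ((1 << next_candidate) - 1);
        (candidates >> next_candidate) | (lower_bits << (candidate_count - next_candidate))
    }

    fn main() {
        // 8 workers; workers {1, 2, 5, 7} are candidates and worker 2 was drawn.
        let rotated = rotate_candidates(0b1010_0110, 2, 8);
        // Worker 2 now occupies the LSB.
        assert_eq!(rotated, 0b1010_1001);
    }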
@@ -266,53 +271,66 @@ impl PoolRegistry {
             record: Record::new(pool_size),
         }
     }
 
-    /// Returns the state of the pool.
+    /// Returns whether the pool is idle, i.e. whether there are no active
+    /// workers.
     ///
-    /// This operation has Acquire semantic, which guarantees that if the pool
-    /// state returned is `PoolState::Idle`, then all operations performed by
-    /// the now-inactive workers are visible.
-    fn pool_state(&self) -> PoolState {
+    /// It is guaranteed that if `false` is returned, then all operations
+    /// performed by the now-inactive workers are visible.
+    fn is_idle(&self) -> bool {
         // Ordering: this Acquire operation synchronizes with all Release
         // RMWs in the `set_inactive` method via a release sequence.
-        let active_workers = self.active_workers.load(Ordering::Acquire);
-
-        if active_workers == 0 {
-            PoolState::Idle
-        } else {
-            PoolState::Busy
-        }
+        self.active_workers.load(Ordering::Acquire) == 0
     }
 
-    /// Marks the specified worker as inactive.
+    /// Marks the specified worker as inactive, unless this is the last worker.
     ///
-    /// The specified worker must currently be marked as active. Returns
-    /// `PoolState::Idle` if this was the last active thread.
-    ///
-    /// If this is the last active worker (i.e. `PoolState::Idle` is returned),
-    /// then it is guaranteed that all operations performed by the now-inactive
-    /// workers and by unsuccessful callers to `set_one_active` are now visible.
-    fn set_inactive(&self, worker_id: usize) -> PoolState {
-        // Ordering: this Release operation synchronizes with the Acquire
-        // fence in the below conditional when the pool becomes idle, and/or
+    /// The specified worker must currently be marked as active. This method
+    /// will always fail and return `false` if this was the last active worker,
+    /// because the worker is then expected to check again the global queue
+    /// before explicitly calling `set_all_inactive` to confirm that the pool is
+    /// indeed idle.
+    fn try_set_inactive(&self, worker_id: usize) -> bool {
+        // Ordering: this Release operation synchronizes with the Acquire fence
+        // in the below conditional if this is is the last active worker, and/or
         // with the Acquire state load in the `pool_state` method.
         let active_workers = self
             .active_workers
-            .fetch_and(!(1 << worker_id), Ordering::Release);
-
-        if active_workers & !(1 << worker_id) == 0 {
-            // Ordering: this Acquire fence synchronizes with all Release
-            // RMWs in this and in the previous calls to `set_inactive` via a
-            // release sequence.
-            atomic::fence(Ordering::Acquire);
-            PoolState::Idle
-        } else {
-            PoolState::Busy
-        }
-    }
+            .fetch_update(Ordering::Release, Ordering::Relaxed, |active_workers| {
+                if active_workers == (1 << worker_id) {
+                    // It looks like this is the last worker, but the value
+                    // could be stale so it is necessary to make sure of this by
+                    // enforcing the CAS rather than returning `None`.
+                    Some(active_workers)
+                } else {
+                    Some(active_workers & !(1 << worker_id))
+                }
+            })
+            .unwrap();
+
+        assert_ne!(active_workers & (1 << worker_id), 0);
+
+        if active_workers == (1 << worker_id) {
+            // This is the last worker so we need to ensures that after this
+            // call, all tasks pushed on the global queue before
+            // `set_one_active` was called unsuccessfully are visible.
+            //
+            // Ordering: this Acquire fence synchronizes with all Release RMWs
+            // in this and in the previous calls to `set_inactive` via a release
+            // sequence.
+            atomic::fence(Ordering::Acquire);
+
+            false
+        } else {
+            true
+        }
    }
 
-    /// Marks the specified worker as active.
-    fn set_active(&self, worker_id: usize) {
-        self.active_workers
-            .fetch_or(1 << worker_id, Ordering::Relaxed);
+    /// Marks all workers as inactive.
+    fn set_all_inactive(&self) {
+        // Ordering: this Release store synchronizes with the Acquire load in
+        // `is_idle`.
+        self.active_workers.store(0, Ordering::Release);
     }
 
     /// Marks all workers as active.
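A note on the `fetch_update` closure above: returning `None` for the last worker would make `fetch_update` bail out after a plain (here Relaxed) load, with no read-modify-write at all, whereas returning `Some(active_workers)` stores the value back unchanged and therefore keeps the operation a Release RMW that participates in the release sequence the Acquire fence and `is_idle` rely on, as the comments in the diff explain. The standalone snippet below only demonstrates the difference in `fetch_update`'s observable behavior; the values are illustrative.

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let active = AtomicUsize::new(0b0001); // only worker 0 is active

        // Closure returns `None` for the last worker: the update is aborted
        // after a load and `Err` carries the observed value.
        let aborted = active.fetch_update(Ordering::Release, Ordering::Relaxed, |a| {
            if a == 0b0001 { None } else { Some(a & !0b0001) }
        });
        assert_eq!(aborted, Err(0b0001));

        // Closure returns the value unchanged: the update succeeds and a
        // compare-and-swap (an RMW) is still performed, writing the same value.
        let forced = active.fetch_update(Ordering::Release, Ordering::Relaxed, |a| {
            if a == 0b0001 { Some(a) } else { Some(a & !0b0001) }
        });
        assert_eq!(forced, Ok(0b0001));
        assert_eq!(active.load(Ordering::Relaxed), 0b0001);
    }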
@@ -359,8 +377,8 @@ impl PoolRegistry {
                 // There is apparently no free worker, so a dummy RMW with
                 // Release ordering is performed with the sole purpose of
                 // synchronizing with the Acquire fence in `set_inactive` so
-                // that the last worker to transition to idle can see the tasks
-                // that were queued prior to this call.
+                // that the last worker see the tasks that were queued prior to
+                // this call when calling (unsuccessfully) `set_inactive`..
                 let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release);
                 if new_active_workers == active_workers {
                     return None;
@@ -385,12 +403,6 @@ impl PoolRegistry {
     }
 }
 
-#[derive(PartialEq)]
-pub(crate) enum PoolState {
-    Idle,
-    Busy,
-}
-
 #[cfg(feature = "dev-logs")]
 impl Drop for PoolRegistry {
     fn drop(&mut self) {