From b009f4481eec102889603a0c5b123b3238ea7d0d Mon Sep 17 00:00:00 2001
From: Serge Barral
Date: Sun, 16 Oct 2022 12:16:01 +0200
Subject: [PATCH] Deactivate worker only after global queue re-check

---
 asynchronix/src/runtime/executor.rs      |  67 +++++------
 asynchronix/src/runtime/executor/pool.rs | 146 ++++++++++++-----------
 2 files changed, 111 insertions(+), 102 deletions(-)

diff --git a/asynchronix/src/runtime/executor.rs b/asynchronix/src/runtime/executor.rs
index 08b4385..6d4d5da 100644
--- a/asynchronix/src/runtime/executor.rs
+++ b/asynchronix/src/runtime/executor.rs
@@ -19,7 +19,7 @@ mod worker;
 #[cfg(all(test, not(asynchronix_loom)))]
 mod tests;
 
-use self::pool::{Pool, PoolState};
+use self::pool::Pool;
 use self::rng::Rng;
 use self::task::{CancelToken, Promise, Runnable};
 use self::worker::Worker;
@@ -41,8 +41,8 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 /// fairness as a goal in itself. While it does use fair local queues inasmuch
 /// as these tend to perform better in message-passing applications, it uses an
 /// unfair injection queue and a LIFO slot without attempt to mitigate the
-/// effect of badly behaving code (e.g. futures that use spin-locks and hope for
-/// the best by yielding to the executor with something like tokio's
+/// effect of badly behaving code (e.g. futures that spin-lock by yielding to
+/// the executor; for this reason there is no support for something like tokio's
 /// `yield_now`).
 ///
 /// Another way in which it differs from other `async` executors is that it
@@ -50,8 +50,7 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 /// discrete-time simulator, the simulation of a system at a given time step
 /// will make as much progress as possible until it technically reaches a
 /// deadlock. Only then does the simulator advance the simulated time until the
-/// next "event" extracted from a time-sorted priority queue, sending it to
-/// enable further progress in the computation.
+/// next "event" extracted from a time-sorted priority queue.
 ///
 /// The design of the executor is largely influenced by the tokio and go
 /// schedulers, both of which are optimized for message-passing applications. In
@@ -62,15 +61,14 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 /// simple by taking advantage of the fact that the injector is not required to
 /// be either LIFO or FIFO.
 ///
-/// Probably the largest difference with tokio is the task system, which boasts
-/// a higher throughput achieved by reducing the need for synchronization.
+/// Probably the largest difference with tokio is the task system, which
+/// achieves a higher throughput by reducing the need for synchronization.
 /// Another difference is that, at the moment, the complete subset of active
-/// worker threads is stored in a single atomic variable. This makes it in
-/// particular possible to rapidly identify free worker threads for stealing
-/// operations. The downside of this approach is that the maximum number of
-/// worker threads is limited to `usize::BITS`, but this is unlikely to
-/// constitute a limitation since system simulation is not typically an
-/// embarrassingly parallel problem.
+/// worker threads is stored in a single atomic variable. This makes it possible
+/// to rapidly identify free worker threads for stealing operations. The
+/// downside of this approach is that the maximum number of worker threads is
+/// limited to `usize::BITS`, but this is unlikely to constitute a limitation
+/// since system simulation is not typically embarrassingly parallel.
 #[derive(Debug)]
 pub(crate) struct Executor {
     pool: Arc<Pool>,
@@ -139,7 +137,7 @@ impl Executor {
 
         // Wait until all workers are blocked on the signal barrier.
         parker.park();
-        assert!(pool.is_idle());
+        assert!(pool.is_pool_idle());
 
         Self {
             pool,
@@ -208,7 +206,7 @@ impl Executor {
             if let Some(worker_panic) = self.pool.take_panic() {
                 panic::resume_unwind(worker_panic);
             }
-            if self.pool.is_idle() {
+            if self.pool.is_pool_idle() {
                 return;
             }
 
@@ -358,26 +356,25 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
 
         loop {
             // Signal barrier: park until notified to continue or terminate.
-            if worker.pool.set_worker_inactive(id) == PoolState::Idle {
-                // If this worker was the last active worker, it is necessary to
-                // check again whether the global queue is not populated. This
-                // could happen if the executor thread pushed a task to the
-                // global queue but could not activate a new worker because all
-                // workers were then activated.
-                if !worker.pool.global_queue.is_empty() {
-                    worker.pool.set_worker_active(id);
-                } else {
+            if worker.pool.try_set_worker_inactive(id) {
+                parker.park();
+            } else {
+                // This worker is the last active worker, so it is necessary to
+                // check if the global queue is populated. This could happen if
+                // the executor thread pushed a task to the global queue but
+                // could not activate a new worker because all workers were
+                // already active.
+                if worker.pool.global_queue.is_empty() {
+                    worker.pool.set_all_workers_inactive();
                     worker.pool.executor_unparker.unpark();
                     parker.park();
                 }
-            } else {
-                parker.park();
             }
+
             if worker.pool.termination_is_triggered() {
                 return;
             }
-
             // We may spin for a little while: start counting.
             let mut search_start = Instant::now();
 
             // Process the tasks one by one.
@@ -388,17 +385,17 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
 
                     // There is a _very_ remote possibility that, even though
                     // the local queue is empty, it has temporarily too little
-                    // spare capacity for the bucket. This could happen because
-                    // a concurrent steal operation could be preempted for all
-                    // the time it took to pop and process the remaining tasks
-                    // and hasn't released the stolen capacity yet.
+                    // spare capacity for the bucket. This could happen if a
+                    // concurrent steal operation was preempted for all the time
+                    // it took to pop and process the remaining tasks and has
+                    // not yet released the stolen capacity.
                     //
                     // Unfortunately, we cannot just skip checking the global
                     // queue altogether when there isn't enough spare capacity
-                    // in the local queue, as this could lead to a race: suppose
-                    // that (1) this thread has earlier pushed tasks onto the
-                    // global queue, and (2) the stealer has processed all
-                    // stolen tasks before this thread sees the capacity
+                    // in the local queue because this could lead to a race:
+                    // suppose that (1) this thread has earlier pushed tasks
+                    // onto the global queue, and (2) the stealer has processed
+                    // all stolen tasks before this thread sees the capacity
                     // restored and at the same time (3) the stealer does not
                     // yet see the tasks this thread pushed to the global queue;
                     // in such scenario, both this thread and the stealer thread
diff --git a/asynchronix/src/runtime/executor/pool.rs b/asynchronix/src/runtime/executor/pool.rs
index 34dfb91..1b076d8 100644
--- a/asynchronix/src/runtime/executor/pool.rs
+++ b/asynchronix/src/runtime/executor/pool.rs
@@ -7,12 +7,13 @@ use super::injector::Injector;
 use super::rng;
 use super::{GlobalQueue, Stealer};
 
+/// A view of the thread pool shared between the executor and all workers.
 #[derive(Debug)]
 pub(crate) struct Pool {
     pub(crate) global_queue: GlobalQueue,
     pub(crate) executor_id: usize,
     pub(crate) executor_unparker: parking::Unparker,
-    state: PoolRegistry,
+    registry: PoolRegistry,
     stealers: Box<[Stealer]>,
     worker_unparkers: Box<[parking::Unparker]>,
     searching_workers: AtomicUsize,
@@ -34,7 +35,7 @@ impl Pool {
             global_queue: Injector::new(),
             executor_id,
             executor_unparker,
-            state: PoolRegistry::new(worker_unparkers.len()),
+            registry: PoolRegistry::new(worker_unparkers.len()),
             stealers: stealers.into_boxed_slice(),
             worker_unparkers,
             searching_workers: AtomicUsize::new(0),
@@ -47,45 +48,48 @@ impl Pool {
     ///
     /// Unparking the worker threads is the responsibility of the caller.
     pub(crate) fn set_all_workers_active(&self) {
-        self.state.set_all_active();
+        self.registry.set_all_active();
     }
 
-    /// Marks the specified worker as active.
+    /// Marks all pool workers as inactive.
     ///
-    /// Unparking the worker thread is the responsibility of the caller.
-    pub(crate) fn set_worker_active(&self, worker_id: usize) {
-        self.state.set_active(worker_id);
+    /// Unparking the executor thread is the responsibility of the caller.
+    pub(crate) fn set_all_workers_inactive(&self) {
+        self.registry.set_all_inactive();
     }
 
-    /// Marks the specified worker as idle.
+    /// Marks the specified worker as inactive unless it is the last active
+    /// worker.
     ///
     /// Parking the worker thread is the responsibility of the caller.
     ///
-    /// If this was the last active worker, the main executor thread is
-    /// unparked.
-    pub(crate) fn set_worker_inactive(&self, worker_id: usize) -> PoolState {
-        self.state.set_inactive(worker_id)
+    /// If this was the last active worker, `false` is returned and it is
+    /// guaranteed that all memory operations performed by threads that called
+    /// `activate_worker` will be visible.
+    pub(crate) fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
+        self.registry.try_set_inactive(worker_id)
    }
 
-    /// Unparks an idle worker if any is found, or do nothing otherwise.
+    /// Unparks an idle worker if any is found and marks it as active, or does
+    /// nothing otherwise.
     ///
     /// For performance reasons, no synchronization is established if no worker
     /// is found, meaning that workers in other threads may later transition to
-    /// idle state without observing the tasks scheduled by the caller to this
-    /// method. If this is not tolerable (for instance if this method is called
-    /// from a non-worker thread), use the more expensive `activate_worker`.
+    /// idle state without observing the tasks scheduled by this caller. If this
+    /// is not tolerable (for instance if this method is called from a
+    /// non-worker thread), use the more expensive `activate_worker`.
     pub(crate) fn activate_worker_relaxed(&self) {
-        if let Some(worker_id) = self.state.set_one_active_relaxed() {
+        if let Some(worker_id) = self.registry.set_one_active_relaxed() {
             self.searching_workers.fetch_add(1, Ordering::Relaxed);
             self.worker_unparkers[worker_id].unpark();
         }
     }
 
-    /// Unparks an idle worker if any is found, or ensure that at least the last
-    /// worker to transition to idle state will observe all tasks previously
-    /// scheduled by the caller to this method.
+    /// Unparks an idle worker if any is found and marks it as active, or ensures
+    /// that at least the last active worker will observe all memory operations
+    /// performed before this call when calling `try_set_worker_inactive`.
     pub(crate) fn activate_worker(&self) {
-        if let Some(worker_id) = self.state.set_one_active() {
+        if let Some(worker_id) = self.registry.set_one_active() {
             self.searching_workers.fetch_add(1, Ordering::Relaxed);
             self.worker_unparkers[worker_id].unpark();
         }
@@ -95,8 +99,8 @@ impl Pool {
     ///
     /// If `true` is returned, it is guaranteed that all operations performed by
     /// the now-inactive workers become visible in this thread.
-    pub(crate) fn is_idle(&self) -> bool {
-        self.state.pool_state() == PoolState::Idle
+    pub(crate) fn is_pool_idle(&self) -> bool {
+        self.registry.is_idle()
     }
 
     /// Increments the count of workers actively searching for tasks.
@@ -119,7 +123,7 @@ impl Pool {
     pub(crate) fn trigger_termination(&self) {
         self.terminate_signal.store(true, Ordering::Relaxed);
 
-        self.state.set_all_active();
+        self.registry.set_all_active();
         for unparker in &*self.worker_unparkers {
             unparker.unpark();
         }
@@ -158,7 +162,7 @@ impl Pool {
         rng: &'_ rng::Rng,
     ) -> ShuffledStealers<'a> {
         // All active workers except the specified one are candidate for stealing.
-        let mut candidates = self.state.get_active();
+        let mut candidates = self.registry.get_active();
         if let Some(excluded_worker_id) = excluded_worker_id {
             candidates &= !(1 << excluded_worker_id);
         }
@@ -167,6 +171,8 @@ impl Pool {
     }
 }
 
+/// An iterator over active workers that yields their associated stealer,
+/// starting from a randomly selected worker.
 pub(crate) struct ShuffledStealers<'a> {
     stealers: &'a [Stealer],
     // A bit-rotated bit field of the remaining candidate workers to steal from.
@@ -186,8 +192,7 @@ impl<'a> ShuffledStealers<'a> {
         // Right-rotate the candidates so that the bit corresponding to the
         // randomly selected worker becomes the LSB.
         let candidate_count = stealers.len();
-        let lower_mask = (1 << next_candidate) - 1;
-        let lower_bits = candidates & lower_mask;
+        let lower_bits = candidates & ((1 << next_candidate) - 1);
         let candidates = (candidates >> next_candidate)
             | (lower_bits << (candidate_count - next_candidate));
 
@@ -266,53 +271,66 @@ impl PoolRegistry {
             record: Record::new(pool_size),
         }
     }
-    /// Returns the state of the pool.
+
+    /// Returns whether the pool is idle, i.e. whether there are no active
+    /// workers.
     ///
-    /// This operation has Acquire semantic, which guarantees that if the pool
-    /// state returned is `PoolState::Idle`, then all operations performed by
-    /// the now-inactive workers are visible.
-    fn pool_state(&self) -> PoolState {
+    /// It is guaranteed that if `true` is returned, then all operations
+    /// performed by the now-inactive workers are visible.
+    fn is_idle(&self) -> bool {
         // Ordering: this Acquire operation synchronizes with all Release
         // RMWs in the `set_inactive` method via a release sequence.
-        let active_workers = self.active_workers.load(Ordering::Acquire);
-        if active_workers == 0 {
-            PoolState::Idle
-        } else {
-            PoolState::Busy
-        }
+        self.active_workers.load(Ordering::Acquire) == 0
     }
 
-    /// Marks the specified worker as inactive.
+    /// Marks the specified worker as inactive, unless this is the last worker.
     ///
-    /// The specified worker must currently be marked as active. Returns
-    /// `PoolState::Idle` if this was the last active thread.
-    ///
-    /// If this is the last active worker (i.e. `PoolState::Idle` is returned),
-    /// then it is guaranteed that all operations performed by the now-inactive
-    /// workers and by unsuccessful callers to `set_one_active` are now visible.
-    fn set_inactive(&self, worker_id: usize) -> PoolState {
-        // Ordering: this Release operation synchronizes with the Acquire
-        // fence in the below conditional when the pool becomes idle, and/or
+    /// The specified worker must currently be marked as active. This method
+    /// will always fail and return `false` if this was the last active worker,
+    /// because the worker is then expected to check the global queue again
+    /// before explicitly calling `set_all_inactive` to confirm that the pool is
+    /// indeed idle.
+    fn try_set_inactive(&self, worker_id: usize) -> bool {
+        // Ordering: this Release operation synchronizes with the Acquire fence
+        // in the below conditional if this is the last active worker, and/or
         // with the Acquire state load in the `pool_state` method.
         let active_workers = self
             .active_workers
-            .fetch_and(!(1 << worker_id), Ordering::Release);
+            .fetch_update(Ordering::Release, Ordering::Relaxed, |active_workers| {
+                if active_workers == (1 << worker_id) {
+                    // It looks like this is the last worker, but the value
+                    // could be stale so it is necessary to make sure of this by
+                    // enforcing the CAS rather than returning `None`.
+                    Some(active_workers)
+                } else {
+                    Some(active_workers & !(1 << worker_id))
+                }
+            })
+            .unwrap();
 
-        if active_workers & !(1 << worker_id) == 0 {
-            // Ordering: this Acquire fence synchronizes with all Release
-            // RMWs in this and in the previous calls to `set_inactive` via a
-            // release sequence.
+        assert_ne!(active_workers & (1 << worker_id), 0);
+
+        if active_workers == (1 << worker_id) {
+            // This is the last worker so we need to ensure that after this
+            // call, all tasks pushed onto the global queue before
+            // `set_one_active` was called unsuccessfully are visible.
+            //
+            // Ordering: this Acquire fence synchronizes with all Release RMWs
+            // in this and in the previous calls to `try_set_inactive` via a
+            // release sequence.
             atomic::fence(Ordering::Acquire);
-            PoolState::Idle
+
+            false
         } else {
-            PoolState::Busy
+            true
         }
     }
 
-    /// Marks the specified worker as active.
-    fn set_active(&self, worker_id: usize) {
-        self.active_workers
-            .fetch_or(1 << worker_id, Ordering::Relaxed);
+    /// Marks all workers as inactive.
+    fn set_all_inactive(&self) {
+        // Ordering: this Release store synchronizes with the Acquire load in
+        // `is_idle`.
+        self.active_workers.store(0, Ordering::Release);
     }
 
     /// Marks all workers as active.
@@ -359,8 +377,8 @@ impl PoolRegistry {
                 // There is apparently no free worker, so a dummy RMW with
                 // Release ordering is performed with the sole purpose of
                 // synchronizing with the Acquire fence in `set_inactive` so
-                // that the last worker to transition to idle can see the tasks
-                // that were queued prior to this call.
+                // that the last worker sees the tasks that were queued prior to
+                // this call when calling (unsuccessfully) `try_set_inactive`.
                 let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release);
                 if new_active_workers == active_workers {
                     return None;
                 }
@@ -385,12 +403,6 @@ impl PoolRegistry {
     }
 }
 
-#[derive(PartialEq)]
-pub(crate) enum PoolState {
-    Idle,
-    Busy,
-}
-
 #[cfg(feature = "dev-logs")]
 impl Drop for PoolRegistry {
     fn drop(&mut self) {
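
For illustration only, here is a minimal, self-contained sketch of the deactivation protocol this patch introduces, using just the standard library. The `ActiveSet` type and its method names are hypothetical stand-ins for the crate's `PoolRegistry`; only the shape of the protocol is meant to match: a worker's attempt to clear its activity bit is refused when it is the last active worker, so the worker can re-check the global queue before idleness is confirmed with a separate call.

// Illustrative sketch, not part of the patch or of asynchronix.
use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct ActiveSet {
    active_workers: AtomicUsize,
}

impl ActiveSet {
    fn new(pool_size: usize) -> Self {
        assert!(pool_size >= 1 && pool_size < usize::BITS as usize);
        // All workers start active, one bit per worker.
        Self {
            active_workers: AtomicUsize::new((1usize << pool_size) - 1),
        }
    }

    /// Clears the worker's bit unless it is the last bit set; returns `false`
    /// when this is the last active worker so that the caller can re-check the
    /// global queue before confirming idleness with `set_all_inactive`.
    fn try_set_inactive(&self, worker_id: usize) -> bool {
        let bit = 1usize << worker_id;
        let active_workers = self
            .active_workers
            .fetch_update(Ordering::Release, Ordering::Relaxed, |active| {
                if active == bit {
                    // Possibly the last active worker: keep the bit set.
                    Some(active)
                } else {
                    Some(active & !bit)
                }
            })
            .unwrap();

        if active_workers == bit {
            // Synchronize with the Release RMWs of other deactivating workers
            // (and with the dummy Release RMW of a failed activation attempt)
            // so that tasks pushed to the global queue are visible before the
            // re-check.
            fence(Ordering::Acquire);
            false
        } else {
            true
        }
    }

    /// Confirms idleness once the last worker has found the global queue empty.
    fn set_all_inactive(&self) {
        self.active_workers.store(0, Ordering::Release);
    }

    /// Returns `true` if no worker is active; the Acquire load synchronizes
    /// with the Release operations above so the now-inactive workers' writes
    /// are visible to the caller.
    fn is_idle(&self) -> bool {
        self.active_workers.load(Ordering::Acquire) == 0
    }
}

A worker loop would call `try_set_inactive(id)` before parking; on `false` it re-checks the global queue and either keeps running or calls `set_all_inactive()` and unparks the executor, mirroring the `run_local_worker` change above.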