From c3ca7fc0e1fa5c3a1f383f3abcafebed75f10f24 Mon Sep 17 00:00:00 2001
From: Serge Barral
Date: Tue, 18 Oct 2022 11:36:07 +0200
Subject: [PATCH] Add comments + minor renaming

---
 asynchronix/src/runtime.rs                   |   2 +-
 asynchronix/src/runtime/executor.rs          | 129 +++++++++++-------
 asynchronix/src/runtime/executor/injector.rs |  21 +--
 .../src/runtime/executor/pool_manager.rs     |  27 ++--
 asynchronix/src/runtime/executor/worker.rs   |   2 +-
 5 files changed, 110 insertions(+), 71 deletions(-)

diff --git a/asynchronix/src/runtime.rs b/asynchronix/src/runtime.rs
index bb50d71..7fd93f8 100644
--- a/asynchronix/src/runtime.rs
+++ b/asynchronix/src/runtime.rs
@@ -1,3 +1,3 @@
-//! Executor and tasks.
+//! The asynchronix executor and supporting runtime.
 
 pub(crate) mod executor;
diff --git a/asynchronix/src/runtime/executor.rs b/asynchronix/src/runtime/executor.rs
index da93a89..045b0af 100644
--- a/asynchronix/src/runtime/executor.rs
+++ b/asynchronix/src/runtime/executor.rs
@@ -1,3 +1,47 @@
+//! Multi-threaded `async` executor.
+//!
+//! The executor is exclusively designed for message-passing computational
+//! tasks. As such, it does not include an I/O reactor and does not consider
+//! fairness as a goal in itself. While it does use fair local queues inasmuch
+//! as these tend to perform better in message-passing applications, it uses
+//! an unfair injection queue and a LIFO slot without attempting to mitigate
+//! the effects of badly behaving code (e.g. futures that spin-lock by
+//! yielding to the executor; for this reason there is no support for
+//! something like tokio's `yield_now`).
+//!
+//! Another way in which it differs from other `async` executors is that it
+//! treats deadlocking as a normal occurrence. This is because, in a
+//! discrete-time simulator, the simulation of a system at a given time step
+//! makes as much progress as possible until it technically reaches a
+//! deadlock. Only then does the simulator advance the simulated time to the
+//! next "event" extracted from a time-sorted priority queue.
+//!
+//! The design of the executor is largely influenced by the tokio and Go
+//! schedulers, both of which are optimized for message-passing applications.
+//! In particular, it uses fast, fixed-size, thread-local work-stealing queues
+//! with a non-stealable LIFO slot, in combination with an injector queue that
+//! is used both to schedule new tasks and to absorb temporary overflow in the
+//! local queues.
+//!
+//! The design of the injector queue is kept very simple compared to tokio's
+//! by exploiting the fact that the injector need not be LIFO or FIFO. Moving
+//! tasks between a local queue and the injector is fast because tasks are
+//! moved in batches and are stored contiguously in memory.
+//!
+//! Another difference from tokio is that, at the moment, the complete subset
+//! of active worker threads is stored in a single atomic variable. This makes
+//! it possible to rapidly identify free worker threads for stealing
+//! operations, with the downside that the maximum number of worker threads is
+//! currently limited to `usize::BITS`. This is unlikely to constitute a
+//! limitation in practice, though, since system simulation is not typically
+//! embarrassingly parallel.
+//!
+//! Probably the largest difference from tokio is the task system, which
+//! achieves better throughput because it requires less synchronization. This
+//! mainly results from the use of an atomic notification counter rather than
+//! an atomic notification flag, which removes the need to reset the flag
+//! before polling a future.
+
 use std::future::Future;
 use std::panic::{self, AssertUnwindSafe};
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -35,54 +79,16 @@ scoped_thread_local!(static ACTIVE_TASKS: Mutex<Slab<CancelToken>>);
 static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
 
 /// A multi-threaded `async` executor.
-///
-/// The executor is exclusively designed for message-passing computational
-/// tasks. As such, it does not include an I/O reactor and does not consider
-/// fairness as a goal in itself. While it does use fair local queues inasmuch
-/// as these tend to perform better in message-passing applications, it uses an
-/// unfair injection queue and a LIFO slot without attempt to mitigate the
-/// effect of badly behaving code (e.g. futures that spin-lock by yielding to
-/// the executor; there is for this reason no support for something like tokio's
-/// `yield_now`).
-///
-/// Another way in which it differs from other `async` executors is that it
-/// treats deadlocking as a normal occurrence. This is because in a
-/// discrete-time simulator, the simulation of a system at a given time step
-/// will make as much progress as possible until it technically reaches a
-/// deadlock. Only then does the simulator advance the simulated time until the
-/// next "event" extracted from a time-sorted priority queue.
-///
-/// The design of the executor is largely influenced by the tokio and go
-/// schedulers, both of which are optimized for message-passing applications. In
-/// particular, it uses fast, fixed-size thread-local work-stealing queues with
-/// a non-stealable LIFO slot in combination with an injector queue, which
-/// injector queue is used both to schedule new tasks and to absorb temporary
-/// overflow in the local queues.
-///
-/// The design of the injector queue is kept very simple compared to tokio, by
-/// taking advantage of the fact that the injector is not required to be either
-/// LIFO or FIFO. Moving tasks between a local queue and the injector is fast
-/// because tasks are moved in batch and are stored contiguously in memory.
-///
-/// Another difference with tokio is that, at the moment, the complete subset of
-/// active worker threads is stored in a single atomic variable. This makes it
-/// possible to rapidly identify free worker threads for stealing operations,
-/// with the downside that the maximum number of worker threads is currently
-/// limited to `usize::BITS`. This is unlikely to constitute a limitation in
-/// practice though since system simulation is not typically embarrassingly
-/// parallel.
-///
-/// Probably the largest difference with tokio is the task system, which has
-/// better throughput due to less need for synchronization. This mainly results
-/// from the use of an atomic notification counter rather than an atomic
-/// notification flag, thus alleviating the need to reset the notification flag
-/// before polling a future.
 #[derive(Debug)]
 pub(crate) struct Executor {
+    /// Shared executor data.
     context: Arc<ExecutorContext>,
+    /// List of tasks that have not completed yet.
     active_tasks: Arc<Mutex<Slab<CancelToken>>>,
+    /// Parker for the main executor thread.
     parker: parking::Parker,
-    join_handles: Vec<JoinHandle<()>>,
+    /// Join handles of the worker threads.
+    worker_handles: Vec<JoinHandle<()>>,
 }
 
 impl Executor {
@@ -124,7 +130,7 @@ impl Executor {
         context.pool_manager.set_all_workers_active();
 
         // Spawn all worker threads.
-        let join_handles: Vec<_> = local_data
+        let worker_handles: Vec<_> = local_data
             .into_iter()
             .enumerate()
             .into_iter()
@@ -149,13 +155,13 @@ impl Executor {
 
         // Wait until all workers are blocked on the signal barrier.
         parker.park();
-        assert!(context.pool_manager.is_pool_idle());
+        assert!(context.pool_manager.pool_is_idle());
 
         Self {
             context,
            active_tasks,
             parker,
-            join_handles,
+            worker_handles,
         }
     }
 
@@ -218,7 +224,7 @@ impl Executor {
         if let Some(worker_panic) = self.context.pool_manager.take_panic() {
             panic::resume_unwind(worker_panic);
         }
-        if self.context.pool_manager.is_pool_idle() {
+        if self.context.pool_manager.pool_is_idle() {
             return;
         }
 
@@ -231,8 +237,8 @@ impl Drop for Executor {
     fn drop(&mut self) {
         // Force all threads to return.
         self.context.pool_manager.trigger_termination();
-        for join_handle in self.join_handles.drain(0..) {
-            join_handle.join().unwrap();
+        for handle in self.worker_handles.drain(0..) {
+            handle.join().unwrap();
         }
 
         // Drop all tasks that have not completed.
@@ -267,11 +273,17 @@ impl Drop for Executor {
 }
 
 /// Shared executor context.
+///
+/// This contains all executor resources that can be shared between threads.
 #[derive(Debug)]
 struct ExecutorContext {
+    /// Injector queue.
     injector: Injector<Runnable, BUCKET_CAPACITY>,
+    /// Unique executor ID inherited by all tasks spawned on this executor instance.
     executor_id: usize,
+    /// Unparker for the main executor thread.
     executor_unparker: parking::Unparker,
+    /// Manager for all worker threads.
     pool_manager: PoolManager,
 }
 
@@ -298,13 +310,15 @@ impl ExecutorContext {
     }
 }
 
-// A `Future` wrapper that removes its cancellation token from the executor's
-// list of active tasks when dropped.
+/// A `Future` wrapper that removes its cancellation token from the list of
+/// active tasks when dropped.
 struct CancellableFuture<T: Future> {
     inner: T,
     cancellation_key: usize,
 }
+
 impl<T: Future> CancellableFuture<T> {
+    /// Creates a new `CancellableFuture`.
     fn new(fut: T, cancellation_key: usize) -> Self {
         Self {
             inner: fut,
@@ -312,6 +326,7 @@ impl<T: Future> CancellableFuture<T> {
         }
     }
 }
+
 impl<T: Future> Future for CancellableFuture<T> {
     type Output = T::Output;
 
@@ -323,6 +338,7 @@ impl<T: Future> Future for CancellableFuture<T> {
         unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
     }
 }
+
 impl<T: Future> Drop for CancellableFuture<T> {
     fn drop(&mut self) {
         // Remove the task from the list of active tasks if the future is
@@ -341,7 +357,13 @@ impl<T: Future> Drop for CancellableFuture<T> {
     }
 }
 
-// Schedules a `Runnable`.
+/// Schedules a `Runnable` from within a worker thread.
+///
+/// # Panics
+///
+/// This function will panic if called from a non-worker thread or from a
+/// worker thread of an executor instance other than the one on which the
+/// task for this `Runnable` was spawned.
 fn schedule_task(task: Runnable, executor_id: usize) {
     LOCAL_WORKER
         .map(|worker| {
@@ -393,6 +415,8 @@ fn schedule_task(task: Runnable, executor_id: usize) {
 
 /// Processes all incoming tasks on a worker thread until the `Terminate` signal
 /// is received or until it panics.
+///
+/// Panics caught in this thread are relayed to the main executor thread.
 fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
     let pool_manager = &worker.executor_context.pool_manager;
     let injector = &worker.executor_context.injector;
@@ -428,7 +452,6 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
                 // No need to call `begin_worker_search()`: this was done by the
                 // thread that unparked the worker.
             } else {
-                // There are tasks in the injector: resume the search.
                 pool_manager.begin_worker_search();
             }
 
diff --git a/asynchronix/src/runtime/executor/injector.rs b/asynchronix/src/runtime/executor/injector.rs
index 4349681..396e1f0 100644
--- a/asynchronix/src/runtime/executor/injector.rs
+++ b/asynchronix/src/runtime/executor/injector.rs
@@ -28,14 +28,16 @@ use std::{mem, vec};
 /// queue and this bucket is only moved to the back of the queue when full.
 #[derive(Debug)]
 pub(crate) struct Injector<T, const BUCKET_CAPACITY: usize> {
+    /// A mutex-protected list of tasks.
     inner: Mutex<Vec<Bucket<T, BUCKET_CAPACITY>>>,
+    /// A flag indicating whether the injector queue is empty.
     is_empty: AtomicBool,
 }
 
 impl<T, const BUCKET_CAPACITY: usize> Injector<T, BUCKET_CAPACITY> {
     /// Creates an empty injector queue.
     ///
-    /// # Panic
+    /// # Panics
     ///
     /// Panics if the capacity is 0.
     pub(crate) const fn new() -> Self {
@@ -105,13 +107,16 @@
     ///
     /// This is not an issue in practice because it cannot lead to executor
     /// deadlock. Indeed, if the last task/bucket was inserted by a worker
-    /// thread, this worker thread will always see that the injector queue is
-    /// populated (unless the bucket was already popped) so if all workers exit,
-    /// then all tasks they have re-injected will necessarily have been
-    /// processed. Likewise, if the last task/bucket was inserted by the main
-    /// executor thread before `Executor::run()` is called, the synchronization
-    /// established when the executor unparks worker threads ensures that the
-    /// task is visible to all unparked workers.
+    /// thread, that worker thread will always see that the injector queue is
+    /// populated (unless the bucket was already popped). Therefore, if all
+    /// workers exit, then all tasks they have re-injected will necessarily
+    /// have been processed. Likewise, if the last task/bucket was inserted by
+    /// the main executor thread before `Executor::run()` is called, the
+    /// synchronization established when the executor unparks worker threads
+    /// ensures that the task is visible to all unparked workers (there is
+    /// actually an edge case where the executor cannot unpark a thread after
+    /// pushing tasks, but this is taken care of by some extra synchronization
+    /// when deactivating workers).
     pub(crate) fn pop_bucket(&self) -> Option<Bucket<T, BUCKET_CAPACITY>> {
         // Ordering: this flag is only used as a hint so Relaxed ordering is
         // sufficient.
diff --git a/asynchronix/src/runtime/executor/pool_manager.rs b/asynchronix/src/runtime/executor/pool_manager.rs
index e83b415..c1e327f 100644
--- a/asynchronix/src/runtime/executor/pool_manager.rs
+++ b/asynchronix/src/runtime/executor/pool_manager.rs
@@ -11,21 +11,29 @@ use super::Stealer;
 /// The manager currently only supports up to `usize::BITS` threads.
 #[derive(Debug)]
 pub(super) struct PoolManager {
+    /// Number of worker threads.
     pool_size: usize,
+    /// List of the stealers associated with each worker thread.
     stealers: Box<[Stealer]>,
+    /// List of the thread unparkers associated with each worker thread.
     worker_unparkers: Box<[parking::Unparker]>,
+    /// Bit field of all workers that are currently unparked.
     active_workers: AtomicUsize,
+    /// Count of all workers currently searching for tasks.
     searching_workers: AtomicUsize,
+    /// Flag requesting all workers to return immediately.
     terminate_signal: AtomicBool,
+    /// Panic caught in a worker thread.
     worker_panic: Mutex<Option<Box<dyn Any + Send + 'static>>>,
     #[cfg(feature = "dev-logs")]
+    /// Thread wake-up statistics.
     record: Record,
 }
 
 impl PoolManager {
     /// Creates a new pool manager.
     ///
-    /// #Panic
+    /// # Panics
     ///
     /// This will panic if the specified pool size is zero or is more than
     /// `usize::BITS`.
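
Reviewer aside, not part of the patch: the `active_workers` bookkeeping manipulated by the hunks below is the subtlest part of this file. The following self-contained Rust sketch illustrates the underlying scheme under simplifying assumptions: one bit per worker in a single `AtomicUsize`, with `trailing_ones()` locating the first idle worker, which is also what caps the pool at `usize::BITS` threads. The `ActiveSet` type and its method names are hypothetical, and parking, the searching-worker count and panic relaying are all omitted.

use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the pool manager's `active_workers` bit field.
struct ActiveSet {
    active_workers: AtomicUsize,
    pool_size: usize,
}

impl ActiveSet {
    fn new(pool_size: usize) -> Self {
        // One bit per worker, hence the `usize::BITS` cap on the pool size.
        assert!(pool_size >= 1 && pool_size <= usize::BITS as usize);
        Self {
            active_workers: AtomicUsize::new(0),
            pool_size,
        }
    }

    /// Marks the first idle worker as active and returns its index, or
    /// returns `None` if all workers are already active.
    fn activate_any(&self) -> Option<usize> {
        let mut active_workers = self.active_workers.load(Ordering::Relaxed);
        loop {
            // A worker is idle iff its bit is 0, so the index of the first
            // idle worker is the number of trailing ones.
            let first_idle_worker = active_workers.trailing_ones() as usize;
            if first_idle_worker >= self.pool_size {
                return None;
            }
            match self.active_workers.compare_exchange_weak(
                active_workers,
                active_workers | (1 << first_idle_worker),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Some(first_idle_worker),
                // Lost a race with another thread: retry with the new value.
                Err(now_active) => active_workers = now_active,
            }
        }
    }

    /// Clears a worker's bit; returns `false` if it was the last active one.
    fn try_deactivate(&self, worker_id: usize) -> bool {
        let prev = self
            .active_workers
            .fetch_and(!(1 << worker_id), Ordering::Release);
        prev & !(1 << worker_id) != 0
    }
}

fn main() {
    let set = ActiveSet::new(4);
    let first = set.activate_any();
    let second = set.activate_any();
    assert_eq!((first, second), (Some(0), Some(1)));
    assert!(set.try_deactivate(0)); // worker 1 is still active
    assert!(!set.try_deactivate(1)); // this was the last active worker
}

Note that the dummy `fetch_or(0, Ordering::Release)` performed by the real `activate_worker` below has no counterpart in this sketch, which ignores the memory-ordering handshake with the last worker going idle.
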
@@ -91,13 +99,12 @@ impl PoolManager {
         let mut active_workers = self.active_workers.load(Ordering::Relaxed);
         loop {
             let first_idle_worker = active_workers.trailing_ones() as usize;
-
             if first_idle_worker >= self.pool_size {
                 // There is apparently no free worker, so a dummy RMW with
                 // Release ordering is performed with the sole purpose of
                 // synchronizing with the Acquire fence in `try_set_worker_inactive` so
-                // that the last worker see the tasks that were queued prior to
-                // this call when calling (unsuccessfully) `set_inactive`..
+                // that the last worker sees the tasks that were queued prior to
+                // this call to `activate_worker`.
                 let new_active_workers = self.active_workers.fetch_or(0, Ordering::Release);
                 if new_active_workers == active_workers {
                     return;
@@ -125,8 +132,9 @@ impl PoolManager {
     /// If this was the last active worker, `false` is returned and it is
     /// guaranteed that all memory operations performed by threads that called
     /// `activate_worker` will be visible. The worker is in that case expected
-    /// to check again the injector queue before explicitly calling
-    /// `set_all_workers_inactive` to confirm that the pool is indeed idle.
+    /// to check the injector queue again and then to explicitly call
+    /// `set_all_workers_inactive` if it can confirm that the injector queue is
+    /// empty.
     pub(super) fn try_set_worker_inactive(&self, worker_id: usize) -> bool {
         // Ordering: this Release operation synchronizes with the Acquire fence
         // in the below conditional if this is the last active worker, and/or
@@ -176,7 +184,8 @@ impl PoolManager {
 
     /// Marks all pool workers as inactive.
     ///
-    /// Unparking the executor threads is the responsibility of the caller.
+    /// This should only be called by the last active worker. Unparking the
+    /// executor threads is the responsibility of the caller.
     pub(super) fn set_all_workers_inactive(&self) {
         // Ordering: this Release store synchronizes with the Acquire load in
         // `pool_is_idle`.
@@ -187,7 +196,7 @@ impl PoolManager {
     ///
     /// If `true` is returned, it is guaranteed that all operations performed by
     /// the now-inactive workers become visible in this thread.
-    pub(super) fn is_pool_idle(&self) -> bool {
+    pub(super) fn pool_is_idle(&self) -> bool {
         // Ordering: this Acquire operation synchronizes with all Release
         // RMWs in the `try_set_worker_inactive` method via a release sequence.
         self.active_workers.load(Ordering::Acquire) == 0
@@ -278,6 +287,8 @@ pub(super) struct ShuffledStealers<'a> {
     next_candidate: usize, // index of the next candidate
 }
 impl<'a> ShuffledStealers<'a> {
+    /// Creates a new `ShuffledStealers` iterator initialized at a randomly
+    /// selected active worker.
     fn new(candidates: usize, stealers: &'a [Stealer], rng: &'_ rng::Rng) -> Self {
         let (candidates, next_candidate) = if candidates == 0 {
             (0, 0)
diff --git a/asynchronix/src/runtime/executor/worker.rs b/asynchronix/src/runtime/executor/worker.rs
index 6f13b87..b815276 100644
--- a/asynchronix/src/runtime/executor/worker.rs
+++ b/asynchronix/src/runtime/executor/worker.rs
@@ -19,7 +19,7 @@ impl Worker {
         Self {
             local_queue,
             fast_slot: Cell::new(None),
-            executor_context: executor_context,
+            executor_context,
         }
     }
 }
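
Closing aside, not part of the patch: the bucket-based injector documented in injector.rs above is easy to picture with a toy model. The sketch below is hypothetical illustration code (the real `Injector` stores fixed-capacity `Bucket` values and maintains the `is_empty` hint flag with atomics, both simplified away here). It keeps only the essential idea: tasks accumulate in a front bucket that is moved to the back of the queue when full, and consumers pop a whole bucket at a time, so tasks travel in contiguous batches and the queue promises no particular LIFO/FIFO order.

use std::sync::Mutex;

/// Toy model of the injector: an unordered, mutex-protected queue of buckets.
struct ToyInjector<T, const BUCKET_CAPACITY: usize> {
    inner: Mutex<Vec<Vec<T>>>,
}

impl<T, const BUCKET_CAPACITY: usize> ToyInjector<T, BUCKET_CAPACITY> {
    fn new() -> Self {
        Self {
            inner: Mutex::new(vec![Vec::with_capacity(BUCKET_CAPACITY)]),
        }
    }

    /// Pushes a task, moving the front bucket to the back when it fills up.
    fn push(&self, task: T) {
        let mut buckets = self.inner.lock().unwrap();
        let front = buckets.first_mut().unwrap();
        front.push(task);
        if front.len() == BUCKET_CAPACITY {
            let full = std::mem::replace(front, Vec::with_capacity(BUCKET_CAPACITY));
            buckets.push(full);
        }
    }

    /// Pops a whole bucket of tasks at once; the order across buckets is
    /// unspecified, which is what allows the design to stay this simple.
    fn pop_bucket(&self) -> Option<Vec<T>> {
        let mut buckets = self.inner.lock().unwrap();
        if buckets.len() > 1 {
            // Prefer full buckets from the back of the queue.
            buckets.pop()
        } else {
            // Fall back to whatever has accumulated in the front bucket.
            let front = buckets.first_mut().unwrap();
            if front.is_empty() {
                None
            } else {
                Some(std::mem::take(front))
            }
        }
    }
}

fn main() {
    let injector: ToyInjector<u32, 4> = ToyInjector::new();
    for task in 0..10 {
        injector.push(task);
    }
    // Pops [4, 5, 6, 7], then [0, 1, 2, 3], then the partial batch [8, 9].
    while let Some(bucket) = injector.pop_bucket() {
        println!("stole a batch of {} tasks: {:?}", bucket.len(), bucket);
    }
}
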