From 77e6e569ffbc4c016d180d44f567111c53e8edaa Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Tue, 7 May 2024 17:30:11 +0200 Subject: [PATCH] Add same-thread executor support --- .github/workflows/ci.yml | 42 +- asynchronix/src/dev_hooks.rs | 2 +- asynchronix/src/executor.rs | 678 +++++------------- asynchronix/src/executor/mt_executor.rs | 576 +++++++++++++++ .../executor/{ => mt_executor}/injector.rs | 0 .../{ => mt_executor}/pool_manager.rs | 0 asynchronix/src/executor/st_executor.rs | 244 +++++++ asynchronix/src/executor/tests.rs | 140 ---- asynchronix/src/executor/worker.rs | 25 - asynchronix/src/macros/scoped_thread_local.rs | 32 +- asynchronix/src/rpc/generic_server.rs | 4 +- asynchronix/src/simulation/sim_init.rs | 19 +- 12 files changed, 1066 insertions(+), 696 deletions(-) create mode 100644 asynchronix/src/executor/mt_executor.rs rename asynchronix/src/executor/{ => mt_executor}/injector.rs (100%) rename asynchronix/src/executor/{ => mt_executor}/pool_manager.rs (100%) create mode 100644 asynchronix/src/executor/st_executor.rs delete mode 100644 asynchronix/src/executor/tests.rs delete mode 100644 asynchronix/src/executor/worker.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b0b9912..1821e5a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: pull_request: push: - branches: [ main, dev ] + branches: [main, dev] env: RUSTFLAGS: -Dwarnings @@ -42,7 +42,7 @@ jobs: - name: Run cargo test run: cargo test --features="rpc grpc-server" - + loom-dry-run: name: Loom dry run runs-on: ubuntu-latest @@ -70,23 +70,53 @@ jobs: with: components: miri - - name: Run cargo miri tests + - name: Run cargo miri tests (single-threaded executor) + run: cargo miri test --tests --lib --features="rpc grpc-server" + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri tests (multi-threaded executor) run: cargo miri test --tests --lib --features="rpc grpc-server" env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example1 + - name: Run cargo miri example1 (single-threaded executor) + run: cargo miri run --example espresso_machine + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri example1 (multi-threaded executor) run: cargo miri run --example espresso_machine env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example2 + - name: Run cargo miri example2 (single-threaded executor) + run: cargo miri run --example power_supply + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri example2 (multi-threaded executor) run: cargo miri run --example power_supply env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example3 + - name: Run cargo miri example3 (single-threaded executor) run: cargo miri run --example stepper_motor + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri example3 (multi-threaded executor) + run: cargo miri run --example stepper_motor + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 + + - name: Run cargo miri example4 (single-threaded executor) + run: cargo miri run --example assembly + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 
+
+      - name: Run cargo miri example4 (multi-threaded executor)
+        run: cargo miri run --example assembly
         env:
           MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
 
diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs
index 96d948c..f79102f 100644
--- a/asynchronix/src/dev_hooks.rs
+++ b/asynchronix/src/dev_hooks.rs
@@ -15,7 +15,7 @@ impl Executor {
     ///
     /// The maximum number of threads is set with the `pool_size` parameter.
     pub fn new(pool_size: usize) -> Self {
-        Self(executor::Executor::new(pool_size))
+        Self(executor::Executor::new_multi_threaded(pool_size))
     }
 
     /// Spawns a task which output will never be retrieved.
diff --git a/asynchronix/src/executor.rs b/asynchronix/src/executor.rs
index b33a603..3d5a8de 100644
--- a/asynchronix/src/executor.rs
+++ b/asynchronix/src/executor.rs
@@ -1,98 +1,30 @@
-//! Multi-threaded `async` executor.
-//!
-//! The executor is exclusively designed for message-passing computational
-//! tasks. As such, it does not include an I/O reactor and does not consider
-//! fairness as a goal in itself. While it does use fair local queues inasmuch
-//! as these tend to perform better in message-passing applications, it uses an
-//! unfair injection queue and a LIFO slot without attempt to mitigate the
-//! effect of badly behaving code (e.g. futures that spin-lock by yielding to
-//! the executor; there is for this reason no support for something like tokio's
-//! `yield_now`).
-//!
-//! Another way in which it differs from other `async` executors is that it
-//! treats deadlocking as a normal occurrence. This is because in a
-//! discrete-time simulator, the simulation of a system at a given time step
-//! will make as much progress as possible until it technically reaches a
-//! deadlock. Only then does the simulator advance the simulated time to that of
-//! the next "event" extracted from a time-sorted priority queue.
-//!
-//! The design of the executor is largely influenced by the tokio and Go
-//! schedulers, both of which are optimized for message-passing applications. In
-//! particular, it uses fast, fixed-size thread-local work-stealing queues with
-//! a non-stealable LIFO slot in combination with an injector queue, which
-//! injector queue is used both to schedule new tasks and to absorb temporary
-//! overflow in the local queues.
-//!
-//! The design of the injector queue is kept very simple compared to tokio, by
-//! taking advantage of the fact that the injector is not required to be either
-//! LIFO or FIFO. Moving tasks between a local queue and the injector is fast
-//! because tasks are moved in batch and are stored contiguously in memory.
-//!
-//! Another difference with tokio is that, at the moment, the complete subset of
-//! active worker threads is stored in a single atomic variable. This makes it
-//! possible to rapidly identify free worker threads for stealing operations,
-//! with the downside that the maximum number of worker threads is currently
-//! limited to `usize::BITS`. This is not expected to constitute a limitation in
-//! practice since system simulation is not typically embarrassingly parallel.
-//!
-//! Probably the largest difference with tokio is the task system, which has
-//! better throughput due to less need for synchronization. This mainly results
-//! from the use of an atomic notification counter rather than an atomic
-//! notification flag, thus alleviating the need to reset the notification flag
-//! before polling a future.
+//! Single-threaded and multi-threaded `async` executors.
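+//!
+//! The [`Executor`] type declared in this module wraps both flavors behind a
+//! common API. A minimal usage sketch, using only names introduced by this
+//! patch:
+//!
+//! ```ignore
+//! let mut executor = Executor::new_single_threaded();
+//! executor.spawn_and_forget(async { /* simulation work */ });
+//! executor.run(); // returns once all tasks have completed or deadlocked
+//! ```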
-use std::fmt; -use std::future::Future; -use std::panic::{self, AssertUnwindSafe}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::{self, JoinHandle}; -use std::time::{Duration, Instant}; - -use crossbeam_utils::sync::{Parker, Unparker}; -use slab::Slab; - -mod injector; -mod pool_manager; +mod mt_executor; +mod st_executor; mod task; -mod worker; -#[cfg(all(test, not(asynchronix_loom)))] -mod tests; +use std::future::Future; +use std::sync::atomic::AtomicUsize; -use crate::macros::scoped_thread_local::scoped_thread_local; -use crate::util::rng::Rng; - -use self::pool_manager::PoolManager; -use self::task::{CancelToken, Promise, Runnable}; -use self::worker::Worker; - -const BUCKET_SIZE: usize = 128; -const QUEUE_SIZE: usize = BUCKET_SIZE * 2; +use task::Promise; +/// Unique identifier for executor instances. static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); -type Bucket = injector::Bucket; -type Injector = injector::Injector; -type LocalQueue = st3::fifo::Worker; -type Stealer = st3::fifo::Stealer; - -scoped_thread_local!(static LOCAL_WORKER: Worker); -scoped_thread_local!(static ACTIVE_TASKS: Mutex>); - -/// A multi-threaded `async` executor. -pub(crate) struct Executor { - /// Shared executor data. - context: Arc, - /// List of tasks that have not completed yet. - active_tasks: Arc>>, - /// Parker for the main executor thread. - parker: Parker, - /// Handles to the worker threads. - worker_handles: Vec>, +/// A single-threaded or multi-threaded `async` executor. +#[derive(Debug)] +pub(crate) enum Executor { + StExecutor(st_executor::Executor), + MtExecutor(mt_executor::Executor), } impl Executor { + /// Creates an executor that runs futures on the current thread. + pub(crate) fn new_single_threaded() -> Self { + Self::StExecutor(st_executor::Executor::new()) + } + /// Creates an executor that runs futures on a thread pool. /// /// The maximum number of threads is set with the `num_threads` parameter. @@ -101,78 +33,11 @@ impl Executor { /// /// This will panic if the specified number of threads is zero or is more /// than `usize::BITS`. - pub(crate) fn new(num_threads: usize) -> Self { - let parker = Parker::new(); - let unparker = parker.unparker().clone(); - - let (local_queues_and_parkers, stealers_and_unparkers): (Vec<_>, Vec<_>) = (0..num_threads) - .map(|_| { - let parker = Parker::new(); - let unparker = parker.unparker().clone(); - let local_queue = LocalQueue::new(QUEUE_SIZE); - let stealer = local_queue.stealer(); - - ((local_queue, parker), (stealer, unparker)) - }) - .unzip(); - - // Each executor instance has a unique ID inherited by tasks to ensure - // that tasks are scheduled on their parent executor. - let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed); - assert!( - executor_id <= usize::MAX / 2, - "too many executors have been instantiated" - ); - - let context = Arc::new(ExecutorContext::new( - executor_id, - unparker, - stealers_and_unparkers.into_iter(), - )); - let active_tasks = Arc::new(Mutex::new(Slab::new())); - - // All workers must be marked as active _before_ spawning the threads to - // make sure that the count of active workers does not fall to zero - // before all workers are blocked on the signal barrier. - context.pool_manager.set_all_workers_active(); - - // Spawn all worker threads. 
-        let worker_handles: Vec<_> = local_queues_and_parkers
-            .into_iter()
-            .enumerate()
-            .map(|(id, (local_queue, worker_parker))| {
-                let thread_builder = thread::Builder::new().name(format!("Worker #{}", id));
-
-                thread_builder
-                    .spawn({
-                        let context = context.clone();
-                        let active_tasks = active_tasks.clone();
-                        move || {
-                            let worker = Worker::new(local_queue, context);
-                            ACTIVE_TASKS.set(&active_tasks, || {
-                                LOCAL_WORKER
-                                    .set(&worker, || run_local_worker(&worker, id, worker_parker))
-                            });
-                        }
-                    })
-                    .unwrap()
-            })
-            .collect();
-
-        // Wait until all workers are blocked on the signal barrier.
-        parker.park();
-        assert!(context.pool_manager.pool_is_idle());
-
-        Self {
-            context,
-            active_tasks,
-            parker,
-            worker_handles,
-        }
+    pub(crate) fn new_multi_threaded(num_threads: usize) -> Self {
+        Self::MtExecutor(mt_executor::Executor::new(num_threads))
     }
 
-    /// Spawns a task and returns a promise that can be polled to retrieve the
-    /// task's output.
+    /// Spawns a task and returns a promise that can be polled to retrieve the
+    /// task's output.
     ///
     /// Note that spawned tasks are not executed until [`run()`](Executor::run)
     /// is called.
@@ -182,28 +47,14 @@ impl Executor {
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
-        // Book a slot to store the task cancellation token.
-        let mut active_tasks = self.active_tasks.lock().unwrap();
-        let task_entry = active_tasks.vacant_entry();
-
-        // Wrap the future so that it removes its cancel token from the
-        // executor's list when dropped.
-        let future = CancellableFuture::new(future, task_entry.key());
-
-        let (promise, runnable, cancel_token) =
-            task::spawn(future, schedule_task, self.context.executor_id);
-
-        task_entry.insert(cancel_token);
-        self.context.injector.insert_task(runnable);
-
-        promise
+        match self {
+            Self::StExecutor(executor) => executor.spawn(future),
+            Self::MtExecutor(executor) => executor.spawn(future),
+        }
     }
 
     /// Spawns a task which output will never be retrieved.
     ///
-    /// This is mostly useful to avoid undue reference counting for futures that
-    /// return a `()` type.
-    ///
     /// Note that spawned tasks are not executed until [`run()`](Executor::run)
     /// is called.
     pub(crate) fn spawn_and_forget<T>(&self, future: T)
@@ -211,354 +62,171 @@ impl Executor {
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
-        // Book a slot to store the task cancellation token.
-        let mut active_tasks = self.active_tasks.lock().unwrap();
-        let task_entry = active_tasks.vacant_entry();
-
-        // Wrap the future so that it removes its cancel token from the
-        // executor's list when dropped.
-        let future = CancellableFuture::new(future, task_entry.key());
-
-        let (runnable, cancel_token) =
-            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
-
-        task_entry.insert(cancel_token);
-        self.context.injector.insert_task(runnable);
+        match self {
+            Self::StExecutor(executor) => executor.spawn_and_forget(future),
+            Self::MtExecutor(executor) => executor.spawn_and_forget(future),
+        }
     }
 
     /// Execute spawned tasks, blocking until all futures have completed or
     /// until the executor reaches a deadlock.
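+    ///
+    /// Whether the tasks are polled on the calling thread or on a worker
+    /// thread pool depends on the variant this executor was constructed with.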
pub(crate) fn run(&mut self) { - self.context.pool_manager.activate_worker(); + match self { + Self::StExecutor(executor) => executor.run(), + Self::MtExecutor(executor) => executor.run(), + } + } +} - loop { - if let Some(worker_panic) = self.context.pool_manager.take_panic() { - panic::resume_unwind(worker_panic); +#[cfg(all(test, not(asynchronix_loom)))] +mod tests { + use std::sync::atomic::Ordering; + use std::sync::Arc; + + use futures_channel::{mpsc, oneshot}; + use futures_util::StreamExt; + + use super::*; + + /// An object that runs an arbitrary closure when dropped. + struct RunOnDrop { + drop_fn: Option, + } + impl RunOnDrop { + /// Creates a new `RunOnDrop`. + fn new(drop_fn: F) -> Self { + Self { + drop_fn: Some(drop_fn), } - if self.context.pool_manager.pool_is_idle() { - return; - } - - self.parker.park(); } } -} - -impl Drop for Executor { - fn drop(&mut self) { - // Force all threads to return. - self.context.pool_manager.trigger_termination(); - for handle in self.worker_handles.drain(0..) { - handle.join().unwrap(); - } - - // Drop all tasks that have not completed. - // - // A local worker must be set because some tasks may schedule other - // tasks when dropped, which requires that a local worker be available. - let worker = Worker::new(LocalQueue::new(QUEUE_SIZE), self.context.clone()); - LOCAL_WORKER.set(&worker, || { - // Cancel all pending futures. - // - // `ACTIVE_TASKS` is explicitly unset to prevent - // `CancellableFuture::drop()` from trying to remove its own token - // from the list of active tasks as this would result in a reentrant - // lock. This is mainly to stay on the safe side: `ACTIVE_TASKS` - // should not be set on this thread anyway, unless for some reason - // the executor runs inside another executor. - ACTIVE_TASKS.unset(|| { - let mut tasks = self.active_tasks.lock().unwrap(); - for task in tasks.drain() { - task.cancel(); - } - - // Some of the dropped tasks may have scheduled other tasks that - // were not yet cancelled, preventing them from being dropped - // upon cancellation. This is OK: the scheduled tasks will be - // dropped when the local and injector queues are dropped, and - // they cannot re-schedule one another since all tasks were - // cancelled. - }); - }); - } -} - -impl fmt::Debug for Executor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Executor").finish_non_exhaustive() - } -} - -/// Shared executor context. -/// -/// This contains all executor resources that can be shared between threads. -struct ExecutorContext { - /// Injector queue. - injector: Injector, - /// Unique executor ID inherited by all tasks spawned on this executor instance. - executor_id: usize, - /// Unparker for the main executor thread. - executor_unparker: Unparker, - /// Manager for all worker threads. - pool_manager: PoolManager, -} - -impl ExecutorContext { - /// Creates a new shared executor context. 
- pub(super) fn new( - executor_id: usize, - executor_unparker: Unparker, - stealers_and_unparkers: impl Iterator, - ) -> Self { - let (stealers, worker_unparkers): (Vec<_>, Vec<_>) = - stealers_and_unparkers.into_iter().unzip(); - let worker_unparkers = worker_unparkers.into_boxed_slice(); - - Self { - injector: Injector::new(), - executor_id, - executor_unparker, - pool_manager: PoolManager::new( - worker_unparkers.len(), - stealers.into_boxed_slice(), - worker_unparkers, - ), + impl Drop for RunOnDrop { + fn drop(&mut self) { + self.drop_fn.take().map(|f| f()); } } -} -/// A `Future` wrapper that removes its cancellation token from the list of -/// active tasks when dropped. -struct CancellableFuture { - inner: T, - cancellation_key: usize, -} + fn executor_deadlock(mut executor: Executor) { + let (_sender1, receiver1) = oneshot::channel::<()>(); + let (_sender2, receiver2) = oneshot::channel::<()>(); -impl CancellableFuture { - /// Creates a new `CancellableFuture`. - fn new(fut: T, cancellation_key: usize) -> Self { - Self { - inner: fut, - cancellation_key, - } - } -} + let launch_count = Arc::new(AtomicUsize::new(0)); + let completion_count = Arc::new(AtomicUsize::new(0)); -impl Future for CancellableFuture { - type Output = T::Output; + executor.spawn_and_forget({ + let launch_count = launch_count.clone(); + let completion_count = completion_count.clone(); - #[inline(always)] - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) } - } -} - -impl Drop for CancellableFuture { - fn drop(&mut self) { - // Remove the task from the list of active tasks if the future is - // dropped on a worker thread. Otherwise do nothing and let the - // executor's drop handler do the cleanup. - let _ = ACTIVE_TASKS.map(|active_tasks| { - // Don't unwrap on `lock()` because this function can be called from - // a destructor and should not panic. In the worse case, the cancel - // token will be left in the list of active tasks, which does - // prevents eager task deallocation but does not cause any issue - // otherwise. - if let Ok(mut active_tasks) = active_tasks.lock() { - let _cancel_token = active_tasks.try_remove(self.cancellation_key); + async move { + launch_count.fetch_add(1, Ordering::Relaxed); + let _ = receiver2.await; + completion_count.fetch_add(1, Ordering::Relaxed); } }); - } -} - -/// Schedules a `Runnable` from within a worker thread. -/// -/// # Panics -/// -/// This function will panic if called from a non-worker thread or if called -/// from the worker thread of another executor instance than the one the task -/// for this `Runnable` was spawned on. -fn schedule_task(task: Runnable, executor_id: usize) { - LOCAL_WORKER - .map(|worker| { - let pool_manager = &worker.executor_context.pool_manager; - let injector = &worker.executor_context.injector; - let local_queue = &worker.local_queue; - let fast_slot = &worker.fast_slot; - - // Check that this task was indeed spawned on this executor. - assert_eq!( - executor_id, worker.executor_context.executor_id, - "Tasks must be awaken on the same executor they are spawned on" - ); - - // Store the task in the fast slot and retrieve the one that was - // formerly stored, if any. - let prev_task = match fast_slot.replace(Some(task)) { - // If there already was a task in the slot, proceed so it can be - // moved to a task queue. 
- Some(t) => t, - // Otherwise return immediately: this task cannot be stolen so - // there is no point in activating a sibling worker. - None => return, - }; - - // Push the previous task to the local queue if possible or on the - // injector queue otherwise. - if let Err(prev_task) = local_queue.push(prev_task) { - // The local queue is full. Try to move half of it to the - // injector queue; if this fails, just push one task to the - // injector queue. - if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) { - injector.push_bucket(Bucket::from_iter(drain)); - local_queue.push(prev_task).unwrap(); - } else { - injector.insert_task(prev_task); - } - } - - // A task has been pushed to the local or injector queue: try to - // activate another worker if no worker is currently searching for a - // task. - if pool_manager.searching_worker_count() == 0 { - pool_manager.activate_worker_relaxed(); - } - }) - .expect("Tasks may not be awaken outside executor threads"); -} - -/// Processes all incoming tasks on a worker thread until the `Terminate` signal -/// is received or until it panics. -/// -/// Panics caught in this thread are relayed to the main executor thread. -fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { - let pool_manager = &worker.executor_context.pool_manager; - let injector = &worker.executor_context.injector; - let executor_unparker = &worker.executor_context.executor_unparker; - let local_queue = &worker.local_queue; - let fast_slot = &worker.fast_slot; - - let result = panic::catch_unwind(AssertUnwindSafe(|| { - // Set how long to spin when searching for a task. - const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000); - - // Seed a thread RNG with the worker ID. - let rng = Rng::new(id as u64); - - loop { - // Signal barrier: park until notified to continue or terminate. - - // Try to deactivate the worker. - if pool_manager.try_set_worker_inactive(id) { - parker.park(); - // No need to call `begin_worker_search()`: this was done by the - // thread that unparked the worker. - } else if injector.is_empty() { - // This worker could not be deactivated because it was the last - // active worker. In such case, the call to - // `try_set_worker_inactive` establishes a synchronization with - // all threads that pushed tasks to the injector queue but could - // not activate a new worker, which is why some tasks may now be - // visible in the injector queue. - pool_manager.set_all_workers_inactive(); - executor_unparker.unpark(); - parker.park(); - // No need to call `begin_worker_search()`: this was done by the - // thread that unparked the worker. - } else { - pool_manager.begin_worker_search(); - } - - if pool_manager.termination_is_triggered() { - return; - } - - let mut search_start = Instant::now(); - - // Process the tasks one by one. - loop { - // Check the injector queue first. - if let Some(bucket) = injector.pop_bucket() { - let bucket_iter = bucket.into_iter(); - - // There is a _very_ remote possibility that, even though - // the local queue is empty, it has temporarily too little - // spare capacity for the bucket. This could happen if a - // concurrent steal operation was preempted for all the time - // it took to pop and process the remaining tasks and it - // hasn't released the stolen capacity yet. 
- // - // Unfortunately, we cannot just skip checking the injector - // queue altogether when there isn't enough spare capacity - // in the local queue because this could lead to a race: - // suppose that (1) this thread has earlier pushed tasks - // onto the injector queue, and (2) the stealer has - // processed all stolen tasks before this thread sees the - // capacity restored and at the same time (3) the stealer - // does not yet see the tasks this thread pushed to the - // injector queue; in such scenario, both this thread and - // the stealer thread may park and leave unprocessed tasks - // in the injector queue. - // - // This is the only instance where spinning is used, as the - // probability of this happening is close to zero and the - // complexity of a signaling mechanism (condvar & friends) - // wouldn't carry its weight. - while local_queue.spare_capacity() < bucket_iter.len() {} - - // Since empty buckets are never pushed onto the injector - // queue, we should now have at least one task to process. - local_queue.extend(bucket_iter); - } else { - // The injector queue is empty. Try to steal from active - // siblings. - let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng); - if stealers.all(|stealer| { - stealer - .steal_and_pop(local_queue, |n| n - n / 2) - .map(|(task, _)| { - let prev_task = fast_slot.replace(Some(task)); - assert!(prev_task.is_none()); - }) - .is_err() - }) { - // Give up if unsuccessful for too long. - if (Instant::now() - search_start) > MAX_SEARCH_DURATION { - pool_manager.end_worker_search(); - break; - } - - // Re-try. - continue; - } - } - - // Signal the end of the search so that another worker can be - // activated when a new task is scheduled. - pool_manager.end_worker_search(); - - // Pop tasks from the fast slot or the local queue. - while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) { - if pool_manager.termination_is_triggered() { - return; - } - task.run(); - } - - // Resume the search for tasks. - pool_manager.begin_worker_search(); - search_start = Instant::now(); - } - } - })); - - // Propagate the panic, if any. - if let Err(panic) = result { - pool_manager.register_panic(panic); - pool_manager.trigger_termination(); - executor_unparker.unpark(); + executor.spawn_and_forget({ + let launch_count = launch_count.clone(); + let completion_count = completion_count.clone(); + + async move { + launch_count.fetch_add(1, Ordering::Relaxed); + let _ = receiver1.await; + completion_count.fetch_add(1, Ordering::Relaxed); + } + }); + + executor.run(); + + // Check that the executor returns on deadlock, i.e. none of the task has + // completed. + assert_eq!(launch_count.load(Ordering::Relaxed), 2); + assert_eq!(completion_count.load(Ordering::Relaxed), 0); + + // Drop the executor and thus the receiver tasks before the senders, + // failing which the senders may signal that the channel has been + // dropped and wake the tasks outside the executor. + drop(executor); + } + + fn executor_drop_cycle(mut executor: Executor) { + let (sender1, mut receiver1) = mpsc::channel(2); + let (sender2, mut receiver2) = mpsc::channel(2); + let (sender3, mut receiver3) = mpsc::channel(2); + + let drop_count = Arc::new(AtomicUsize::new(0)); + + // Spawn 3 tasks that wake one another when dropped. 
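+        // Each task owns a `RunOnDrop` guard that, when the task is dropped,
+        // sends a wake-up message to the two other tasks, so the tasks keep
+        // re-waking one another while the executor is being torn down.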
+ executor.spawn_and_forget({ + let mut sender2 = sender2.clone(); + let mut sender3 = sender3.clone(); + let drop_count = drop_count.clone(); + + async move { + let _guard = RunOnDrop::new(move || { + let _ = sender2.try_send(()); + let _ = sender3.try_send(()); + drop_count.fetch_add(1, Ordering::Relaxed); + }); + let _ = receiver1.next().await; + } + }); + executor.spawn_and_forget({ + let mut sender1 = sender1.clone(); + let mut sender3 = sender3.clone(); + let drop_count = drop_count.clone(); + + async move { + let _guard = RunOnDrop::new(move || { + let _ = sender1.try_send(()); + let _ = sender3.try_send(()); + drop_count.fetch_add(1, Ordering::Relaxed); + }); + let _ = receiver2.next().await; + } + }); + executor.spawn_and_forget({ + let mut sender1 = sender1.clone(); + let mut sender2 = sender2.clone(); + let drop_count = drop_count.clone(); + + async move { + let _guard = RunOnDrop::new(move || { + let _ = sender1.try_send(()); + let _ = sender2.try_send(()); + drop_count.fetch_add(1, Ordering::Relaxed); + }); + let _ = receiver3.next().await; + } + }); + + executor.run(); + + // Make sure that all tasks are eventually dropped even though each task + // wakes the others when dropped. + drop(executor); + assert_eq!(drop_count.load(Ordering::Relaxed), 3); + } + + #[test] + fn executor_deadlock_st() { + executor_deadlock(Executor::new_single_threaded()); + } + + #[test] + fn executor_deadlock_mt() { + executor_deadlock(Executor::new_multi_threaded(3)); + } + + #[test] + fn executor_deadlock_mt_one_worker() { + executor_deadlock(Executor::new_multi_threaded(1)); + } + #[test] + fn executor_drop_cycle_st() { + executor_drop_cycle(Executor::new_single_threaded()); + } + + #[test] + fn executor_drop_cycle_mt() { + executor_drop_cycle(Executor::new_multi_threaded(3)); } } diff --git a/asynchronix/src/executor/mt_executor.rs b/asynchronix/src/executor/mt_executor.rs new file mode 100644 index 0000000..5859cdf --- /dev/null +++ b/asynchronix/src/executor/mt_executor.rs @@ -0,0 +1,576 @@ +//! Multi-threaded `async` executor. +//! +//! The executor is exclusively designed for message-passing computational +//! tasks. As such, it does not include an I/O reactor and does not consider +//! fairness as a goal in itself. While it does use fair local queues inasmuch +//! as these tend to perform better in message-passing applications, it uses an +//! unfair injection queue and a LIFO slot without attempt to mitigate the +//! effect of badly behaving code (e.g. futures that spin-lock by yielding to +//! the executor; there is for this reason no support for something like tokio's +//! `yield_now`). +//! +//! Another way in which it differs from other `async` executors is that it +//! treats deadlocking as a normal occurrence. This is because in a +//! discrete-time simulator, the simulation of a system at a given time step +//! will make as much progress as possible until it technically reaches a +//! deadlock. Only then does the simulator advance the simulated time to that of +//! the next "event" extracted from a time-sorted priority queue. +//! +//! The design of the executor is largely influenced by the tokio and Go +//! schedulers, both of which are optimized for message-passing applications. In +//! particular, it uses fast, fixed-size thread-local work-stealing queues with +//! a non-stealable LIFO slot in combination with an injector queue, which +//! injector queue is used both to schedule new tasks and to absorb temporary +//! overflow in the local queues. +//! +//! 
The design of the injector queue is kept very simple compared to tokio, by +//! taking advantage of the fact that the injector is not required to be either +//! LIFO or FIFO. Moving tasks between a local queue and the injector is fast +//! because tasks are moved in batch and are stored contiguously in memory. +//! +//! Another difference with tokio is that, at the moment, the complete subset of +//! active worker threads is stored in a single atomic variable. This makes it +//! possible to rapidly identify free worker threads for stealing operations, +//! with the downside that the maximum number of worker threads is currently +//! limited to `usize::BITS`. This is not expected to constitute a limitation in +//! practice since system simulation is not typically embarrassingly parallel. +//! +//! Probably the largest difference with tokio is the task system, which has +//! better throughput due to less need for synchronization. This mainly results +//! from the use of an atomic notification counter rather than an atomic +//! notification flag, thus alleviating the need to reset the notification flag +//! before polling a future. + +mod injector; +mod pool_manager; + +use std::cell::Cell; +use std::fmt; +use std::future::Future; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::atomic::Ordering; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use crossbeam_utils::sync::{Parker, Unparker}; +use slab::Slab; + +use crate::macros::scoped_thread_local::scoped_thread_local; +use crate::util::rng::Rng; + +use super::task::{self, CancelToken, Promise, Runnable}; +use super::NEXT_EXECUTOR_ID; +use pool_manager::PoolManager; + +const BUCKET_SIZE: usize = 128; +const QUEUE_SIZE: usize = BUCKET_SIZE * 2; + +type Bucket = injector::Bucket; +type Injector = injector::Injector; +type LocalQueue = st3::fifo::Worker; +type Stealer = st3::fifo::Stealer; + +scoped_thread_local!(static LOCAL_WORKER: Worker); +scoped_thread_local!(static ACTIVE_TASKS: Mutex>); + +/// A multi-threaded `async` executor. +pub(crate) struct Executor { + /// Shared executor data. + context: Arc, + /// List of tasks that have not completed yet. + active_tasks: Arc>>, + /// Parker for the main executor thread. + parker: Parker, + /// Handles to the worker threads. + worker_handles: Vec>, +} + +impl Executor { + /// Creates an executor that runs futures on a thread pool. + /// + /// The maximum number of threads is set with the `num_threads` parameter. + /// + /// # Panics + /// + /// This will panic if the specified number of threads is zero or is more + /// than `usize::BITS`. + pub(crate) fn new(num_threads: usize) -> Self { + let parker = Parker::new(); + let unparker = parker.unparker().clone(); + + let (local_queues_and_parkers, stealers_and_unparkers): (Vec<_>, Vec<_>) = (0..num_threads) + .map(|_| { + let parker = Parker::new(); + let unparker = parker.unparker().clone(); + let local_queue = LocalQueue::new(QUEUE_SIZE); + let stealer = local_queue.stealer(); + + ((local_queue, parker), (stealer, unparker)) + }) + .unzip(); + + // Each executor instance has a unique ID inherited by tasks to ensure + // that tasks are scheduled on their parent executor. 
+        let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
+        assert!(
+            executor_id <= usize::MAX / 2,
+            "too many executors have been instantiated"
+        );
+
+        let context = Arc::new(ExecutorContext::new(
+            executor_id,
+            unparker,
+            stealers_and_unparkers.into_iter(),
+        ));
+        let active_tasks = Arc::new(Mutex::new(Slab::new()));
+
+        // All workers must be marked as active _before_ spawning the threads to
+        // make sure that the count of active workers does not fall to zero
+        // before all workers are blocked on the signal barrier.
+        context.pool_manager.set_all_workers_active();
+
+        // Spawn all worker threads.
+        let worker_handles: Vec<_> = local_queues_and_parkers
+            .into_iter()
+            .enumerate()
+            .map(|(id, (local_queue, worker_parker))| {
+                let thread_builder = thread::Builder::new().name(format!("Worker #{}", id));
+
+                thread_builder
+                    .spawn({
+                        let context = context.clone();
+                        let active_tasks = active_tasks.clone();
+                        move || {
+                            let worker = Worker::new(local_queue, context);
+                            ACTIVE_TASKS.set(&active_tasks, || {
+                                LOCAL_WORKER
+                                    .set(&worker, || run_local_worker(&worker, id, worker_parker))
+                            });
+                        }
+                    })
+                    .unwrap()
+            })
+            .collect();
+
+        // Wait until all workers are blocked on the signal barrier.
+        parker.park();
+        assert!(context.pool_manager.pool_is_idle());
+
+        Self {
+            context,
+            active_tasks,
+            parker,
+            worker_handles,
+        }
+    }
+
+    /// Spawns a task and returns a promise that can be polled to retrieve the
+    /// task's output.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
+        let mut active_tasks = self.active_tasks.lock().unwrap();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (promise, runnable, cancel_token) =
+            task::spawn(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        self.context.injector.insert_task(runnable);
+
+        promise
+    }
+
+    /// Spawns a task whose output will never be retrieved.
+    ///
+    /// This is mostly useful to avoid undue reference counting for futures that
+    /// return a `()` type.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn_and_forget<T>(&self, future: T)
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
+        let mut active_tasks = self.active_tasks.lock().unwrap();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (runnable, cancel_token) =
+            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        self.context.injector.insert_task(runnable);
+    }
+
+    /// Executes spawned tasks, blocking until all futures have completed or
+    /// until the executor reaches a deadlock.
+ pub(crate) fn run(&mut self) { + self.context.pool_manager.activate_worker(); + + loop { + if let Some(worker_panic) = self.context.pool_manager.take_panic() { + panic::resume_unwind(worker_panic); + } + if self.context.pool_manager.pool_is_idle() { + return; + } + + self.parker.park(); + } + } +} + +impl Drop for Executor { + fn drop(&mut self) { + // Force all threads to return. + self.context.pool_manager.trigger_termination(); + for handle in self.worker_handles.drain(0..) { + handle.join().unwrap(); + } + + // Drop all tasks that have not completed. + // + // A local worker must be set because some tasks may schedule other + // tasks when dropped, which requires that a local worker be available. + let worker = Worker::new(LocalQueue::new(QUEUE_SIZE), self.context.clone()); + LOCAL_WORKER.set(&worker, || { + // Cancel all pending futures. + // + // `ACTIVE_TASKS` is explicitly unset to prevent + // `CancellableFuture::drop()` from trying to remove its own token + // from the list of active tasks as this would result in a reentrant + // lock. This is mainly to stay on the safe side: `ACTIVE_TASKS` + // should not be set on this thread anyway, unless for some reason + // the executor runs inside another executor. + ACTIVE_TASKS.unset(|| { + let mut tasks = self.active_tasks.lock().unwrap(); + for task in tasks.drain() { + task.cancel(); + } + + // Some of the dropped tasks may have scheduled other tasks that + // were not yet cancelled, preventing them from being dropped + // upon cancellation. This is OK: the scheduled tasks will be + // dropped when the local and injector queues are dropped, and + // they cannot re-schedule one another since all tasks were + // cancelled. + }); + }); + } +} + +impl fmt::Debug for Executor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Executor").finish_non_exhaustive() + } +} + +/// Shared executor context. +/// +/// This contains all executor resources that can be shared between threads. +struct ExecutorContext { + /// Injector queue. + injector: Injector, + /// Unique executor identifier inherited by all tasks spawned on this + /// executor instance. + executor_id: usize, + /// Unparker for the main executor thread. + executor_unparker: Unparker, + /// Manager for all worker threads. + pool_manager: PoolManager, +} + +impl ExecutorContext { + /// Creates a new shared executor context. + pub(super) fn new( + executor_id: usize, + executor_unparker: Unparker, + stealers_and_unparkers: impl Iterator, + ) -> Self { + let (stealers, worker_unparkers): (Vec<_>, Vec<_>) = + stealers_and_unparkers.into_iter().unzip(); + let worker_unparkers = worker_unparkers.into_boxed_slice(); + + Self { + injector: Injector::new(), + executor_id, + executor_unparker, + pool_manager: PoolManager::new( + worker_unparkers.len(), + stealers.into_boxed_slice(), + worker_unparkers, + ), + } + } +} + +/// A `Future` wrapper that removes its cancellation token from the list of +/// active tasks when dropped. +struct CancellableFuture { + inner: T, + cancellation_key: usize, +} + +impl CancellableFuture { + /// Creates a new `CancellableFuture`. 
+    fn new(fut: T, cancellation_key: usize) -> Self {
+        Self {
+            inner: fut,
+            cancellation_key,
+        }
+    }
+}
+
+impl<T: Future> Future for CancellableFuture<T> {
+    type Output = T::Output;
+
+    #[inline(always)]
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
+    }
+}
+
+impl<T> Drop for CancellableFuture<T> {
+    fn drop(&mut self) {
+        // Remove the task from the list of active tasks if the future is
+        // dropped on a worker thread. Otherwise do nothing and let the
+        // executor's drop handler do the cleanup.
+        let _ = ACTIVE_TASKS.map(|active_tasks| {
+            // Don't unwrap on `lock()` because this function can be called from
+            // a destructor and should not panic. In the worst case, the cancel
+            // token will be left in the list of active tasks, which prevents
+            // eager task deallocation but does not cause any issue otherwise.
+            if let Ok(mut active_tasks) = active_tasks.lock() {
+                let _cancel_token = active_tasks.try_remove(self.cancellation_key);
+            }
+        });
+    }
+}
+
+/// A local worker with access to global executor resources.
+pub(crate) struct Worker {
+    local_queue: LocalQueue,
+    fast_slot: Cell<Option<Runnable>>,
+    executor_context: Arc<ExecutorContext>,
+}
+
+impl Worker {
+    /// Creates a new worker.
+    fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
+        Self {
+            local_queue,
+            fast_slot: Cell::new(None),
+            executor_context,
+        }
+    }
+}
+
+/// Schedules a `Runnable` from within a worker thread.
+///
+/// # Panics
+///
+/// This function will panic if called from a non-worker thread or if called
+/// from the worker thread of another executor instance than the one the task
+/// for this `Runnable` was spawned on.
+fn schedule_task(task: Runnable, executor_id: usize) {
+    LOCAL_WORKER
+        .map(|worker| {
+            let pool_manager = &worker.executor_context.pool_manager;
+            let injector = &worker.executor_context.injector;
+            let local_queue = &worker.local_queue;
+            let fast_slot = &worker.fast_slot;
+
+            // Check that this task was indeed spawned on this executor.
+            assert_eq!(
+                executor_id, worker.executor_context.executor_id,
+                "Tasks must be awakened on the same executor they are spawned on"
+            );
+
+            // Store the task in the fast slot and retrieve the one that was
+            // formerly stored, if any.
+            let prev_task = match fast_slot.replace(Some(task)) {
+                // If there already was a task in the slot, proceed so it can be
+                // moved to a task queue.
+                Some(t) => t,
+                // Otherwise return immediately: this task cannot be stolen so
+                // there is no point in activating a sibling worker.
+                None => return,
+            };
+
+            // Push the previous task to the local queue if possible or on the
+            // injector queue otherwise.
+            if let Err(prev_task) = local_queue.push(prev_task) {
+                // The local queue is full. Try to move half of it to the
+                // injector queue; if this fails, just push one task to the
+                // injector queue.
+                if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) {
+                    injector.push_bucket(Bucket::from_iter(drain));
+                    local_queue.push(prev_task).unwrap();
+                } else {
+                    injector.insert_task(prev_task);
+                }
+            }
+
+            // A task has been pushed to the local or injector queue: try to
+            // activate another worker if no worker is currently searching for a
+            // task.
+            if pool_manager.searching_worker_count() == 0 {
+                pool_manager.activate_worker_relaxed();
+            }
+        })
+        .expect("Tasks may not be awakened outside executor threads");
+}
+
+/// Processes all incoming tasks on a worker thread until the `Terminate` signal
+/// is received or until it panics.
+///
+/// Panics caught in this thread are relayed to the main executor thread.
+fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
+    let pool_manager = &worker.executor_context.pool_manager;
+    let injector = &worker.executor_context.injector;
+    let executor_unparker = &worker.executor_context.executor_unparker;
+    let local_queue = &worker.local_queue;
+    let fast_slot = &worker.fast_slot;
+
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        // Set how long to spin when searching for a task.
+        const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000);
+
+        // Seed a thread RNG with the worker ID.
+        let rng = Rng::new(id as u64);
+
+        loop {
+            // Signal barrier: park until notified to continue or terminate.
+
+            // Try to deactivate the worker.
+            if pool_manager.try_set_worker_inactive(id) {
+                parker.park();
+                // No need to call `begin_worker_search()`: this was done by the
+                // thread that unparked the worker.
+            } else if injector.is_empty() {
+                // This worker could not be deactivated because it was the last
+                // active worker. In such a case, the call to
+                // `try_set_worker_inactive` establishes a synchronization with
+                // all threads that pushed tasks to the injector queue but could
+                // not activate a new worker, which is why some tasks may now be
+                // visible in the injector queue.
+                pool_manager.set_all_workers_inactive();
+                executor_unparker.unpark();
+                parker.park();
+                // No need to call `begin_worker_search()`: this was done by the
+                // thread that unparked the worker.
+            } else {
+                pool_manager.begin_worker_search();
+            }
+
+            if pool_manager.termination_is_triggered() {
+                return;
+            }
+
+            let mut search_start = Instant::now();
+
+            // Process the tasks one by one.
+            loop {
+                // Check the injector queue first.
+                if let Some(bucket) = injector.pop_bucket() {
+                    let bucket_iter = bucket.into_iter();
+
+                    // There is a _very_ remote possibility that, even though
+                    // the local queue is empty, it has temporarily too little
+                    // spare capacity for the bucket. This could happen if a
+                    // concurrent steal operation was preempted for all the time
+                    // it took to pop and process the remaining tasks and it
+                    // hasn't released the stolen capacity yet.
+                    //
+                    // Unfortunately, we cannot just skip checking the injector
+                    // queue altogether when there isn't enough spare capacity
+                    // in the local queue because this could lead to a race:
+                    // suppose that (1) this thread has earlier pushed tasks
+                    // onto the injector queue, and (2) the stealer has
+                    // processed all stolen tasks before this thread sees the
+                    // capacity restored and at the same time (3) the stealer
+                    // does not yet see the tasks this thread pushed to the
+                    // injector queue; in such scenario, both this thread and
+                    // the stealer thread may park and leave unprocessed tasks
+                    // in the injector queue.
+                    //
+                    // This is the only instance where spinning is used, as the
+                    // probability of this happening is close to zero and the
+                    // complexity of a signaling mechanism (condvar & friends)
+                    // wouldn't carry its weight.
+                    while local_queue.spare_capacity() < bucket_iter.len() {}
+
+                    // Since empty buckets are never pushed onto the injector
+                    // queue, we should now have at least one task to process.
+ local_queue.extend(bucket_iter); + } else { + // The injector queue is empty. Try to steal from active + // siblings. + let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng); + if stealers.all(|stealer| { + stealer + .steal_and_pop(local_queue, |n| n - n / 2) + .map(|(task, _)| { + let prev_task = fast_slot.replace(Some(task)); + assert!(prev_task.is_none()); + }) + .is_err() + }) { + // Give up if unsuccessful for too long. + if (Instant::now() - search_start) > MAX_SEARCH_DURATION { + pool_manager.end_worker_search(); + break; + } + + // Re-try. + continue; + } + } + + // Signal the end of the search so that another worker can be + // activated when a new task is scheduled. + pool_manager.end_worker_search(); + + // Pop tasks from the fast slot or the local queue. + while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) { + if pool_manager.termination_is_triggered() { + return; + } + task.run(); + } + + // Resume the search for tasks. + pool_manager.begin_worker_search(); + search_start = Instant::now(); + } + } + })); + + // Propagate the panic, if any. + if let Err(panic) = result { + pool_manager.register_panic(panic); + pool_manager.trigger_termination(); + executor_unparker.unpark(); + } +} diff --git a/asynchronix/src/executor/injector.rs b/asynchronix/src/executor/mt_executor/injector.rs similarity index 100% rename from asynchronix/src/executor/injector.rs rename to asynchronix/src/executor/mt_executor/injector.rs diff --git a/asynchronix/src/executor/pool_manager.rs b/asynchronix/src/executor/mt_executor/pool_manager.rs similarity index 100% rename from asynchronix/src/executor/pool_manager.rs rename to asynchronix/src/executor/mt_executor/pool_manager.rs diff --git a/asynchronix/src/executor/st_executor.rs b/asynchronix/src/executor/st_executor.rs new file mode 100644 index 0000000..ced8c9c --- /dev/null +++ b/asynchronix/src/executor/st_executor.rs @@ -0,0 +1,244 @@ +use std::cell::RefCell; +use std::fmt; +use std::future::Future; +use std::sync::atomic::Ordering; + +use slab::Slab; + +use super::task::{self, CancelToken, Promise, Runnable}; +use super::NEXT_EXECUTOR_ID; + +use crate::macros::scoped_thread_local::scoped_thread_local; + +const QUEUE_MIN_CAPACITY: usize = 32; + +scoped_thread_local!(static EXECUTOR_CONTEXT: ExecutorContext); +scoped_thread_local!(static ACTIVE_TASKS: RefCell>); + +/// A single-threaded `async` executor. +pub(crate) struct Executor { + /// Shared executor data. + context: ExecutorContext, + /// List of tasks that have not completed yet. + active_tasks: RefCell>, +} + +impl Executor { + /// Creates an executor that runs futures on the current thread. + pub(crate) fn new() -> Self { + // Each executor instance has a unique ID inherited by tasks to ensure + // that tasks are scheduled on their parent executor. + let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed); + assert!( + executor_id <= usize::MAX / 2, + "too many executors have been instantiated" + ); + + let context = ExecutorContext::new(executor_id); + let active_tasks = RefCell::new(Slab::new()); + + Self { + context, + active_tasks, + } + } + + /// Spawns a task and returns a promise that can be polled to retrieve the + /// task's output. + /// + /// Note that spawned tasks are not executed until [`run()`](Executor::run) + /// is called. + pub(crate) fn spawn(&self, future: T) -> Promise + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + // Book a slot to store the task cancellation token. 
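+        // Note: a `RefCell` is enough here because, unlike in the
+        // multi-threaded executor, the task list is only ever accessed from
+        // the thread that owns the executor.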
+        let mut active_tasks = self.active_tasks.borrow_mut();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (promise, runnable, cancel_token) =
+            task::spawn(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        let mut queue = self.context.queue.borrow_mut();
+        queue.push(runnable);
+
+        promise
+    }
+
+    /// Spawns a task whose output will never be retrieved.
+    ///
+    /// This is mostly useful to avoid undue reference counting for futures that
+    /// return a `()` type.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn_and_forget<T>(&self, future: T)
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
+        let mut active_tasks = self.active_tasks.borrow_mut();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (runnable, cancel_token) =
+            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        let mut queue = self.context.queue.borrow_mut();
+        queue.push(runnable);
+    }
+
+    /// Executes spawned tasks, blocking until all futures have completed or
+    /// until the executor reaches a deadlock.
+    pub(crate) fn run(&mut self) {
+        ACTIVE_TASKS.set(&self.active_tasks, || {
+            EXECUTOR_CONTEXT.set(&self.context, || loop {
+                let task = match self.context.queue.borrow_mut().pop() {
+                    Some(task) => task,
+                    None => break,
+                };
+
+                task.run();
+            })
+        });
+    }
+}
+
+impl Drop for Executor {
+    fn drop(&mut self) {
+        // Drop all tasks that have not completed.
+        //
+        // The executor context must be set because some tasks may schedule
+        // other tasks when dropped, which requires that the work queue be
+        // available.
+        EXECUTOR_CONTEXT.set(&self.context, || {
+            // Cancel all pending futures.
+            //
+            // `ACTIVE_TASKS` is explicitly unset to prevent
+            // `CancellableFuture::drop()` from trying to remove its own token
+            // from the list of active tasks as this would result in a nested
+            // call to `borrow_mut` and thus a panic. This is mainly to stay on
+            // the safe side: `ACTIVE_TASKS` should not be set anyway, unless
+            // for some reason the executor runs inside another executor.
+            ACTIVE_TASKS.unset(|| {
+                let mut tasks = self.active_tasks.borrow_mut();
+                for task in tasks.drain() {
+                    task.cancel();
+                }
+
+                // Some of the dropped tasks may have scheduled other tasks that
+                // were not yet cancelled, preventing them from being dropped
+                // upon cancellation. This is OK: the scheduled tasks will be
+                // dropped when the work queue is dropped, and they cannot
+                // re-schedule one another since all tasks were cancelled.
+            });
+        });
+    }
+}
+
+impl fmt::Debug for Executor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Executor").finish_non_exhaustive()
+    }
+}
+
+/// Shared executor context.
+///
+/// This contains all executor resources that can be shared with the scheduled
+/// tasks.
+struct ExecutorContext {
+    /// Work queue.
+    queue: RefCell<Vec<Runnable>>,
+    /// Unique executor identifier inherited by all tasks spawned on this
+    /// executor instance.
+    executor_id: usize,
+}
+
+impl ExecutorContext {
+    /// Creates a new shared executor context.
+    fn new(executor_id: usize) -> Self {
+        Self {
+            queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)),
+            executor_id,
+        }
+    }
+}
+
+/// A `Future` wrapper that removes its cancellation token from the list of
+/// active tasks when dropped.
+struct CancellableFuture<T> {
+    inner: T,
+    cancellation_key: usize,
+}
+
+impl<T> CancellableFuture<T> {
+    /// Creates a new `CancellableFuture`.
+    fn new(fut: T, cancellation_key: usize) -> Self {
+        Self {
+            inner: fut,
+            cancellation_key,
+        }
+    }
+}
+
+impl<T: Future> Future for CancellableFuture<T> {
+    type Output = T::Output;
+
+    #[inline(always)]
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
+    }
+}
+
+impl<T> Drop for CancellableFuture<T> {
+    fn drop(&mut self) {
+        // Remove the task from the list of active tasks while the executor is
+        // running (meaning that `ACTIVE_TASKS` is set). Otherwise do nothing and
+        // let the executor's drop handler do the cleanup.
+        let _ = ACTIVE_TASKS.map(|active_tasks| {
+            // Don't use `borrow_mut()` because this function can be called from
+            // a destructor and should not panic. In the worst case, the cancel
+            // token will be left in the list of active tasks, which prevents
+            // eager task deallocation but does not cause any issue otherwise.
+            if let Ok(mut active_tasks) = active_tasks.try_borrow_mut() {
+                let _cancel_token = active_tasks.try_remove(self.cancellation_key);
+            }
+        });
+    }
+}
+
+/// Schedules a `Runnable` from within the executor thread.
+///
+/// # Panics
+///
+/// This function will panic if called from outside the executor worker thread
+/// or from another executor instance than the one the task for this
+/// `Runnable` was spawned on.
+fn schedule_task(task: Runnable, executor_id: usize) {
+    EXECUTOR_CONTEXT
+        .map(|context| {
+            // Check that this task was indeed spawned on this executor.
+            assert_eq!(
+                executor_id, context.executor_id,
+                "Tasks must be awakened on the same executor they are spawned on"
+            );
+
+            let mut queue = context.queue.borrow_mut();
+            queue.push(task);
+        })
+        .expect("Tasks may not be awakened outside executor threads");
+}
diff --git a/asynchronix/src/executor/tests.rs b/asynchronix/src/executor/tests.rs
deleted file mode 100644
index 7f63b46..0000000
--- a/asynchronix/src/executor/tests.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use futures_channel::{mpsc, oneshot};
-use futures_util::StreamExt;
-
-use super::*;
-
-/// An object that runs an arbitrary closure when dropped.
-struct RunOnDrop<F: FnOnce()> {
-    drop_fn: Option<F>,
-}
-impl<F: FnOnce()> RunOnDrop<F> {
-    /// Creates a new `RunOnDrop`.
- fn new(drop_fn: F) -> Self { - Self { - drop_fn: Some(drop_fn), - } - } -} -impl Drop for RunOnDrop { - fn drop(&mut self) { - self.drop_fn.take().map(|f| f()); - } -} - -#[test] -fn executor_deadlock() { - const NUM_THREADS: usize = 3; - - let (_sender1, receiver1) = oneshot::channel::<()>(); - let (_sender2, receiver2) = oneshot::channel::<()>(); - - let mut executor = Executor::new(NUM_THREADS); - static LAUNCH_COUNT: AtomicUsize = AtomicUsize::new(0); - static COMPLETION_COUNT: AtomicUsize = AtomicUsize::new(0); - - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver2.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver1.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - - executor.run(); - // Check that the executor returns on deadlock, i.e. none of the task has - // completed. - assert_eq!(LAUNCH_COUNT.load(Ordering::Relaxed), 2); - assert_eq!(COMPLETION_COUNT.load(Ordering::Relaxed), 0); -} - -#[test] -fn executor_deadlock_st() { - const NUM_THREADS: usize = 1; - - let (_sender1, receiver1) = oneshot::channel::<()>(); - let (_sender2, receiver2) = oneshot::channel::<()>(); - - let mut executor = Executor::new(NUM_THREADS); - static LAUNCH_COUNT: AtomicUsize = AtomicUsize::new(0); - static COMPLETION_COUNT: AtomicUsize = AtomicUsize::new(0); - - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver2.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver1.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - - executor.run(); - // Check that the executor returnes on deadlock, i.e. none of the task has - // completed. - assert_eq!(LAUNCH_COUNT.load(Ordering::Relaxed), 2); - assert_eq!(COMPLETION_COUNT.load(Ordering::Relaxed), 0); -} - -#[test] -fn executor_drop_cycle() { - const NUM_THREADS: usize = 3; - - let (sender1, mut receiver1) = mpsc::channel(2); - let (sender2, mut receiver2) = mpsc::channel(2); - let (sender3, mut receiver3) = mpsc::channel(2); - - let mut executor = Executor::new(NUM_THREADS); - static DROP_COUNT: AtomicUsize = AtomicUsize::new(0); - - // Spawn 3 tasks that wake one another when dropped. - executor.spawn_and_forget({ - let mut sender2 = sender2.clone(); - let mut sender3 = sender3.clone(); - - async move { - let _guard = RunOnDrop::new(move || { - let _ = sender2.try_send(()); - let _ = sender3.try_send(()); - DROP_COUNT.fetch_add(1, Ordering::Relaxed); - }); - let _ = receiver1.next().await; - } - }); - executor.spawn_and_forget({ - let mut sender1 = sender1.clone(); - let mut sender3 = sender3.clone(); - - async move { - let _guard = RunOnDrop::new(move || { - let _ = sender1.try_send(()); - let _ = sender3.try_send(()); - DROP_COUNT.fetch_add(1, Ordering::Relaxed); - }); - let _ = receiver2.next().await; - } - }); - executor.spawn_and_forget({ - let mut sender1 = sender1.clone(); - let mut sender2 = sender2.clone(); - - async move { - let _guard = RunOnDrop::new(move || { - let _ = sender1.try_send(()); - let _ = sender2.try_send(()); - DROP_COUNT.fetch_add(1, Ordering::Relaxed); - }); - let _ = receiver3.next().await; - } - }); - - executor.run(); - - // Make sure that all tasks are eventually dropped even though each task - // wakes the others when dropped. 
-    drop(executor);
-    assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
-}
diff --git a/asynchronix/src/executor/worker.rs b/asynchronix/src/executor/worker.rs
deleted file mode 100644
index b815276..0000000
--- a/asynchronix/src/executor/worker.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::cell::Cell;
-use std::sync::Arc;
-
-use super::task::Runnable;
-
-use super::ExecutorContext;
-use super::LocalQueue;
-
-/// A local worker with access to global executor resources.
-pub(crate) struct Worker {
-    pub(super) local_queue: LocalQueue,
-    pub(super) fast_slot: Cell<Option<Runnable>>,
-    pub(super) executor_context: Arc<ExecutorContext>,
-}
-
-impl Worker {
-    /// Creates a new worker.
-    pub(super) fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
-        Self {
-            local_queue,
-            fast_slot: Cell::new(None),
-            executor_context,
-        }
-    }
-}
diff --git a/asynchronix/src/macros/scoped_thread_local.rs b/asynchronix/src/macros/scoped_thread_local.rs
index b60b287..587a8f8 100644
--- a/asynchronix/src/macros/scoped_thread_local.rs
+++ b/asynchronix/src/macros/scoped_thread_local.rs
@@ -7,19 +7,18 @@ use std::ptr;
 /// Declare a new thread-local storage scoped key of type `ScopedKey`.
 ///
 /// This is based on the `scoped-tls` crate, with slight modifications, such as
-/// the use of the newly available `const` qualifier for TLS.
+/// the addition of a `ScopedLocalKey::unset` method and the use of a `map`
+/// method that returns `Option::None` when the value is not set, rather than
+/// panicking as `with` would.
 macro_rules! scoped_thread_local {
     ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
         $(#[$attrs])*
         $vis static $name: $crate::macros::scoped_thread_local::ScopedLocalKey<$ty>
-            = $crate::macros::scoped_thread_local::ScopedLocalKey {
-                inner: {
-                    thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
-                        std::cell::Cell::new(::std::ptr::null())
-                    });
-                    &FOO
-                },
-                _marker: ::std::marker::PhantomData,
+            = unsafe {
+                ::std::thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
+                    ::std::cell::Cell::new(::std::ptr::null())
+                });
+                $crate::macros::scoped_thread_local::ScopedLocalKey::new(&FOO)
             };
     )
 }
@@ -28,13 +27,24 @@ pub(crate) use scoped_thread_local;
 /// Type representing a thread local storage key corresponding to a reference
 /// to the type parameter `T`.
 pub(crate) struct ScopedLocalKey<T> {
-    pub(crate) inner: &'static LocalKey<Cell<*const ()>>,
-    pub(crate) _marker: marker::PhantomData<T>,
+    inner: &'static LocalKey<Cell<*const ()>>,
+    _marker: marker::PhantomData<T>,
 }
 
 unsafe impl<T> Sync for ScopedLocalKey<T> {}
 
 impl<T> ScopedLocalKey<T> {
+    #[doc(hidden)]
+    /// # Safety
+    ///
+    /// Should only be called through the public macro.
+    pub(crate) const unsafe fn new(inner: &'static LocalKey<Cell<*const ()>>) -> Self {
+        Self {
+            inner,
+            _marker: marker::PhantomData,
+        }
+    }
+
     /// Inserts a value into this scoped thread local storage slot for the
     /// duration of a closure.
     pub(crate) fn set<F, R>(&'static self, t: &T, f: F) -> R
diff --git a/asynchronix/src/rpc/generic_server.rs b/asynchronix/src/rpc/generic_server.rs
index 6f54a93..70032c4 100644
--- a/asynchronix/src/rpc/generic_server.rs
+++ b/asynchronix/src/rpc/generic_server.rs
@@ -14,8 +14,8 @@ use super::codegen::simulation::*;
 /// Transport-independent server implementation.
 ///
-/// This implementation implements the protobuf services without any
-/// transport-specific management.
+/// This implements the protobuf services without any transport-specific
+/// management.
 pub(crate) struct GenericServer<F> {
     sim_gen: F,
     sim_context: Option<(Simulation, EndpointRegistry, KeyRegistry)>,
diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs
index a8527ca..f22e1fc 100644
--- a/asynchronix/src/simulation/sim_init.rs
+++ b/asynchronix/src/simulation/sim_init.rs
@@ -25,15 +25,22 @@ impl SimInit {
         Self::with_num_threads(num_cpus::get())
     }
 
-    /// Creates a builder for a multithreaded simulation running on the
-    /// specified number of threads.
+    /// Creates a builder for a simulation running on the specified number of
+    /// threads.
+    ///
+    /// Note that the number of worker threads is automatically constrained to
+    /// be between 1 and `usize::BITS` (inclusive).
     pub fn with_num_threads(num_threads: usize) -> Self {
-        // The current executor's implementation caps the number of threads to
-        // 64 on 64-bit systems and 32 on 32-bit systems.
-        let num_threads = num_threads.min(usize::BITS as usize);
+        let num_threads = num_threads.clamp(1, usize::BITS as usize);
+
+        let executor = if num_threads == 1 {
+            Executor::new_single_threaded()
+        } else {
+            Executor::new_multi_threaded(num_threads)
+        };
 
         Self {
-            executor: Executor::new(num_threads),
+            executor,
             scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
             time: SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)),
            clock: Box::new(NoClock::new()),
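
Note on the `SimInit` hunk above: a request for one worker thread now selects the
new single-threaded executor, while any larger value (clamped to `usize::BITS`)
selects the multi-threaded one. A minimal usage sketch against the public API,
assuming the crate's usual import path (the binding names are illustrative, not
part of the patch):

    use asynchronix::simulation::SimInit;

    fn main() {
        // A thread count of 1 selects the single-threaded executor.
        let _st = SimInit::with_num_threads(1);

        // Larger counts select the multi-threaded executor.
        let _mt = SimInit::with_num_threads(4);

        // Out-of-range requests are clamped: 0 behaves like 1, and very large
        // values are capped at `usize::BITS` (64 on a 64-bit target).
        let _capped = SimInit::with_num_threads(1000);
    }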
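
Likewise, the `scoped_thread_local!` rework is easiest to see with a small
crate-internal sketch: the value is only visible for the duration of the closure
passed to `set`, and `map` returns an `Option` instead of panicking when the slot
is empty (as used by `schedule_task` and `CancellableFuture::drop` above). The
`Context` type and `run` function below are illustrative only, not part of the
patch:

    scoped_thread_local!(static CTX: Context);

    struct Context {
        id: usize,
    }

    fn run(ctx: &Context) {
        // While the closure runs, the key is set, so `map` yields `Some`.
        CTX.set(ctx, || {
            assert_eq!(CTX.map(|c| c.id), Some(ctx.id));
        });

        // Outside `set`, the slot is empty and `map` returns `None`.
        assert!(CTX.map(|c| c.id).is_none());
    }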