forked from ROMEO/nexosim

Add same-thread executor support

Serge Barral 2024-05-07 17:30:11 +02:00
parent 88d954dde5
commit 77e6e569ff
12 changed files with 1066 additions and 696 deletions

@@ -70,23 +70,53 @@ jobs:
         with:
           components: miri
-      - name: Run cargo miri tests
+      - name: Run cargo miri tests (single-threaded executor)
+        run: cargo miri test --tests --lib --features="rpc grpc-server"
+        env:
+          MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
+      - name: Run cargo miri tests (multi-threaded executor)
         run: cargo miri test --tests --lib --features="rpc grpc-server"
         env:
           MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
-      - name: Run cargo miri example1
+      - name: Run cargo miri example1 (single-threaded executor)
+        run: cargo miri run --example espresso_machine
+        env:
+          MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
+      - name: Run cargo miri example1 (multi-threaded executor)
         run: cargo miri run --example espresso_machine
         env:
           MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
-      - name: Run cargo miri example2
+      - name: Run cargo miri example2 (single-threaded executor)
+        run: cargo miri run --example power_supply
+        env:
+          MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
+      - name: Run cargo miri example2 (multi-threaded executor)
         run: cargo miri run --example power_supply
         env:
           MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
-      - name: Run cargo miri example3
+      - name: Run cargo miri example3 (single-threaded executor)
         run: cargo miri run --example stepper_motor
+        env:
+          MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
+      - name: Run cargo miri example3 (multi-threaded executor)
+        run: cargo miri run --example stepper_motor
         env:
           MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
-      - name: Run cargo miri example4
-        run: cargo miri run --example assembly
+      - name: Run cargo miri example4 (single-threaded executor)
+        run: cargo miri run --example assembly
+        env:
+          MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
+      - name: Run cargo miri example4 (multi-threaded executor)
+        run: cargo miri run --example assembly
         env:
           MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4

@@ -15,7 +15,7 @@ impl Executor {
     ///
     /// The maximum number of threads is set with the `pool_size` parameter.
     pub fn new(pool_size: usize) -> Self {
-        Self(executor::Executor::new(pool_size))
+        Self(executor::Executor::new_multi_threaded(pool_size))
     }
 
     /// Spawns a task which output will never be retrieved.

@@ -1,98 +1,30 @@
//! `async` executor trait.

mod mt_executor;
mod st_executor;
mod task;

use std::future::Future;
use std::sync::atomic::AtomicUsize;

use task::Promise;

/// Unique identifier for executor instances.
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);

/// A single-threaded or multi-threaded `async` executor.
#[derive(Debug)]
pub(crate) enum Executor {
    StExecutor(st_executor::Executor),
    MtExecutor(mt_executor::Executor),
}

impl Executor {
    /// Creates an executor that runs futures on the current thread.
    pub(crate) fn new_single_threaded() -> Self {
        Self::StExecutor(st_executor::Executor::new())
    }

    /// Creates an executor that runs futures on a thread pool.
    ///
    /// The maximum number of threads is set with the `num_threads` parameter.
@@ -101,78 +33,11 @@ impl Executor {
    ///
    /// This will panic if the specified number of threads is zero or is more
    /// than `usize::BITS`.
    pub(crate) fn new_multi_threaded(num_threads: usize) -> Self {
        Self::MtExecutor(mt_executor::Executor::new(num_threads))
    }

    /// Spawns a task which output will never be retrieved.
    ///
    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
    /// is called.
@@ -182,28 +47,14 @@
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        match self {
            Self::StExecutor(executor) => executor.spawn(future),
            Self::MtExecutor(executor) => executor.spawn(future),
        }
    }

    /// Spawns a task which output will never be retrieved.
    ///
    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
    /// is called.
    pub(crate) fn spawn_and_forget<T>(&self, future: T)
@@ -211,354 +62,171 @@
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        match self {
            Self::StExecutor(executor) => executor.spawn_and_forget(future),
            Self::MtExecutor(executor) => executor.spawn_and_forget(future),
        }
    }

    /// Execute spawned tasks, blocking until all futures have completed or
    /// until the executor reaches a deadlock.
    pub(crate) fn run(&mut self) {
        match self {
            Self::StExecutor(executor) => executor.run(),
            Self::MtExecutor(executor) => executor.run(),
        }
    }
}

#[cfg(all(test, not(asynchronix_loom)))]
mod tests {
    use std::sync::atomic::Ordering;
    use std::sync::Arc;

    use futures_channel::{mpsc, oneshot};
    use futures_util::StreamExt;

    use super::*;

    /// An object that runs an arbitrary closure when dropped.
    struct RunOnDrop<F: FnOnce()> {
        drop_fn: Option<F>,
    }
    impl<F: FnOnce()> RunOnDrop<F> {
        /// Creates a new `RunOnDrop`.
        fn new(drop_fn: F) -> Self {
            Self {
                drop_fn: Some(drop_fn),
            }
        }
    }
    impl<F: FnOnce()> Drop for RunOnDrop<F> {
        fn drop(&mut self) {
            self.drop_fn.take().map(|f| f());
        }
    }

    fn executor_deadlock(mut executor: Executor) {
        let (_sender1, receiver1) = oneshot::channel::<()>();
        let (_sender2, receiver2) = oneshot::channel::<()>();

        let launch_count = Arc::new(AtomicUsize::new(0));
        let completion_count = Arc::new(AtomicUsize::new(0));

        executor.spawn_and_forget({
            let launch_count = launch_count.clone();
            let completion_count = completion_count.clone();

            async move {
                launch_count.fetch_add(1, Ordering::Relaxed);
                let _ = receiver2.await;
                completion_count.fetch_add(1, Ordering::Relaxed);
            }
        });
        executor.spawn_and_forget({
            let launch_count = launch_count.clone();
            let completion_count = completion_count.clone();

            async move {
                launch_count.fetch_add(1, Ordering::Relaxed);
                let _ = receiver1.await;
                completion_count.fetch_add(1, Ordering::Relaxed);
            }
        });

        executor.run();

        // Check that the executor returns on deadlock, i.e. none of the task has
        // completed.
        assert_eq!(launch_count.load(Ordering::Relaxed), 2);
        assert_eq!(completion_count.load(Ordering::Relaxed), 0);

        // Drop the executor and thus the receiver tasks before the senders,
        // failing which the senders may signal that the channel has been
        // dropped and wake the tasks outside the executor.
        drop(executor);
    }

    fn executor_drop_cycle(mut executor: Executor) {
        let (sender1, mut receiver1) = mpsc::channel(2);
        let (sender2, mut receiver2) = mpsc::channel(2);
        let (sender3, mut receiver3) = mpsc::channel(2);

        let drop_count = Arc::new(AtomicUsize::new(0));

        // Spawn 3 tasks that wake one another when dropped.
        executor.spawn_and_forget({
            let mut sender2 = sender2.clone();
            let mut sender3 = sender3.clone();
            let drop_count = drop_count.clone();

            async move {
                let _guard = RunOnDrop::new(move || {
                    let _ = sender2.try_send(());
                    let _ = sender3.try_send(());
                    drop_count.fetch_add(1, Ordering::Relaxed);
                });
                let _ = receiver1.next().await;
            }
        });
        executor.spawn_and_forget({
            let mut sender1 = sender1.clone();
            let mut sender3 = sender3.clone();
            let drop_count = drop_count.clone();

            async move {
                let _guard = RunOnDrop::new(move || {
                    let _ = sender1.try_send(());
                    let _ = sender3.try_send(());
                    drop_count.fetch_add(1, Ordering::Relaxed);
                });
                let _ = receiver2.next().await;
            }
        });
        executor.spawn_and_forget({
            let mut sender1 = sender1.clone();
            let mut sender2 = sender2.clone();
            let drop_count = drop_count.clone();

            async move {
                let _guard = RunOnDrop::new(move || {
                    let _ = sender1.try_send(());
                    let _ = sender2.try_send(());
                    drop_count.fetch_add(1, Ordering::Relaxed);
                });
                let _ = receiver3.next().await;
            }
        });

        executor.run();

        // Make sure that all tasks are eventually dropped even though each task
        // wakes the others when dropped.
        drop(executor);
        assert_eq!(drop_count.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn executor_deadlock_st() {
        executor_deadlock(Executor::new_single_threaded());
    }

    #[test]
    fn executor_deadlock_mt() {
        executor_deadlock(Executor::new_multi_threaded(3));
    }

    #[test]
    fn executor_deadlock_mt_one_worker() {
        executor_deadlock(Executor::new_multi_threaded(1));
    }

    #[test]
    fn executor_drop_cycle_st() {
        executor_drop_cycle(Executor::new_single_threaded());
    }

    #[test]
    fn executor_drop_cycle_mt() {
        executor_drop_cycle(Executor::new_multi_threaded(3));
    }
}
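The following is a minimal, standalone sketch (not part of the commit) of the enum-based dispatch pattern that the new `Executor` type above relies on: each variant wraps a concrete executor and every method forwards through a `match`, so the single-threaded and multi-threaded implementations are selected statically, without a trait object. The `Toy*` names are hypothetical stand-ins for `st_executor::Executor` and `mt_executor::Executor`.

// Standalone sketch of enum-based dispatch (hypothetical Toy* types).
struct ToySt;
struct ToyMt {
    workers: usize,
}

impl ToySt {
    fn run(&mut self) {
        println!("running on the current thread");
    }
}

impl ToyMt {
    fn run(&mut self) {
        println!("running on {} worker threads", self.workers);
    }
}

// Same shape as the `Executor` enum above: one variant per concrete executor.
enum ToyExecutor {
    St(ToySt),
    Mt(ToyMt),
}

impl ToyExecutor {
    fn run(&mut self) {
        // Static dispatch: the match selects the concrete implementation.
        match self {
            Self::St(executor) => executor.run(),
            Self::Mt(executor) => executor.run(),
        }
    }
}

fn main() {
    let mut st = ToyExecutor::St(ToySt);
    let mut mt = ToyExecutor::Mt(ToyMt { workers: 4 });
    st.run();
    mt.run();
}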

@@ -0,0 +1,576 @@
//! Multi-threaded `async` executor.
//!
//! The executor is exclusively designed for message-passing computational
//! tasks. As such, it does not include an I/O reactor and does not consider
//! fairness as a goal in itself. While it does use fair local queues inasmuch
//! as these tend to perform better in message-passing applications, it uses an
//! unfair injection queue and a LIFO slot without attempt to mitigate the
//! effect of badly behaving code (e.g. futures that spin-lock by yielding to
//! the executor; there is for this reason no support for something like tokio's
//! `yield_now`).
//!
//! Another way in which it differs from other `async` executors is that it
//! treats deadlocking as a normal occurrence. This is because in a
//! discrete-time simulator, the simulation of a system at a given time step
//! will make as much progress as possible until it technically reaches a
//! deadlock. Only then does the simulator advance the simulated time to that of
//! the next "event" extracted from a time-sorted priority queue.
//!
//! The design of the executor is largely influenced by the tokio and Go
//! schedulers, both of which are optimized for message-passing applications. In
//! particular, it uses fast, fixed-size thread-local work-stealing queues with
//! a non-stealable LIFO slot in combination with an injector queue, which
//! injector queue is used both to schedule new tasks and to absorb temporary
//! overflow in the local queues.
//!
//! The design of the injector queue is kept very simple compared to tokio, by
//! taking advantage of the fact that the injector is not required to be either
//! LIFO or FIFO. Moving tasks between a local queue and the injector is fast
//! because tasks are moved in batch and are stored contiguously in memory.
//!
//! Another difference with tokio is that, at the moment, the complete subset of
//! active worker threads is stored in a single atomic variable. This makes it
//! possible to rapidly identify free worker threads for stealing operations,
//! with the downside that the maximum number of worker threads is currently
//! limited to `usize::BITS`. This is not expected to constitute a limitation in
//! practice since system simulation is not typically embarrassingly parallel.
//!
//! Probably the largest difference with tokio is the task system, which has
//! better throughput due to less need for synchronization. This mainly results
//! from the use of an atomic notification counter rather than an atomic
//! notification flag, thus alleviating the need to reset the notification flag
//! before polling a future.
mod injector;
mod pool_manager;
use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crossbeam_utils::sync::{Parker, Unparker};
use slab::Slab;
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::util::rng::Rng;
use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID;
use pool_manager::PoolManager;
const BUCKET_SIZE: usize = 128;
const QUEUE_SIZE: usize = BUCKET_SIZE * 2;
type Bucket = injector::Bucket<Runnable, BUCKET_SIZE>;
type Injector = injector::Injector<Runnable, BUCKET_SIZE>;
type LocalQueue = st3::fifo::Worker<Runnable>;
type Stealer = st3::fifo::Stealer<Runnable>;
scoped_thread_local!(static LOCAL_WORKER: Worker);
scoped_thread_local!(static ACTIVE_TASKS: Mutex<Slab<CancelToken>>);
/// A multi-threaded `async` executor.
pub(crate) struct Executor {
/// Shared executor data.
context: Arc<ExecutorContext>,
/// List of tasks that have not completed yet.
active_tasks: Arc<Mutex<Slab<CancelToken>>>,
/// Parker for the main executor thread.
parker: Parker,
/// Handles to the worker threads.
worker_handles: Vec<JoinHandle<()>>,
}
impl Executor {
/// Creates an executor that runs futures on a thread pool.
///
/// The maximum number of threads is set with the `num_threads` parameter.
///
/// # Panics
///
/// This will panic if the specified number of threads is zero or is more
/// than `usize::BITS`.
pub(crate) fn new(num_threads: usize) -> Self {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let (local_queues_and_parkers, stealers_and_unparkers): (Vec<_>, Vec<_>) = (0..num_threads)
.map(|_| {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let local_queue = LocalQueue::new(QUEUE_SIZE);
let stealer = local_queue.stealer();
((local_queue, parker), (stealer, unparker))
})
.unzip();
// Each executor instance has a unique ID inherited by tasks to ensure
// that tasks are scheduled on their parent executor.
let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
assert!(
executor_id <= usize::MAX / 2,
"too many executors have been instantiated"
);
let context = Arc::new(ExecutorContext::new(
executor_id,
unparker,
stealers_and_unparkers.into_iter(),
));
let active_tasks = Arc::new(Mutex::new(Slab::new()));
// All workers must be marked as active _before_ spawning the threads to
// make sure that the count of active workers does not fall to zero
// before all workers are blocked on the signal barrier.
context.pool_manager.set_all_workers_active();
// Spawn all worker threads.
let worker_handles: Vec<_> = local_queues_and_parkers
.into_iter()
.enumerate()
.map(|(id, (local_queue, worker_parker))| {
let thread_builder = thread::Builder::new().name(format!("Worker #{}", id));
thread_builder
.spawn({
let context = context.clone();
let active_tasks = active_tasks.clone();
move || {
let worker = Worker::new(local_queue, context);
ACTIVE_TASKS.set(&active_tasks, || {
LOCAL_WORKER
.set(&worker, || run_local_worker(&worker, id, worker_parker))
});
}
})
.unwrap()
})
.collect();
// Wait until all workers are blocked on the signal barrier.
parker.park();
assert!(context.pool_manager.pool_is_idle());
Self {
context,
active_tasks,
parker,
worker_handles,
}
}
/// Spawns a task and returns a promise that can be polled to retrieve the
/// task's output.
///
/// Note that spawned tasks are not executed until [`run()`](Executor::run)
/// is called.
pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
// Book a slot to store the task cancellation token.
let mut active_tasks = self.active_tasks.lock().unwrap();
let task_entry = active_tasks.vacant_entry();
// Wrap the future so that it removes its cancel token from the
// executor's list when dropped.
let future = CancellableFuture::new(future, task_entry.key());
let (promise, runnable, cancel_token) =
task::spawn(future, schedule_task, self.context.executor_id);
task_entry.insert(cancel_token);
self.context.injector.insert_task(runnable);
promise
}
/// Spawns a task which output will never be retrieved.
///
/// This is mostly useful to avoid undue reference counting for futures that
/// return a `()` type.
///
/// Note that spawned tasks are not executed until [`run()`](Executor::run)
/// is called.
pub(crate) fn spawn_and_forget<T>(&self, future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
// Book a slot to store the task cancellation token.
let mut active_tasks = self.active_tasks.lock().unwrap();
let task_entry = active_tasks.vacant_entry();
// Wrap the future so that it removes its cancel token from the
// executor's list when dropped.
let future = CancellableFuture::new(future, task_entry.key());
let (runnable, cancel_token) =
task::spawn_and_forget(future, schedule_task, self.context.executor_id);
task_entry.insert(cancel_token);
self.context.injector.insert_task(runnable);
}
/// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) {
self.context.pool_manager.activate_worker();
loop {
if let Some(worker_panic) = self.context.pool_manager.take_panic() {
panic::resume_unwind(worker_panic);
}
if self.context.pool_manager.pool_is_idle() {
return;
}
self.parker.park();
}
}
}
impl Drop for Executor {
fn drop(&mut self) {
// Force all threads to return.
self.context.pool_manager.trigger_termination();
for handle in self.worker_handles.drain(0..) {
handle.join().unwrap();
}
// Drop all tasks that have not completed.
//
// A local worker must be set because some tasks may schedule other
// tasks when dropped, which requires that a local worker be available.
let worker = Worker::new(LocalQueue::new(QUEUE_SIZE), self.context.clone());
LOCAL_WORKER.set(&worker, || {
// Cancel all pending futures.
//
// `ACTIVE_TASKS` is explicitly unset to prevent
// `CancellableFuture::drop()` from trying to remove its own token
// from the list of active tasks as this would result in a reentrant
// lock. This is mainly to stay on the safe side: `ACTIVE_TASKS`
// should not be set on this thread anyway, unless for some reason
// the executor runs inside another executor.
ACTIVE_TASKS.unset(|| {
let mut tasks = self.active_tasks.lock().unwrap();
for task in tasks.drain() {
task.cancel();
}
// Some of the dropped tasks may have scheduled other tasks that
// were not yet cancelled, preventing them from being dropped
// upon cancellation. This is OK: the scheduled tasks will be
// dropped when the local and injector queues are dropped, and
// they cannot re-schedule one another since all tasks were
// cancelled.
});
});
}
}
impl fmt::Debug for Executor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Executor").finish_non_exhaustive()
}
}
/// Shared executor context.
///
/// This contains all executor resources that can be shared between threads.
struct ExecutorContext {
/// Injector queue.
injector: Injector,
/// Unique executor identifier inherited by all tasks spawned on this
/// executor instance.
executor_id: usize,
/// Unparker for the main executor thread.
executor_unparker: Unparker,
/// Manager for all worker threads.
pool_manager: PoolManager,
}
impl ExecutorContext {
/// Creates a new shared executor context.
pub(super) fn new(
executor_id: usize,
executor_unparker: Unparker,
stealers_and_unparkers: impl Iterator<Item = (Stealer, Unparker)>,
) -> Self {
let (stealers, worker_unparkers): (Vec<_>, Vec<_>) =
stealers_and_unparkers.into_iter().unzip();
let worker_unparkers = worker_unparkers.into_boxed_slice();
Self {
injector: Injector::new(),
executor_id,
executor_unparker,
pool_manager: PoolManager::new(
worker_unparkers.len(),
stealers.into_boxed_slice(),
worker_unparkers,
),
}
}
}
/// A `Future` wrapper that removes its cancellation token from the list of
/// active tasks when dropped.
struct CancellableFuture<T: Future> {
inner: T,
cancellation_key: usize,
}
impl<T: Future> CancellableFuture<T> {
/// Creates a new `CancellableFuture`.
fn new(fut: T, cancellation_key: usize) -> Self {
Self {
inner: fut,
cancellation_key,
}
}
}
impl<T: Future> Future for CancellableFuture<T> {
type Output = T::Output;
#[inline(always)]
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
}
}
impl<T: Future> Drop for CancellableFuture<T> {
fn drop(&mut self) {
// Remove the task from the list of active tasks if the future is
// dropped on a worker thread. Otherwise do nothing and let the
// executor's drop handler do the cleanup.
let _ = ACTIVE_TASKS.map(|active_tasks| {
// Don't unwrap on `lock()` because this function can be called from
// a destructor and should not panic. In the worse case, the cancel
// token will be left in the list of active tasks, which does
// prevents eager task deallocation but does not cause any issue
// otherwise.
if let Ok(mut active_tasks) = active_tasks.lock() {
let _cancel_token = active_tasks.try_remove(self.cancellation_key);
}
});
}
}
/// A local worker with access to global executor resources.
pub(crate) struct Worker {
local_queue: LocalQueue,
fast_slot: Cell<Option<Runnable>>,
executor_context: Arc<ExecutorContext>,
}
impl Worker {
/// Creates a new worker.
fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
Self {
local_queue,
fast_slot: Cell::new(None),
executor_context,
}
}
}
/// Schedules a `Runnable` from within a worker thread.
///
/// # Panics
///
/// This function will panic if called from a non-worker thread or if called
/// from the worker thread of another executor instance than the one the task
/// for this `Runnable` was spawned on.
fn schedule_task(task: Runnable, executor_id: usize) {
LOCAL_WORKER
.map(|worker| {
let pool_manager = &worker.executor_context.pool_manager;
let injector = &worker.executor_context.injector;
let local_queue = &worker.local_queue;
let fast_slot = &worker.fast_slot;
// Check that this task was indeed spawned on this executor.
assert_eq!(
executor_id, worker.executor_context.executor_id,
"Tasks must be awaken on the same executor they are spawned on"
);
// Store the task in the fast slot and retrieve the one that was
// formerly stored, if any.
let prev_task = match fast_slot.replace(Some(task)) {
// If there already was a task in the slot, proceed so it can be
// moved to a task queue.
Some(t) => t,
// Otherwise return immediately: this task cannot be stolen so
// there is no point in activating a sibling worker.
None => return,
};
// Push the previous task to the local queue if possible or on the
// injector queue otherwise.
if let Err(prev_task) = local_queue.push(prev_task) {
// The local queue is full. Try to move half of it to the
// injector queue; if this fails, just push one task to the
// injector queue.
if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) {
injector.push_bucket(Bucket::from_iter(drain));
local_queue.push(prev_task).unwrap();
} else {
injector.insert_task(prev_task);
}
}
// A task has been pushed to the local or injector queue: try to
// activate another worker if no worker is currently searching for a
// task.
if pool_manager.searching_worker_count() == 0 {
pool_manager.activate_worker_relaxed();
}
})
.expect("Tasks may not be awaken outside executor threads");
}
/// Processes all incoming tasks on a worker thread until the `Terminate` signal
/// is received or until it panics.
///
/// Panics caught in this thread are relayed to the main executor thread.
fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
let pool_manager = &worker.executor_context.pool_manager;
let injector = &worker.executor_context.injector;
let executor_unparker = &worker.executor_context.executor_unparker;
let local_queue = &worker.local_queue;
let fast_slot = &worker.fast_slot;
let result = panic::catch_unwind(AssertUnwindSafe(|| {
// Set how long to spin when searching for a task.
const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000);
// Seed a thread RNG with the worker ID.
let rng = Rng::new(id as u64);
loop {
// Signal barrier: park until notified to continue or terminate.
// Try to deactivate the worker.
if pool_manager.try_set_worker_inactive(id) {
parker.park();
// No need to call `begin_worker_search()`: this was done by the
// thread that unparked the worker.
} else if injector.is_empty() {
// This worker could not be deactivated because it was the last
// active worker. In such case, the call to
// `try_set_worker_inactive` establishes a synchronization with
// all threads that pushed tasks to the injector queue but could
// not activate a new worker, which is why some tasks may now be
// visible in the injector queue.
pool_manager.set_all_workers_inactive();
executor_unparker.unpark();
parker.park();
// No need to call `begin_worker_search()`: this was done by the
// thread that unparked the worker.
} else {
pool_manager.begin_worker_search();
}
if pool_manager.termination_is_triggered() {
return;
}
let mut search_start = Instant::now();
// Process the tasks one by one.
loop {
// Check the injector queue first.
if let Some(bucket) = injector.pop_bucket() {
let bucket_iter = bucket.into_iter();
// There is a _very_ remote possibility that, even though
// the local queue is empty, it has temporarily too little
// spare capacity for the bucket. This could happen if a
// concurrent steal operation was preempted for all the time
// it took to pop and process the remaining tasks and it
// hasn't released the stolen capacity yet.
//
// Unfortunately, we cannot just skip checking the injector
// queue altogether when there isn't enough spare capacity
// in the local queue because this could lead to a race:
// suppose that (1) this thread has earlier pushed tasks
// onto the injector queue, and (2) the stealer has
// processed all stolen tasks before this thread sees the
// capacity restored and at the same time (3) the stealer
// does not yet see the tasks this thread pushed to the
// injector queue; in such scenario, both this thread and
// the stealer thread may park and leave unprocessed tasks
// in the injector queue.
//
// This is the only instance where spinning is used, as the
// probability of this happening is close to zero and the
// complexity of a signaling mechanism (condvar & friends)
// wouldn't carry its weight.
while local_queue.spare_capacity() < bucket_iter.len() {}
// Since empty buckets are never pushed onto the injector
// queue, we should now have at least one task to process.
local_queue.extend(bucket_iter);
} else {
// The injector queue is empty. Try to steal from active
// siblings.
let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng);
if stealers.all(|stealer| {
stealer
.steal_and_pop(local_queue, |n| n - n / 2)
.map(|(task, _)| {
let prev_task = fast_slot.replace(Some(task));
assert!(prev_task.is_none());
})
.is_err()
}) {
// Give up if unsuccessful for too long.
if (Instant::now() - search_start) > MAX_SEARCH_DURATION {
pool_manager.end_worker_search();
break;
}
// Re-try.
continue;
}
}
// Signal the end of the search so that another worker can be
// activated when a new task is scheduled.
pool_manager.end_worker_search();
// Pop tasks from the fast slot or the local queue.
while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) {
if pool_manager.termination_is_triggered() {
return;
}
task.run();
}
// Resume the search for tasks.
pool_manager.begin_worker_search();
search_start = Instant::now();
}
}
}));
// Propagate the panic, if any.
if let Err(panic) = result {
pool_manager.register_panic(panic);
pool_manager.trigger_termination();
executor_unparker.unpark();
}
}
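A simplified, self-contained sketch (not from the commit) of the scheduling order implemented by `schedule_task` and the worker loop above: a newly woken task lands in the single-slot LIFO fast slot, the task it displaces goes to the bounded local queue, overflow spills to the shared injector, and workers pop in the reverse order. Plain `VecDeque`/`Vec` stand in for the lock-free st3 queue and the bucket-based injector, which is an assumption made purely to keep the sketch short.

use std::collections::VecDeque;

// Hypothetical stand-ins: the real executor uses a lock-free bounded queue
// (st3) and a bucket-based injector instead of VecDeque/Vec.
struct ToyWorker {
    fast_slot: Option<u32>,     // non-stealable LIFO slot
    local_queue: VecDeque<u32>, // bounded local queue
    injector: Vec<u32>,         // shared overflow/injection queue
}

const LOCAL_CAPACITY: usize = 2;

impl ToyWorker {
    // Mirrors `schedule_task`: the new task displaces the fast slot, the
    // displaced task goes to the local queue, overflow goes to the injector.
    fn schedule(&mut self, task: u32) {
        if let Some(prev) = self.fast_slot.replace(task) {
            if self.local_queue.len() < LOCAL_CAPACITY {
                self.local_queue.push_back(prev);
            } else {
                self.injector.push(prev);
            }
        }
    }

    // Mirrors the worker loop's pop order: fast slot first, then the local
    // queue, then (in the real executor) the injector or stealing siblings.
    fn next_task(&mut self) -> Option<u32> {
        self.fast_slot
            .take()
            .or_else(|| self.local_queue.pop_front())
            .or_else(|| self.injector.pop())
    }
}

fn main() {
    let mut worker = ToyWorker {
        fast_slot: None,
        local_queue: VecDeque::new(),
        injector: Vec::new(),
    };
    for task in 0..5 {
        worker.schedule(task);
    }
    // Task 4 sits in the fast slot and runs first; tasks 0 and 1 were pushed
    // to the local queue and tasks 2 and 3 overflowed to the injector.
    while let Some(task) = worker.next_task() {
        println!("running task {task}");
    }
}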

@@ -0,0 +1,244 @@
use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering;
use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID;
use crate::macros::scoped_thread_local::scoped_thread_local;
const QUEUE_MIN_CAPACITY: usize = 32;
scoped_thread_local!(static EXECUTOR_CONTEXT: ExecutorContext);
scoped_thread_local!(static ACTIVE_TASKS: RefCell<Slab<CancelToken>>);
/// A single-threaded `async` executor.
pub(crate) struct Executor {
/// Shared executor data.
context: ExecutorContext,
/// List of tasks that have not completed yet.
active_tasks: RefCell<Slab<CancelToken>>,
}
impl Executor {
/// Creates an executor that runs futures on the current thread.
pub(crate) fn new() -> Self {
// Each executor instance has a unique ID inherited by tasks to ensure
// that tasks are scheduled on their parent executor.
let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
assert!(
executor_id <= usize::MAX / 2,
"too many executors have been instantiated"
);
let context = ExecutorContext::new(executor_id);
let active_tasks = RefCell::new(Slab::new());
Self {
context,
active_tasks,
}
}
/// Spawns a task and returns a promise that can be polled to retrieve the
/// task's output.
///
/// Note that spawned tasks are not executed until [`run()`](Executor::run)
/// is called.
pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
// Book a slot to store the task cancellation token.
let mut active_tasks = self.active_tasks.borrow_mut();
let task_entry = active_tasks.vacant_entry();
// Wrap the future so that it removes its cancel token from the
// executor's list when dropped.
let future = CancellableFuture::new(future, task_entry.key());
let (promise, runnable, cancel_token) =
task::spawn(future, schedule_task, self.context.executor_id);
task_entry.insert(cancel_token);
let mut queue = self.context.queue.borrow_mut();
queue.push(runnable);
promise
}
/// Spawns a task which output will never be retrieved.
///
/// This is mostly useful to avoid undue reference counting for futures that
/// return a `()` type.
///
/// Note that spawned tasks are not executed until [`run()`](Executor::run)
/// is called.
pub(crate) fn spawn_and_forget<T>(&self, future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
// Book a slot to store the task cancellation token.
let mut active_tasks = self.active_tasks.borrow_mut();
let task_entry = active_tasks.vacant_entry();
// Wrap the future so that it removes its cancel token from the
// executor's list when dropped.
let future = CancellableFuture::new(future, task_entry.key());
let (runnable, cancel_token) =
task::spawn_and_forget(future, schedule_task, self.context.executor_id);
task_entry.insert(cancel_token);
let mut queue = self.context.queue.borrow_mut();
queue.push(runnable);
}
/// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) {
ACTIVE_TASKS.set(&self.active_tasks, || {
EXECUTOR_CONTEXT.set(&self.context, || loop {
let task = match self.context.queue.borrow_mut().pop() {
Some(task) => task,
None => break,
};
task.run();
})
});
}
}
impl Drop for Executor {
fn drop(&mut self) {
// Drop all tasks that have not completed.
//
// The executor context must be set because some tasks may schedule
// other tasks when dropped, which requires that the work queue be
// available.
EXECUTOR_CONTEXT.set(&self.context, || {
// Cancel all pending futures.
//
// `ACTIVE_TASKS` is explicitly unset to prevent
// `CancellableFuture::drop()` from trying to remove its own token
// from the list of active tasks as this would result in a nested
// call to `borrow_mut` and thus a panic. This is mainly to stay on
// the safe side: `ACTIVE_TASKS` should not be set anyway, unless
// for some reason the executor runs inside another executor.
ACTIVE_TASKS.unset(|| {
let mut tasks = self.active_tasks.borrow_mut();
for task in tasks.drain() {
task.cancel();
}
// Some of the dropped tasks may have scheduled other tasks that
// were not yet cancelled, preventing them from being dropped
// upon cancellation. This is OK: the scheduled tasks will be
// dropped when the work queue is dropped, and they cannot
// re-schedule one another since all tasks were cancelled.
});
});
}
}
impl fmt::Debug for Executor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Executor").finish_non_exhaustive()
}
}
/// Shared executor context.
///
/// This contains all executor resources that can be shared between threads.
struct ExecutorContext {
/// Work queue.
queue: RefCell<Vec<Runnable>>,
/// Unique executor identifier inherited by all tasks spawned on this
/// executor instance.
executor_id: usize,
}
impl ExecutorContext {
/// Creates a new shared executor context.
fn new(executor_id: usize) -> Self {
Self {
queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)),
executor_id,
}
}
}
/// A `Future` wrapper that removes its cancellation token from the list of
/// active tasks when dropped.
struct CancellableFuture<T: Future> {
inner: T,
cancellation_key: usize,
}
impl<T: Future> CancellableFuture<T> {
/// Creates a new `CancellableFuture`.
fn new(fut: T, cancellation_key: usize) -> Self {
Self {
inner: fut,
cancellation_key,
}
}
}
impl<T: Future> Future for CancellableFuture<T> {
type Output = T::Output;
#[inline(always)]
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
}
}
impl<T: Future> Drop for CancellableFuture<T> {
fn drop(&mut self) {
// Remove the task from the list of active tasks while the executor is
// running (meaning that `ACTIVE_TASK` is set). Otherwise do nothing and
// let the executor's drop handler do the cleanup.
let _ = ACTIVE_TASKS.map(|active_tasks| {
// Don't use `borrow_mut()` because this function can be called from
// a destructor and should not panic. In the worse case, the cancel
// token will be left in the list of active tasks, which does
// prevents eager task deallocation but does not cause any issue
// otherwise.
if let Ok(mut active_tasks) = active_tasks.try_borrow_mut() {
let _cancel_token = active_tasks.try_remove(self.cancellation_key);
}
});
}
}
/// Schedules a `Runnable` from within a worker thread.
///
/// # Panics
///
/// This function will panic if called from called outside from the executor
/// work thread or from another executor instance than the one the task for this
/// `Runnable` was spawned on.
fn schedule_task(task: Runnable, executor_id: usize) {
EXECUTOR_CONTEXT
.map(|context| {
// Check that this task was indeed spawned on this executor.
assert_eq!(
executor_id, context.executor_id,
"Tasks must be awaken on the same executor they are spawned on"
);
let mut queue = context.queue.borrow_mut();
queue.push(task);
})
.expect("Tasks may not be awaken outside executor threads");
}
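A small self-contained illustration (not from the commit) of why `CancellableFuture::drop` above uses `try_borrow_mut` rather than `borrow_mut`: if a future is dropped while the executor already holds a mutable borrow of the task list, a reentrant `borrow_mut` would panic inside a destructor, whereas `try_borrow_mut` fails gracefully and leaves the cleanup to the executor's own drop handler.

use std::cell::RefCell;

fn main() {
    let active_tasks = RefCell::new(vec!["task A", "task B"]);

    // Simulate the executor holding a mutable borrow while a task is dropped.
    let outer = active_tasks.borrow_mut();

    // A nested `borrow_mut()` here would panic at runtime; `try_borrow_mut()`
    // reports the conflict instead, so a destructor can simply skip cleanup.
    assert!(active_tasks.try_borrow_mut().is_err());

    drop(outer);

    // Once the outer borrow is released, the fallible borrow succeeds.
    assert!(active_tasks.try_borrow_mut().is_ok());
}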

@@ -1,140 +0,0 @@
use futures_channel::{mpsc, oneshot};
use futures_util::StreamExt;
use super::*;
/// An object that runs an arbitrary closure when dropped.
struct RunOnDrop<F: FnOnce()> {
drop_fn: Option<F>,
}
impl<F: FnOnce()> RunOnDrop<F> {
/// Creates a new `RunOnDrop`.
fn new(drop_fn: F) -> Self {
Self {
drop_fn: Some(drop_fn),
}
}
}
impl<F: FnOnce()> Drop for RunOnDrop<F> {
fn drop(&mut self) {
self.drop_fn.take().map(|f| f());
}
}
#[test]
fn executor_deadlock() {
const NUM_THREADS: usize = 3;
let (_sender1, receiver1) = oneshot::channel::<()>();
let (_sender2, receiver2) = oneshot::channel::<()>();
let mut executor = Executor::new(NUM_THREADS);
static LAUNCH_COUNT: AtomicUsize = AtomicUsize::new(0);
static COMPLETION_COUNT: AtomicUsize = AtomicUsize::new(0);
executor.spawn_and_forget(async move {
LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed);
let _ = receiver2.await;
COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed);
});
executor.spawn_and_forget(async move {
LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed);
let _ = receiver1.await;
COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed);
});
executor.run();
// Check that the executor returns on deadlock, i.e. none of the task has
// completed.
assert_eq!(LAUNCH_COUNT.load(Ordering::Relaxed), 2);
assert_eq!(COMPLETION_COUNT.load(Ordering::Relaxed), 0);
}
#[test]
fn executor_deadlock_st() {
const NUM_THREADS: usize = 1;
let (_sender1, receiver1) = oneshot::channel::<()>();
let (_sender2, receiver2) = oneshot::channel::<()>();
let mut executor = Executor::new(NUM_THREADS);
static LAUNCH_COUNT: AtomicUsize = AtomicUsize::new(0);
static COMPLETION_COUNT: AtomicUsize = AtomicUsize::new(0);
executor.spawn_and_forget(async move {
LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed);
let _ = receiver2.await;
COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed);
});
executor.spawn_and_forget(async move {
LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed);
let _ = receiver1.await;
COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed);
});
executor.run();
// Check that the executor returnes on deadlock, i.e. none of the task has
// completed.
assert_eq!(LAUNCH_COUNT.load(Ordering::Relaxed), 2);
assert_eq!(COMPLETION_COUNT.load(Ordering::Relaxed), 0);
}
#[test]
fn executor_drop_cycle() {
const NUM_THREADS: usize = 3;
let (sender1, mut receiver1) = mpsc::channel(2);
let (sender2, mut receiver2) = mpsc::channel(2);
let (sender3, mut receiver3) = mpsc::channel(2);
let mut executor = Executor::new(NUM_THREADS);
static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
// Spawn 3 tasks that wake one another when dropped.
executor.spawn_and_forget({
let mut sender2 = sender2.clone();
let mut sender3 = sender3.clone();
async move {
let _guard = RunOnDrop::new(move || {
let _ = sender2.try_send(());
let _ = sender3.try_send(());
DROP_COUNT.fetch_add(1, Ordering::Relaxed);
});
let _ = receiver1.next().await;
}
});
executor.spawn_and_forget({
let mut sender1 = sender1.clone();
let mut sender3 = sender3.clone();
async move {
let _guard = RunOnDrop::new(move || {
let _ = sender1.try_send(());
let _ = sender3.try_send(());
DROP_COUNT.fetch_add(1, Ordering::Relaxed);
});
let _ = receiver2.next().await;
}
});
executor.spawn_and_forget({
let mut sender1 = sender1.clone();
let mut sender2 = sender2.clone();
async move {
let _guard = RunOnDrop::new(move || {
let _ = sender1.try_send(());
let _ = sender2.try_send(());
DROP_COUNT.fetch_add(1, Ordering::Relaxed);
});
let _ = receiver3.next().await;
}
});
executor.run();
// Make sure that all tasks are eventually dropped even though each task
// wakes the others when dropped.
drop(executor);
assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
}

@@ -1,25 +0,0 @@
use std::cell::Cell;
use std::sync::Arc;
use super::task::Runnable;
use super::ExecutorContext;
use super::LocalQueue;
/// A local worker with access to global executor resources.
pub(crate) struct Worker {
pub(super) local_queue: LocalQueue,
pub(super) fast_slot: Cell<Option<Runnable>>,
pub(super) executor_context: Arc<ExecutorContext>,
}
impl Worker {
/// Creates a new worker.
pub(super) fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
Self {
local_queue,
fast_slot: Cell::new(None),
executor_context,
}
}
}

@@ -7,19 +7,18 @@ use std::ptr;
 /// Declare a new thread-local storage scoped key of type `ScopedKey<T>`.
 ///
 /// This is based on the `scoped-tls` crate, with slight modifications, such as
-/// the use of the newly available `const` qualifier for TLS.
+/// the addition of a `ScopedLocalKey::unset` method and the use of a `map`
+/// method that returns `Option::None` when the value is not set, rather than
+/// panicking as `with` would.
 macro_rules! scoped_thread_local {
     ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
         $(#[$attrs])*
         $vis static $name: $crate::macros::scoped_thread_local::ScopedLocalKey<$ty>
-            = $crate::macros::scoped_thread_local::ScopedLocalKey {
-                inner: {
-                    thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
-                        std::cell::Cell::new(::std::ptr::null())
-                    });
-                    &FOO
-                },
-                _marker: ::std::marker::PhantomData,
-            };
+            = unsafe {
+                ::std::thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
+                    ::std::cell::Cell::new(::std::ptr::null())
+                });
+                $crate::macros::scoped_thread_local::ScopedLocalKey::new(&FOO)
+            };
     )
 }
@@ -28,13 +27,24 @@ pub(crate) use scoped_thread_local;
 /// Type representing a thread local storage key corresponding to a reference
 /// to the type parameter `T`.
 pub(crate) struct ScopedLocalKey<T> {
-    pub(crate) inner: &'static LocalKey<Cell<*const ()>>,
-    pub(crate) _marker: marker::PhantomData<T>,
+    inner: &'static LocalKey<Cell<*const ()>>,
+    _marker: marker::PhantomData<T>,
 }
 
 unsafe impl<T> Sync for ScopedLocalKey<T> {}
 
 impl<T> ScopedLocalKey<T> {
+    #[doc(hidden)]
+    /// # Safety
+    ///
+    /// Should only be called through the public macro.
+    pub(crate) const unsafe fn new(inner: &'static LocalKey<Cell<*const ()>>) -> Self {
+        Self {
+            inner,
+            _marker: marker::PhantomData,
+        }
+    }
+
     /// Inserts a value into this scoped thread local storage slot for the
     /// duration of a closure.
     pub(crate) fn set<F, R>(&'static self, t: &T, f: F) -> R
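A self-contained sketch (using plain `std::thread_local!` rather than the crate's macro) of the scoped thread-local pattern the macro above implements: a value is made visible only for the duration of a closure, and reads outside that window observe an unset key, which is what the fallible `map` relies on. Unwind handling (restoring the previous value if the closure panics) is omitted here for brevity.

use std::cell::Cell;

thread_local! {
    // Stores a raw pointer to the scoped value; null means "unset".
    static CURRENT: Cell<*const u32> = const { Cell::new(std::ptr::null()) };
}

// Mirrors `ScopedLocalKey::set`: installs `value` for the duration of `f`,
// then restores the previous pointer.
fn set<R>(value: &u32, f: impl FnOnce() -> R) -> R {
    CURRENT.with(|cell| {
        let prev = cell.replace(value as *const u32);
        let result = f();
        cell.set(prev);
        result
    })
}

// Mirrors `ScopedLocalKey::map`: returns `None` instead of panicking when no
// value is currently set.
fn map<R>(f: impl FnOnce(&u32) -> R) -> Option<R> {
    CURRENT.with(|cell| {
        let ptr = cell.get();
        if ptr.is_null() {
            None
        } else {
            // SAFETY: the pointer is only non-null while the `set` closure
            // that installed it is still running, so the reference is valid.
            Some(f(unsafe { &*ptr }))
        }
    })
}

fn main() {
    assert_eq!(map(|v| *v), None); // unset outside of `set`
    let value = 42;
    let doubled = set(&value, || map(|v| v * 2));
    assert_eq!(doubled, Some(84));
    assert_eq!(map(|v| *v), None); // unset again after the scope ends
}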

@@ -14,8 +14,8 @@ use super::codegen::simulation::*;
 /// Transport-independent server implementation.
 ///
-/// This implementation implements the protobuf services without any
-/// transport-specific management.
+/// This implements the protobuf services without any transport-specific
+/// management.
 pub(crate) struct GenericServer<F> {
     sim_gen: F,
     sim_context: Option<(Simulation, EndpointRegistry, KeyRegistry)>,

@@ -25,15 +25,22 @@ impl SimInit {
         Self::with_num_threads(num_cpus::get())
     }
 
-    /// Creates a builder for a multithreaded simulation running on the
-    /// specified number of threads.
+    /// Creates a builder for a simulation running on the specified number of
+    /// threads.
+    ///
+    /// Note that the number of worker threads is automatically constrained to
+    /// be between 1 and `usize::BITS` (inclusive).
     pub fn with_num_threads(num_threads: usize) -> Self {
-        // The current executor's implementation caps the number of thread to 64
-        // on 64-bit systems and 32 on 32-bit systems.
-        let num_threads = num_threads.min(usize::BITS as usize);
+        let num_threads = num_threads.clamp(1, usize::BITS as usize);
+
+        let executor = if num_threads == 1 {
+            Executor::new_single_threaded()
+        } else {
+            Executor::new_multi_threaded(num_threads)
+        };
+
         Self {
-            executor: Executor::new(num_threads),
+            executor,
             scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
             time: SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)),
             clock: Box::new(NoClock::new()),
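A small sketch (a hypothetical helper, not part of the crate) of the selection logic introduced above: the requested thread count is clamped to the range 1 through `usize::BITS`, and a count of exactly one selects the single-threaded executor.

// Hypothetical helper mirroring the clamping and selection in
// `SimInit::with_num_threads` above.
fn executor_kind(requested_threads: usize) -> (&'static str, usize) {
    let num_threads = requested_threads.clamp(1, usize::BITS as usize);
    let kind = if num_threads == 1 {
        "single-threaded"
    } else {
        "multi-threaded"
    };
    (kind, num_threads)
}

fn main() {
    assert_eq!(executor_kind(0), ("single-threaded", 1)); // clamped up to 1
    assert_eq!(executor_kind(1), ("single-threaded", 1));
    assert_eq!(executor_kind(8), ("multi-threaded", 8));
    // Requests above `usize::BITS` are clamped down (64 on 64-bit targets).
    assert_eq!(executor_kind(1000), ("multi-threaded", usize::BITS as usize));
}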