From 77e6e569ffbc4c016d180d44f567111c53e8edaa Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Tue, 7 May 2024 17:30:11 +0200 Subject: [PATCH 1/2] Add same-thread executor support --- .github/workflows/ci.yml | 42 +- asynchronix/src/dev_hooks.rs | 2 +- asynchronix/src/executor.rs | 678 +++++------------- asynchronix/src/executor/mt_executor.rs | 576 +++++++++++++++ .../executor/{ => mt_executor}/injector.rs | 0 .../{ => mt_executor}/pool_manager.rs | 0 asynchronix/src/executor/st_executor.rs | 244 +++++++ asynchronix/src/executor/tests.rs | 140 ---- asynchronix/src/executor/worker.rs | 25 - asynchronix/src/macros/scoped_thread_local.rs | 32 +- asynchronix/src/rpc/generic_server.rs | 4 +- asynchronix/src/simulation/sim_init.rs | 19 +- 12 files changed, 1066 insertions(+), 696 deletions(-) create mode 100644 asynchronix/src/executor/mt_executor.rs rename asynchronix/src/executor/{ => mt_executor}/injector.rs (100%) rename asynchronix/src/executor/{ => mt_executor}/pool_manager.rs (100%) create mode 100644 asynchronix/src/executor/st_executor.rs delete mode 100644 asynchronix/src/executor/tests.rs delete mode 100644 asynchronix/src/executor/worker.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b0b9912..1821e5a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: pull_request: push: - branches: [ main, dev ] + branches: [main, dev] env: RUSTFLAGS: -Dwarnings @@ -42,7 +42,7 @@ jobs: - name: Run cargo test run: cargo test --features="rpc grpc-server" - + loom-dry-run: name: Loom dry run runs-on: ubuntu-latest @@ -70,23 +70,53 @@ jobs: with: components: miri - - name: Run cargo miri tests + - name: Run cargo miri tests (single-threaded executor) + run: cargo miri test --tests --lib --features="rpc grpc-server" + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri tests (multi-threaded executor) run: cargo miri test --tests --lib --features="rpc grpc-server" env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example1 + - name: Run cargo miri example1 (single-threaded executor) + run: cargo miri run --example espresso_machine + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri example1 (multi-threaded executor) run: cargo miri run --example espresso_machine env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example2 + - name: Run cargo miri example2 (single-threaded executor) + run: cargo miri run --example power_supply + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri example2 (multi-threaded executor) run: cargo miri run --example power_supply env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example3 + - name: Run cargo miri example3 (single-threaded executor) run: cargo miri run --example stepper_motor + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri example3 (multi-threaded executor) + run: cargo miri run --example stepper_motor + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 + + - name: Run cargo miri example4 (single-threaded executor) + run: cargo miri run --example assembly + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation 
-Zmiri-num-cpus=1 + + - name: Run cargo miri example4 (multi-threaded executor) + run: cargo miri run --example assembly env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs index 96d948c..f79102f 100644 --- a/asynchronix/src/dev_hooks.rs +++ b/asynchronix/src/dev_hooks.rs @@ -15,7 +15,7 @@ impl Executor { /// /// The maximum number of threads is set with the `pool_size` parameter. pub fn new(pool_size: usize) -> Self { - Self(executor::Executor::new(pool_size)) + Self(executor::Executor::new_multi_threaded(pool_size)) } /// Spawns a task which output will never be retrieved. diff --git a/asynchronix/src/executor.rs b/asynchronix/src/executor.rs index b33a603..3d5a8de 100644 --- a/asynchronix/src/executor.rs +++ b/asynchronix/src/executor.rs @@ -1,98 +1,30 @@ -//! Multi-threaded `async` executor. -//! -//! The executor is exclusively designed for message-passing computational -//! tasks. As such, it does not include an I/O reactor and does not consider -//! fairness as a goal in itself. While it does use fair local queues inasmuch -//! as these tend to perform better in message-passing applications, it uses an -//! unfair injection queue and a LIFO slot without attempt to mitigate the -//! effect of badly behaving code (e.g. futures that spin-lock by yielding to -//! the executor; there is for this reason no support for something like tokio's -//! `yield_now`). -//! -//! Another way in which it differs from other `async` executors is that it -//! treats deadlocking as a normal occurrence. This is because in a -//! discrete-time simulator, the simulation of a system at a given time step -//! will make as much progress as possible until it technically reaches a -//! deadlock. Only then does the simulator advance the simulated time to that of -//! the next "event" extracted from a time-sorted priority queue. -//! -//! The design of the executor is largely influenced by the tokio and Go -//! schedulers, both of which are optimized for message-passing applications. In -//! particular, it uses fast, fixed-size thread-local work-stealing queues with -//! a non-stealable LIFO slot in combination with an injector queue, which -//! injector queue is used both to schedule new tasks and to absorb temporary -//! overflow in the local queues. -//! -//! The design of the injector queue is kept very simple compared to tokio, by -//! taking advantage of the fact that the injector is not required to be either -//! LIFO or FIFO. Moving tasks between a local queue and the injector is fast -//! because tasks are moved in batch and are stored contiguously in memory. -//! -//! Another difference with tokio is that, at the moment, the complete subset of -//! active worker threads is stored in a single atomic variable. This makes it -//! possible to rapidly identify free worker threads for stealing operations, -//! with the downside that the maximum number of worker threads is currently -//! limited to `usize::BITS`. This is not expected to constitute a limitation in -//! practice since system simulation is not typically embarrassingly parallel. -//! -//! Probably the largest difference with tokio is the task system, which has -//! better throughput due to less need for synchronization. This mainly results -//! from the use of an atomic notification counter rather than an atomic -//! notification flag, thus alleviating the need to reset the notification flag -//! before polling a future. +//! 
`async` executor trait. -use std::fmt; -use std::future::Future; -use std::panic::{self, AssertUnwindSafe}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::{self, JoinHandle}; -use std::time::{Duration, Instant}; - -use crossbeam_utils::sync::{Parker, Unparker}; -use slab::Slab; - -mod injector; -mod pool_manager; +mod mt_executor; +mod st_executor; mod task; -mod worker; -#[cfg(all(test, not(asynchronix_loom)))] -mod tests; +use std::future::Future; +use std::sync::atomic::AtomicUsize; -use crate::macros::scoped_thread_local::scoped_thread_local; -use crate::util::rng::Rng; - -use self::pool_manager::PoolManager; -use self::task::{CancelToken, Promise, Runnable}; -use self::worker::Worker; - -const BUCKET_SIZE: usize = 128; -const QUEUE_SIZE: usize = BUCKET_SIZE * 2; +use task::Promise; +/// Unique identifier for executor instances. static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); -type Bucket = injector::Bucket; -type Injector = injector::Injector; -type LocalQueue = st3::fifo::Worker; -type Stealer = st3::fifo::Stealer; - -scoped_thread_local!(static LOCAL_WORKER: Worker); -scoped_thread_local!(static ACTIVE_TASKS: Mutex>); - -/// A multi-threaded `async` executor. -pub(crate) struct Executor { - /// Shared executor data. - context: Arc, - /// List of tasks that have not completed yet. - active_tasks: Arc>>, - /// Parker for the main executor thread. - parker: Parker, - /// Handles to the worker threads. - worker_handles: Vec>, +/// A single-threaded or multi-threaded `async` executor. +#[derive(Debug)] +pub(crate) enum Executor { + StExecutor(st_executor::Executor), + MtExecutor(mt_executor::Executor), } impl Executor { + /// Creates an executor that runs futures on the current thread. + pub(crate) fn new_single_threaded() -> Self { + Self::StExecutor(st_executor::Executor::new()) + } + /// Creates an executor that runs futures on a thread pool. /// /// The maximum number of threads is set with the `num_threads` parameter. @@ -101,78 +33,11 @@ impl Executor { /// /// This will panic if the specified number of threads is zero or is more /// than `usize::BITS`. - pub(crate) fn new(num_threads: usize) -> Self { - let parker = Parker::new(); - let unparker = parker.unparker().clone(); - - let (local_queues_and_parkers, stealers_and_unparkers): (Vec<_>, Vec<_>) = (0..num_threads) - .map(|_| { - let parker = Parker::new(); - let unparker = parker.unparker().clone(); - let local_queue = LocalQueue::new(QUEUE_SIZE); - let stealer = local_queue.stealer(); - - ((local_queue, parker), (stealer, unparker)) - }) - .unzip(); - - // Each executor instance has a unique ID inherited by tasks to ensure - // that tasks are scheduled on their parent executor. - let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed); - assert!( - executor_id <= usize::MAX / 2, - "too many executors have been instantiated" - ); - - let context = Arc::new(ExecutorContext::new( - executor_id, - unparker, - stealers_and_unparkers.into_iter(), - )); - let active_tasks = Arc::new(Mutex::new(Slab::new())); - - // All workers must be marked as active _before_ spawning the threads to - // make sure that the count of active workers does not fall to zero - // before all workers are blocked on the signal barrier. - context.pool_manager.set_all_workers_active(); - - // Spawn all worker threads. 
-        let worker_handles: Vec<_> = local_queues_and_parkers
-            .into_iter()
-            .enumerate()
-            .map(|(id, (local_queue, worker_parker))| {
-                let thread_builder = thread::Builder::new().name(format!("Worker #{}", id));
-
-                thread_builder
-                    .spawn({
-                        let context = context.clone();
-                        let active_tasks = active_tasks.clone();
-                        move || {
-                            let worker = Worker::new(local_queue, context);
-                            ACTIVE_TASKS.set(&active_tasks, || {
-                                LOCAL_WORKER
-                                    .set(&worker, || run_local_worker(&worker, id, worker_parker))
-                            });
-                        }
-                    })
-                    .unwrap()
-            })
-            .collect();
-
-        // Wait until all workers are blocked on the signal barrier.
-        parker.park();
-        assert!(context.pool_manager.pool_is_idle());
-
-        Self {
-            context,
-            active_tasks,
-            parker,
-            worker_handles,
-        }
+    pub(crate) fn new_multi_threaded(num_threads: usize) -> Self {
+        Self::MtExecutor(mt_executor::Executor::new(num_threads))
    }

    /// Spawns a task and returns a promise that can be polled to retrieve the
    /// task's output.
    ///
    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
    /// is called.
    pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
-        // Book a slot to store the task cancellation token.
-        let mut active_tasks = self.active_tasks.lock().unwrap();
-        let task_entry = active_tasks.vacant_entry();
-
-        // Wrap the future so that it removes its cancel token from the
-        // executor's list when dropped.
-        let future = CancellableFuture::new(future, task_entry.key());
-
-        let (promise, runnable, cancel_token) =
-            task::spawn(future, schedule_task, self.context.executor_id);
-
-        task_entry.insert(cancel_token);
-        self.context.injector.insert_task(runnable);
-
-        promise
+        match self {
+            Self::StExecutor(executor) => executor.spawn(future),
+            Self::MtExecutor(executor) => executor.spawn(future),
+        }
    }

    /// Spawns a task whose output will never be retrieved.
    ///
-    /// This is mostly useful to avoid undue reference counting for futures that
-    /// return a `()` type.
-    ///
    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
    /// is called.
    pub(crate) fn spawn_and_forget<T>(&self, future: T)
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
-        // Book a slot to store the task cancellation token.
-        let mut active_tasks = self.active_tasks.lock().unwrap();
-        let task_entry = active_tasks.vacant_entry();
-
-        // Wrap the future so that it removes its cancel token from the
-        // executor's list when dropped.
-        let future = CancellableFuture::new(future, task_entry.key());
-
-        let (runnable, cancel_token) =
-            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
-
-        task_entry.insert(cancel_token);
-        self.context.injector.insert_task(runnable);
+        match self {
+            Self::StExecutor(executor) => executor.spawn_and_forget(future),
+            Self::MtExecutor(executor) => executor.spawn_and_forget(future),
+        }
    }

    /// Executes spawned tasks, blocking until all futures have completed or
    /// until the executor reaches a deadlock.
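    ///
    /// A minimal usage sketch (illustrative only; how the task's output is
    /// retrieved from the promise depends on the `Promise` API, which this
    /// patch does not show):
    ///
    /// ```ignore
    /// let mut executor = Executor::new_single_threaded();
    /// let _promise = executor.spawn(async { 21 + 21 });
    /// executor.run(); // the spawned task completes during this call
    /// ```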
pub(crate) fn run(&mut self) {
-        self.context.pool_manager.activate_worker();
+        match self {
+            Self::StExecutor(executor) => executor.run(),
+            Self::MtExecutor(executor) => executor.run(),
+        }
+    }
+}

-        loop {
-            if let Some(worker_panic) = self.context.pool_manager.take_panic() {
-                panic::resume_unwind(worker_panic);
+#[cfg(all(test, not(asynchronix_loom)))]
+mod tests {
+    use std::sync::atomic::Ordering;
+    use std::sync::Arc;
+
+    use futures_channel::{mpsc, oneshot};
+    use futures_util::StreamExt;
+
+    use super::*;
+
+    /// An object that runs an arbitrary closure when dropped.
+    struct RunOnDrop<F: FnOnce()> {
+        drop_fn: Option<F>,
+    }
+    impl<F: FnOnce()> RunOnDrop<F> {
+        /// Creates a new `RunOnDrop`.
+        fn new(drop_fn: F) -> Self {
+            Self {
+                drop_fn: Some(drop_fn),
            }
-            if self.context.pool_manager.pool_is_idle() {
-                return;
-            }
-
-            self.parker.park();
        }
    }
-}
-
-impl Drop for Executor {
-    fn drop(&mut self) {
-        // Force all threads to return.
-        self.context.pool_manager.trigger_termination();
-        for handle in self.worker_handles.drain(0..) {
-            handle.join().unwrap();
-        }
-
-        // Drop all tasks that have not completed.
-        //
-        // A local worker must be set because some tasks may schedule other
-        // tasks when dropped, which requires that a local worker be available.
-        let worker = Worker::new(LocalQueue::new(QUEUE_SIZE), self.context.clone());
-        LOCAL_WORKER.set(&worker, || {
-            // Cancel all pending futures.
-            //
-            // `ACTIVE_TASKS` is explicitly unset to prevent
-            // `CancellableFuture::drop()` from trying to remove its own token
-            // from the list of active tasks as this would result in a reentrant
-            // lock. This is mainly to stay on the safe side: `ACTIVE_TASKS`
-            // should not be set on this thread anyway, unless for some reason
-            // the executor runs inside another executor.
-            ACTIVE_TASKS.unset(|| {
-                let mut tasks = self.active_tasks.lock().unwrap();
-                for task in tasks.drain() {
-                    task.cancel();
-                }
-
-                // Some of the dropped tasks may have scheduled other tasks that
-                // were not yet cancelled, preventing them from being dropped
-                // upon cancellation. This is OK: the scheduled tasks will be
-                // dropped when the local and injector queues are dropped, and
-                // they cannot re-schedule one another since all tasks were
-                // cancelled.
-            });
-        });
-    }
-}
-
-impl fmt::Debug for Executor {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Executor").finish_non_exhaustive()
-    }
-}
-
-/// Shared executor context.
-///
-/// This contains all executor resources that can be shared between threads.
-struct ExecutorContext {
-    /// Injector queue.
-    injector: Injector,
-    /// Unique executor ID inherited by all tasks spawned on this executor instance.
-    executor_id: usize,
-    /// Unparker for the main executor thread.
-    executor_unparker: Unparker,
-    /// Manager for all worker threads.
-    pool_manager: PoolManager,
-}
-
-impl ExecutorContext {
-    /// Creates a new shared executor context.
-    pub(super) fn new(
-        executor_id: usize,
-        executor_unparker: Unparker,
-        stealers_and_unparkers: impl Iterator<Item = (Stealer, Unparker)>,
-    ) -> Self {
-        let (stealers, worker_unparkers): (Vec<_>, Vec<_>) =
-            stealers_and_unparkers.into_iter().unzip();
-        let worker_unparkers = worker_unparkers.into_boxed_slice();
-
-        Self {
-            injector: Injector::new(),
-            executor_id,
-            executor_unparker,
-            pool_manager: PoolManager::new(
-                worker_unparkers.len(),
-                stealers.into_boxed_slice(),
-                worker_unparkers,
-            ),
+    impl<F: FnOnce()> Drop for RunOnDrop<F> {
+        fn drop(&mut self) {
+            self.drop_fn.take().map(|f| f());
        }
    }
-}

-/// A `Future` wrapper that removes its cancellation token from the list of
-/// active tasks when dropped.
-struct CancellableFuture<T> {
-    inner: T,
-    cancellation_key: usize,
-}
+    fn executor_deadlock(mut executor: Executor) {
+        let (_sender1, receiver1) = oneshot::channel::<()>();
+        let (_sender2, receiver2) = oneshot::channel::<()>();

-impl<T> CancellableFuture<T> {
-    /// Creates a new `CancellableFuture`.
-    fn new(fut: T, cancellation_key: usize) -> Self {
-        Self {
-            inner: fut,
-            cancellation_key,
-        }
-    }
-}
+        let launch_count = Arc::new(AtomicUsize::new(0));
+        let completion_count = Arc::new(AtomicUsize::new(0));

-impl<T: Future> Future for CancellableFuture<T> {
-    type Output = T::Output;
+        executor.spawn_and_forget({
+            let launch_count = launch_count.clone();
+            let completion_count = completion_count.clone();

-    #[inline(always)]
-    fn poll(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Self::Output> {
-        unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
-    }
-}
-
-impl<T> Drop for CancellableFuture<T> {
-    fn drop(&mut self) {
-        // Remove the task from the list of active tasks if the future is
-        // dropped on a worker thread. Otherwise do nothing and let the
-        // executor's drop handler do the cleanup.
-        let _ = ACTIVE_TASKS.map(|active_tasks| {
-            // Don't unwrap on `lock()` because this function can be called from
-            // a destructor and should not panic. In the worse case, the cancel
-            // token will be left in the list of active tasks, which does
-            // prevents eager task deallocation but does not cause any issue
-            // otherwise.
-            if let Ok(mut active_tasks) = active_tasks.lock() {
-                let _cancel_token = active_tasks.try_remove(self.cancellation_key);
+            async move {
+                launch_count.fetch_add(1, Ordering::Relaxed);
+                let _ = receiver2.await;
+                completion_count.fetch_add(1, Ordering::Relaxed);
            }
        });
-    }
-}
-
-/// Schedules a `Runnable` from within a worker thread.
-///
-/// # Panics
-///
-/// This function will panic if called from a non-worker thread or if called
-/// from the worker thread of another executor instance than the one the task
-/// for this `Runnable` was spawned on.
-fn schedule_task(task: Runnable, executor_id: usize) {
-    LOCAL_WORKER
-        .map(|worker| {
-            let pool_manager = &worker.executor_context.pool_manager;
-            let injector = &worker.executor_context.injector;
-            let local_queue = &worker.local_queue;
-            let fast_slot = &worker.fast_slot;
-
-            // Check that this task was indeed spawned on this executor.
-            assert_eq!(
-                executor_id, worker.executor_context.executor_id,
-                "Tasks must be awaken on the same executor they are spawned on"
-            );
-
-            // Store the task in the fast slot and retrieve the one that was
-            // formerly stored, if any.
-            let prev_task = match fast_slot.replace(Some(task)) {
-                // If there already was a task in the slot, proceed so it can be
-                // moved to a task queue.
- Some(t) => t, - // Otherwise return immediately: this task cannot be stolen so - // there is no point in activating a sibling worker. - None => return, - }; - - // Push the previous task to the local queue if possible or on the - // injector queue otherwise. - if let Err(prev_task) = local_queue.push(prev_task) { - // The local queue is full. Try to move half of it to the - // injector queue; if this fails, just push one task to the - // injector queue. - if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) { - injector.push_bucket(Bucket::from_iter(drain)); - local_queue.push(prev_task).unwrap(); - } else { - injector.insert_task(prev_task); - } - } - - // A task has been pushed to the local or injector queue: try to - // activate another worker if no worker is currently searching for a - // task. - if pool_manager.searching_worker_count() == 0 { - pool_manager.activate_worker_relaxed(); - } - }) - .expect("Tasks may not be awaken outside executor threads"); -} - -/// Processes all incoming tasks on a worker thread until the `Terminate` signal -/// is received or until it panics. -/// -/// Panics caught in this thread are relayed to the main executor thread. -fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { - let pool_manager = &worker.executor_context.pool_manager; - let injector = &worker.executor_context.injector; - let executor_unparker = &worker.executor_context.executor_unparker; - let local_queue = &worker.local_queue; - let fast_slot = &worker.fast_slot; - - let result = panic::catch_unwind(AssertUnwindSafe(|| { - // Set how long to spin when searching for a task. - const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000); - - // Seed a thread RNG with the worker ID. - let rng = Rng::new(id as u64); - - loop { - // Signal barrier: park until notified to continue or terminate. - - // Try to deactivate the worker. - if pool_manager.try_set_worker_inactive(id) { - parker.park(); - // No need to call `begin_worker_search()`: this was done by the - // thread that unparked the worker. - } else if injector.is_empty() { - // This worker could not be deactivated because it was the last - // active worker. In such case, the call to - // `try_set_worker_inactive` establishes a synchronization with - // all threads that pushed tasks to the injector queue but could - // not activate a new worker, which is why some tasks may now be - // visible in the injector queue. - pool_manager.set_all_workers_inactive(); - executor_unparker.unpark(); - parker.park(); - // No need to call `begin_worker_search()`: this was done by the - // thread that unparked the worker. - } else { - pool_manager.begin_worker_search(); - } - - if pool_manager.termination_is_triggered() { - return; - } - - let mut search_start = Instant::now(); - - // Process the tasks one by one. - loop { - // Check the injector queue first. - if let Some(bucket) = injector.pop_bucket() { - let bucket_iter = bucket.into_iter(); - - // There is a _very_ remote possibility that, even though - // the local queue is empty, it has temporarily too little - // spare capacity for the bucket. This could happen if a - // concurrent steal operation was preempted for all the time - // it took to pop and process the remaining tasks and it - // hasn't released the stolen capacity yet. 
- // - // Unfortunately, we cannot just skip checking the injector - // queue altogether when there isn't enough spare capacity - // in the local queue because this could lead to a race: - // suppose that (1) this thread has earlier pushed tasks - // onto the injector queue, and (2) the stealer has - // processed all stolen tasks before this thread sees the - // capacity restored and at the same time (3) the stealer - // does not yet see the tasks this thread pushed to the - // injector queue; in such scenario, both this thread and - // the stealer thread may park and leave unprocessed tasks - // in the injector queue. - // - // This is the only instance where spinning is used, as the - // probability of this happening is close to zero and the - // complexity of a signaling mechanism (condvar & friends) - // wouldn't carry its weight. - while local_queue.spare_capacity() < bucket_iter.len() {} - - // Since empty buckets are never pushed onto the injector - // queue, we should now have at least one task to process. - local_queue.extend(bucket_iter); - } else { - // The injector queue is empty. Try to steal from active - // siblings. - let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng); - if stealers.all(|stealer| { - stealer - .steal_and_pop(local_queue, |n| n - n / 2) - .map(|(task, _)| { - let prev_task = fast_slot.replace(Some(task)); - assert!(prev_task.is_none()); - }) - .is_err() - }) { - // Give up if unsuccessful for too long. - if (Instant::now() - search_start) > MAX_SEARCH_DURATION { - pool_manager.end_worker_search(); - break; - } - - // Re-try. - continue; - } - } - - // Signal the end of the search so that another worker can be - // activated when a new task is scheduled. - pool_manager.end_worker_search(); - - // Pop tasks from the fast slot or the local queue. - while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) { - if pool_manager.termination_is_triggered() { - return; - } - task.run(); - } - - // Resume the search for tasks. - pool_manager.begin_worker_search(); - search_start = Instant::now(); - } - } - })); - - // Propagate the panic, if any. - if let Err(panic) = result { - pool_manager.register_panic(panic); - pool_manager.trigger_termination(); - executor_unparker.unpark(); + executor.spawn_and_forget({ + let launch_count = launch_count.clone(); + let completion_count = completion_count.clone(); + + async move { + launch_count.fetch_add(1, Ordering::Relaxed); + let _ = receiver1.await; + completion_count.fetch_add(1, Ordering::Relaxed); + } + }); + + executor.run(); + + // Check that the executor returns on deadlock, i.e. none of the task has + // completed. + assert_eq!(launch_count.load(Ordering::Relaxed), 2); + assert_eq!(completion_count.load(Ordering::Relaxed), 0); + + // Drop the executor and thus the receiver tasks before the senders, + // failing which the senders may signal that the channel has been + // dropped and wake the tasks outside the executor. + drop(executor); + } + + fn executor_drop_cycle(mut executor: Executor) { + let (sender1, mut receiver1) = mpsc::channel(2); + let (sender2, mut receiver2) = mpsc::channel(2); + let (sender3, mut receiver3) = mpsc::channel(2); + + let drop_count = Arc::new(AtomicUsize::new(0)); + + // Spawn 3 tasks that wake one another when dropped. 
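+        // Each task installs a `RunOnDrop` guard that wakes the two other
+        // tasks, and all three then block on `next()`. None of them can
+        // therefore complete, and dropping the executor must break this
+        // mutual wake-up cycle while still running every guard exactly once.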
+ executor.spawn_and_forget({ + let mut sender2 = sender2.clone(); + let mut sender3 = sender3.clone(); + let drop_count = drop_count.clone(); + + async move { + let _guard = RunOnDrop::new(move || { + let _ = sender2.try_send(()); + let _ = sender3.try_send(()); + drop_count.fetch_add(1, Ordering::Relaxed); + }); + let _ = receiver1.next().await; + } + }); + executor.spawn_and_forget({ + let mut sender1 = sender1.clone(); + let mut sender3 = sender3.clone(); + let drop_count = drop_count.clone(); + + async move { + let _guard = RunOnDrop::new(move || { + let _ = sender1.try_send(()); + let _ = sender3.try_send(()); + drop_count.fetch_add(1, Ordering::Relaxed); + }); + let _ = receiver2.next().await; + } + }); + executor.spawn_and_forget({ + let mut sender1 = sender1.clone(); + let mut sender2 = sender2.clone(); + let drop_count = drop_count.clone(); + + async move { + let _guard = RunOnDrop::new(move || { + let _ = sender1.try_send(()); + let _ = sender2.try_send(()); + drop_count.fetch_add(1, Ordering::Relaxed); + }); + let _ = receiver3.next().await; + } + }); + + executor.run(); + + // Make sure that all tasks are eventually dropped even though each task + // wakes the others when dropped. + drop(executor); + assert_eq!(drop_count.load(Ordering::Relaxed), 3); + } + + #[test] + fn executor_deadlock_st() { + executor_deadlock(Executor::new_single_threaded()); + } + + #[test] + fn executor_deadlock_mt() { + executor_deadlock(Executor::new_multi_threaded(3)); + } + + #[test] + fn executor_deadlock_mt_one_worker() { + executor_deadlock(Executor::new_multi_threaded(1)); + } + #[test] + fn executor_drop_cycle_st() { + executor_drop_cycle(Executor::new_single_threaded()); + } + + #[test] + fn executor_drop_cycle_mt() { + executor_drop_cycle(Executor::new_multi_threaded(3)); } } diff --git a/asynchronix/src/executor/mt_executor.rs b/asynchronix/src/executor/mt_executor.rs new file mode 100644 index 0000000..5859cdf --- /dev/null +++ b/asynchronix/src/executor/mt_executor.rs @@ -0,0 +1,576 @@ +//! Multi-threaded `async` executor. +//! +//! The executor is exclusively designed for message-passing computational +//! tasks. As such, it does not include an I/O reactor and does not consider +//! fairness as a goal in itself. While it does use fair local queues inasmuch +//! as these tend to perform better in message-passing applications, it uses an +//! unfair injection queue and a LIFO slot without attempt to mitigate the +//! effect of badly behaving code (e.g. futures that spin-lock by yielding to +//! the executor; there is for this reason no support for something like tokio's +//! `yield_now`). +//! +//! Another way in which it differs from other `async` executors is that it +//! treats deadlocking as a normal occurrence. This is because in a +//! discrete-time simulator, the simulation of a system at a given time step +//! will make as much progress as possible until it technically reaches a +//! deadlock. Only then does the simulator advance the simulated time to that of +//! the next "event" extracted from a time-sorted priority queue. +//! +//! The design of the executor is largely influenced by the tokio and Go +//! schedulers, both of which are optimized for message-passing applications. In +//! particular, it uses fast, fixed-size thread-local work-stealing queues with +//! a non-stealable LIFO slot in combination with an injector queue, which +//! injector queue is used both to schedule new tasks and to absorb temporary +//! overflow in the local queues. +//! +//! 
The design of the injector queue is kept very simple compared to tokio, by
+//! taking advantage of the fact that the injector is not required to be either
+//! LIFO or FIFO. Moving tasks between a local queue and the injector is fast
+//! because tasks are moved in batch and are stored contiguously in memory.
+//!
+//! Another difference with tokio is that, at the moment, the complete subset of
+//! active worker threads is stored in a single atomic variable. This makes it
+//! possible to rapidly identify free worker threads for stealing operations,
+//! with the downside that the maximum number of worker threads is currently
+//! limited to `usize::BITS`. This is not expected to constitute a limitation in
+//! practice since system simulation is not typically embarrassingly parallel.
+//!
+//! Probably the largest difference with tokio is the task system, which has
+//! better throughput due to less need for synchronization. This mainly results
+//! from the use of an atomic notification counter rather than an atomic
+//! notification flag, thus alleviating the need to reset the notification flag
+//! before polling a future.
+
+mod injector;
+mod pool_manager;
+
+use std::cell::Cell;
+use std::fmt;
+use std::future::Future;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::atomic::Ordering;
+use std::sync::{Arc, Mutex};
+use std::thread::{self, JoinHandle};
+use std::time::{Duration, Instant};
+
+use crossbeam_utils::sync::{Parker, Unparker};
+use slab::Slab;
+
+use crate::macros::scoped_thread_local::scoped_thread_local;
+use crate::util::rng::Rng;
+
+use super::task::{self, CancelToken, Promise, Runnable};
+use super::NEXT_EXECUTOR_ID;
+use pool_manager::PoolManager;
+
+const BUCKET_SIZE: usize = 128;
+const QUEUE_SIZE: usize = BUCKET_SIZE * 2;
+
+type Bucket = injector::Bucket<Runnable, BUCKET_SIZE>;
+type Injector = injector::Injector<Runnable, BUCKET_SIZE>;
+type LocalQueue = st3::fifo::Worker<Runnable>;
+type Stealer = st3::fifo::Stealer<Runnable>;
+
+scoped_thread_local!(static LOCAL_WORKER: Worker);
+scoped_thread_local!(static ACTIVE_TASKS: Mutex<Slab<CancelToken>>);
+
+/// A multi-threaded `async` executor.
+pub(crate) struct Executor {
+    /// Shared executor data.
+    context: Arc<ExecutorContext>,
+    /// List of tasks that have not completed yet.
+    active_tasks: Arc<Mutex<Slab<CancelToken>>>,
+    /// Parker for the main executor thread.
+    parker: Parker,
+    /// Handles to the worker threads.
+    worker_handles: Vec<JoinHandle<()>>,
+}
+
+impl Executor {
+    /// Creates an executor that runs futures on a thread pool.
+    ///
+    /// The maximum number of threads is set with the `num_threads` parameter.
+    ///
+    /// # Panics
+    ///
+    /// This will panic if the specified number of threads is zero or is more
+    /// than `usize::BITS`.
+    pub(crate) fn new(num_threads: usize) -> Self {
+        let parker = Parker::new();
+        let unparker = parker.unparker().clone();
+
+        let (local_queues_and_parkers, stealers_and_unparkers): (Vec<_>, Vec<_>) = (0..num_threads)
+            .map(|_| {
+                let parker = Parker::new();
+                let unparker = parker.unparker().clone();
+                let local_queue = LocalQueue::new(QUEUE_SIZE);
+                let stealer = local_queue.stealer();
+
+                ((local_queue, parker), (stealer, unparker))
+            })
+            .unzip();
+
+        // Each executor instance has a unique ID inherited by tasks to ensure
+        // that tasks are scheduled on their parent executor.
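+        //
+        // Tasks carry this ID when they are woken, so that a wake-up arriving
+        // on a thread that belongs to a different executor can be detected
+        // and rejected (see the assertion in `schedule_task`).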
+        let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
+        assert!(
+            executor_id <= usize::MAX / 2,
+            "too many executors have been instantiated"
+        );
+
+        let context = Arc::new(ExecutorContext::new(
+            executor_id,
+            unparker,
+            stealers_and_unparkers.into_iter(),
+        ));
+        let active_tasks = Arc::new(Mutex::new(Slab::new()));
+
+        // All workers must be marked as active _before_ spawning the threads to
+        // make sure that the count of active workers does not fall to zero
+        // before all workers are blocked on the signal barrier.
+        context.pool_manager.set_all_workers_active();
+
+        // Spawn all worker threads.
+        let worker_handles: Vec<_> = local_queues_and_parkers
+            .into_iter()
+            .enumerate()
+            .map(|(id, (local_queue, worker_parker))| {
+                let thread_builder = thread::Builder::new().name(format!("Worker #{}", id));
+
+                thread_builder
+                    .spawn({
+                        let context = context.clone();
+                        let active_tasks = active_tasks.clone();
+                        move || {
+                            let worker = Worker::new(local_queue, context);
+                            ACTIVE_TASKS.set(&active_tasks, || {
+                                LOCAL_WORKER
+                                    .set(&worker, || run_local_worker(&worker, id, worker_parker))
+                            });
+                        }
+                    })
+                    .unwrap()
+            })
+            .collect();
+
+        // Wait until all workers are blocked on the signal barrier.
+        parker.park();
+        assert!(context.pool_manager.pool_is_idle());
+
+        Self {
+            context,
+            active_tasks,
+            parker,
+            worker_handles,
+        }
+    }
+
+    /// Spawns a task and returns a promise that can be polled to retrieve the
+    /// task's output.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
+        let mut active_tasks = self.active_tasks.lock().unwrap();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (promise, runnable, cancel_token) =
+            task::spawn(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        self.context.injector.insert_task(runnable);
+
+        promise
+    }
+
+    /// Spawns a task whose output will never be retrieved.
+    ///
+    /// This is mostly useful to avoid undue reference counting for futures that
+    /// return a `()` type.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn_and_forget<T>(&self, future: T)
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
+        let mut active_tasks = self.active_tasks.lock().unwrap();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (runnable, cancel_token) =
+            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        self.context.injector.insert_task(runnable);
+    }
+
+    /// Executes spawned tasks, blocking until all futures have completed or
+    /// until the executor reaches a deadlock.
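+    ///
+    /// A sketch of the intended call pattern (illustrative only):
+    ///
+    /// ```ignore
+    /// let mut executor = Executor::new(4);
+    /// executor.spawn_and_forget(async { /* exchange messages */ });
+    /// executor.run(); // returns once every task is either done or blocked
+    /// ```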
pub(crate) fn run(&mut self) {
+        self.context.pool_manager.activate_worker();
+
+        loop {
+            if let Some(worker_panic) = self.context.pool_manager.take_panic() {
+                panic::resume_unwind(worker_panic);
+            }
+            if self.context.pool_manager.pool_is_idle() {
+                return;
+            }
+
+            self.parker.park();
+        }
+    }
+}
+
+impl Drop for Executor {
+    fn drop(&mut self) {
+        // Force all threads to return.
+        self.context.pool_manager.trigger_termination();
+        for handle in self.worker_handles.drain(0..) {
+            handle.join().unwrap();
+        }
+
+        // Drop all tasks that have not completed.
+        //
+        // A local worker must be set because some tasks may schedule other
+        // tasks when dropped, which requires that a local worker be available.
+        let worker = Worker::new(LocalQueue::new(QUEUE_SIZE), self.context.clone());
+        LOCAL_WORKER.set(&worker, || {
+            // Cancel all pending futures.
+            //
+            // `ACTIVE_TASKS` is explicitly unset to prevent
+            // `CancellableFuture::drop()` from trying to remove its own token
+            // from the list of active tasks as this would result in a reentrant
+            // lock. This is mainly to stay on the safe side: `ACTIVE_TASKS`
+            // should not be set on this thread anyway, unless for some reason
+            // the executor runs inside another executor.
+            ACTIVE_TASKS.unset(|| {
+                let mut tasks = self.active_tasks.lock().unwrap();
+                for task in tasks.drain() {
+                    task.cancel();
+                }
+
+                // Some of the dropped tasks may have scheduled other tasks that
+                // were not yet cancelled, preventing them from being dropped
+                // upon cancellation. This is OK: the scheduled tasks will be
+                // dropped when the local and injector queues are dropped, and
+                // they cannot re-schedule one another since all tasks were
+                // cancelled.
+            });
+        });
+    }
+}
+
+impl fmt::Debug for Executor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Executor").finish_non_exhaustive()
+    }
+}
+
+/// Shared executor context.
+///
+/// This contains all executor resources that can be shared between threads.
+struct ExecutorContext {
+    /// Injector queue.
+    injector: Injector,
+    /// Unique executor identifier inherited by all tasks spawned on this
+    /// executor instance.
+    executor_id: usize,
+    /// Unparker for the main executor thread.
+    executor_unparker: Unparker,
+    /// Manager for all worker threads.
+    pool_manager: PoolManager,
+}
+
+impl ExecutorContext {
+    /// Creates a new shared executor context.
+    pub(super) fn new(
+        executor_id: usize,
+        executor_unparker: Unparker,
+        stealers_and_unparkers: impl Iterator<Item = (Stealer, Unparker)>,
+    ) -> Self {
+        let (stealers, worker_unparkers): (Vec<_>, Vec<_>) =
+            stealers_and_unparkers.into_iter().unzip();
+        let worker_unparkers = worker_unparkers.into_boxed_slice();
+
+        Self {
+            injector: Injector::new(),
+            executor_id,
+            executor_unparker,
+            pool_manager: PoolManager::new(
+                worker_unparkers.len(),
+                stealers.into_boxed_slice(),
+                worker_unparkers,
+            ),
+        }
+    }
+}
+
+/// A `Future` wrapper that removes its cancellation token from the list of
+/// active tasks when dropped.
+struct CancellableFuture<T> {
+    inner: T,
+    cancellation_key: usize,
+}
+
+impl<T> CancellableFuture<T> {
+    /// Creates a new `CancellableFuture`.
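+    ///
+    /// `cancellation_key` is the slab key under which the matching
+    /// `CancelToken` was stored in the executor's list of active tasks.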
+    fn new(fut: T, cancellation_key: usize) -> Self {
+        Self {
+            inner: fut,
+            cancellation_key,
+        }
+    }
+}
+
+impl<T: Future> Future for CancellableFuture<T> {
+    type Output = T::Output;
+
+    #[inline(always)]
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
+    }
+}
+
+impl<T> Drop for CancellableFuture<T> {
+    fn drop(&mut self) {
+        // Remove the task from the list of active tasks if the future is
+        // dropped on a worker thread. Otherwise do nothing and let the
+        // executor's drop handler do the cleanup.
+        let _ = ACTIVE_TASKS.map(|active_tasks| {
+            // Don't unwrap on `lock()` because this function can be called from
+            // a destructor and should not panic. In the worst case, the cancel
+            // token will be left in the list of active tasks, which prevents
+            // eager task deallocation but does not cause any issue otherwise.
+            if let Ok(mut active_tasks) = active_tasks.lock() {
+                let _cancel_token = active_tasks.try_remove(self.cancellation_key);
+            }
+        });
+    }
+}
+
+/// A local worker with access to global executor resources.
+pub(crate) struct Worker {
+    local_queue: LocalQueue,
+    fast_slot: Cell<Option<Runnable>>,
+    executor_context: Arc<ExecutorContext>,
+}
+
+impl Worker {
+    /// Creates a new worker.
+    fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
+        Self {
+            local_queue,
+            fast_slot: Cell::new(None),
+            executor_context,
+        }
+    }
+}
+
+/// Schedules a `Runnable` from within a worker thread.
+///
+/// # Panics
+///
+/// This function will panic if called from a non-worker thread or if called
+/// from the worker thread of another executor instance than the one the task
+/// for this `Runnable` was spawned on.
+fn schedule_task(task: Runnable, executor_id: usize) {
+    LOCAL_WORKER
+        .map(|worker| {
+            let pool_manager = &worker.executor_context.pool_manager;
+            let injector = &worker.executor_context.injector;
+            let local_queue = &worker.local_queue;
+            let fast_slot = &worker.fast_slot;
+
+            // Check that this task was indeed spawned on this executor.
+            assert_eq!(
+                executor_id, worker.executor_context.executor_id,
+                "Tasks must be awaken on the same executor they are spawned on"
+            );
+
+            // Store the task in the fast slot and retrieve the one that was
+            // formerly stored, if any.
+            let prev_task = match fast_slot.replace(Some(task)) {
+                // If there already was a task in the slot, proceed so it can be
+                // moved to a task queue.
+                Some(t) => t,
+                // Otherwise return immediately: this task cannot be stolen so
+                // there is no point in activating a sibling worker.
+                None => return,
+            };
+
+            // Push the previous task to the local queue if possible or on the
+            // injector queue otherwise.
+            if let Err(prev_task) = local_queue.push(prev_task) {
+                // The local queue is full. Try to move half of it to the
+                // injector queue; if this fails, just push one task to the
+                // injector queue.
+                if let Ok(drain) = local_queue.drain(|_| Bucket::capacity()) {
+                    injector.push_bucket(Bucket::from_iter(drain));
+                    local_queue.push(prev_task).unwrap();
+                } else {
+                    injector.insert_task(prev_task);
+                }
+            }
+
+            // A task has been pushed to the local or injector queue: try to
+            // activate another worker if no worker is currently searching for a
+            // task.
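+            //
+            // Waking at most one worker at a time bounds the wake-up cost;
+            // a woken worker that itself schedules more tasks goes through
+            // this same path and activates the next sibling in turn.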
+ if pool_manager.searching_worker_count() == 0 { + pool_manager.activate_worker_relaxed(); + } + }) + .expect("Tasks may not be awaken outside executor threads"); +} + +/// Processes all incoming tasks on a worker thread until the `Terminate` signal +/// is received or until it panics. +/// +/// Panics caught in this thread are relayed to the main executor thread. +fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { + let pool_manager = &worker.executor_context.pool_manager; + let injector = &worker.executor_context.injector; + let executor_unparker = &worker.executor_context.executor_unparker; + let local_queue = &worker.local_queue; + let fast_slot = &worker.fast_slot; + + let result = panic::catch_unwind(AssertUnwindSafe(|| { + // Set how long to spin when searching for a task. + const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000); + + // Seed a thread RNG with the worker ID. + let rng = Rng::new(id as u64); + + loop { + // Signal barrier: park until notified to continue or terminate. + + // Try to deactivate the worker. + if pool_manager.try_set_worker_inactive(id) { + parker.park(); + // No need to call `begin_worker_search()`: this was done by the + // thread that unparked the worker. + } else if injector.is_empty() { + // This worker could not be deactivated because it was the last + // active worker. In such case, the call to + // `try_set_worker_inactive` establishes a synchronization with + // all threads that pushed tasks to the injector queue but could + // not activate a new worker, which is why some tasks may now be + // visible in the injector queue. + pool_manager.set_all_workers_inactive(); + executor_unparker.unpark(); + parker.park(); + // No need to call `begin_worker_search()`: this was done by the + // thread that unparked the worker. + } else { + pool_manager.begin_worker_search(); + } + + if pool_manager.termination_is_triggered() { + return; + } + + let mut search_start = Instant::now(); + + // Process the tasks one by one. + loop { + // Check the injector queue first. + if let Some(bucket) = injector.pop_bucket() { + let bucket_iter = bucket.into_iter(); + + // There is a _very_ remote possibility that, even though + // the local queue is empty, it has temporarily too little + // spare capacity for the bucket. This could happen if a + // concurrent steal operation was preempted for all the time + // it took to pop and process the remaining tasks and it + // hasn't released the stolen capacity yet. + // + // Unfortunately, we cannot just skip checking the injector + // queue altogether when there isn't enough spare capacity + // in the local queue because this could lead to a race: + // suppose that (1) this thread has earlier pushed tasks + // onto the injector queue, and (2) the stealer has + // processed all stolen tasks before this thread sees the + // capacity restored and at the same time (3) the stealer + // does not yet see the tasks this thread pushed to the + // injector queue; in such scenario, both this thread and + // the stealer thread may park and leave unprocessed tasks + // in the injector queue. + // + // This is the only instance where spinning is used, as the + // probability of this happening is close to zero and the + // complexity of a signaling mechanism (condvar & friends) + // wouldn't carry its weight. + while local_queue.spare_capacity() < bucket_iter.len() {} + + // Since empty buckets are never pushed onto the injector + // queue, we should now have at least one task to process. 
+                    local_queue.extend(bucket_iter);
+                } else {
+                    // The injector queue is empty. Try to steal from active
+                    // siblings.
+                    let mut stealers = pool_manager.shuffled_stealers(Some(id), &rng);
+                    if stealers.all(|stealer| {
+                        stealer
+                            .steal_and_pop(local_queue, |n| n - n / 2)
+                            .map(|(task, _)| {
+                                let prev_task = fast_slot.replace(Some(task));
+                                assert!(prev_task.is_none());
+                            })
+                            .is_err()
+                    }) {
+                        // Give up if unsuccessful for too long.
+                        if (Instant::now() - search_start) > MAX_SEARCH_DURATION {
+                            pool_manager.end_worker_search();
+                            break;
+                        }
+
+                        // Re-try.
+                        continue;
+                    }
+                }
+
+                // Signal the end of the search so that another worker can be
+                // activated when a new task is scheduled.
+                pool_manager.end_worker_search();
+
+                // Pop tasks from the fast slot or the local queue.
+                while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) {
+                    if pool_manager.termination_is_triggered() {
+                        return;
+                    }
+                    task.run();
+                }
+
+                // Resume the search for tasks.
+                pool_manager.begin_worker_search();
+                search_start = Instant::now();
+            }
+        }
+    }));
+
+    // Propagate the panic, if any.
+    if let Err(panic) = result {
+        pool_manager.register_panic(panic);
+        pool_manager.trigger_termination();
+        executor_unparker.unpark();
+    }
+}
diff --git a/asynchronix/src/executor/injector.rs b/asynchronix/src/executor/mt_executor/injector.rs
similarity index 100%
rename from asynchronix/src/executor/injector.rs
rename to asynchronix/src/executor/mt_executor/injector.rs
diff --git a/asynchronix/src/executor/pool_manager.rs b/asynchronix/src/executor/mt_executor/pool_manager.rs
similarity index 100%
rename from asynchronix/src/executor/pool_manager.rs
rename to asynchronix/src/executor/mt_executor/pool_manager.rs
diff --git a/asynchronix/src/executor/st_executor.rs b/asynchronix/src/executor/st_executor.rs
new file mode 100644
index 0000000..ced8c9c
--- /dev/null
+++ b/asynchronix/src/executor/st_executor.rs
@@ -0,0 +1,244 @@
+use std::cell::RefCell;
+use std::fmt;
+use std::future::Future;
+use std::sync::atomic::Ordering;
+
+use slab::Slab;
+
+use super::task::{self, CancelToken, Promise, Runnable};
+use super::NEXT_EXECUTOR_ID;
+
+use crate::macros::scoped_thread_local::scoped_thread_local;
+
+const QUEUE_MIN_CAPACITY: usize = 32;
+
+scoped_thread_local!(static EXECUTOR_CONTEXT: ExecutorContext);
+scoped_thread_local!(static ACTIVE_TASKS: RefCell<Slab<CancelToken>>);
+
+/// A single-threaded `async` executor.
+pub(crate) struct Executor {
+    /// Shared executor data.
+    context: ExecutorContext,
+    /// List of tasks that have not completed yet.
+    active_tasks: RefCell<Slab<CancelToken>>,
+}
+
+impl Executor {
+    /// Creates an executor that runs futures on the current thread.
+    pub(crate) fn new() -> Self {
+        // Each executor instance has a unique ID inherited by tasks to ensure
+        // that tasks are scheduled on their parent executor.
+        let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
+        assert!(
+            executor_id <= usize::MAX / 2,
+            "too many executors have been instantiated"
+        );
+
+        let context = ExecutorContext::new(executor_id);
+        let active_tasks = RefCell::new(Slab::new());
+
+        Self {
+            context,
+            active_tasks,
+        }
+    }
+
+    /// Spawns a task and returns a promise that can be polled to retrieve the
+    /// task's output.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
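+        //
+        // (No lock is needed here, unlike in the multi-threaded executor:
+        // wake-ups issued from within a running task go through
+        // `schedule_task`, which only touches the work queue, so this
+        // `RefCell` borrow cannot be re-entered.)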
+        let mut active_tasks = self.active_tasks.borrow_mut();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (promise, runnable, cancel_token) =
+            task::spawn(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        let mut queue = self.context.queue.borrow_mut();
+        queue.push(runnable);
+
+        promise
+    }
+
+    /// Spawns a task whose output will never be retrieved.
+    ///
+    /// This is mostly useful to avoid undue reference counting for futures that
+    /// return a `()` type.
+    ///
+    /// Note that spawned tasks are not executed until [`run()`](Executor::run)
+    /// is called.
+    pub(crate) fn spawn_and_forget<T>(&self, future: T)
+    where
+        T: Future + Send + 'static,
+        T::Output: Send + 'static,
+    {
+        // Book a slot to store the task cancellation token.
+        let mut active_tasks = self.active_tasks.borrow_mut();
+        let task_entry = active_tasks.vacant_entry();
+
+        // Wrap the future so that it removes its cancel token from the
+        // executor's list when dropped.
+        let future = CancellableFuture::new(future, task_entry.key());
+
+        let (runnable, cancel_token) =
+            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
+
+        task_entry.insert(cancel_token);
+        let mut queue = self.context.queue.borrow_mut();
+        queue.push(runnable);
+    }
+
+    /// Executes spawned tasks, blocking until all futures have completed or
+    /// until the executor reaches a deadlock.
+    pub(crate) fn run(&mut self) {
+        ACTIVE_TASKS.set(&self.active_tasks, || {
+            EXECUTOR_CONTEXT.set(&self.context, || loop {
+                let task = match self.context.queue.borrow_mut().pop() {
+                    Some(task) => task,
+                    None => break,
+                };
+
+                task.run();
+            })
+        });
+    }
+}
+
+impl Drop for Executor {
+    fn drop(&mut self) {
+        // Drop all tasks that have not completed.
+        //
+        // The executor context must be set because some tasks may schedule
+        // other tasks when dropped, which requires that the work queue be
+        // available.
+        EXECUTOR_CONTEXT.set(&self.context, || {
+            // Cancel all pending futures.
+            //
+            // `ACTIVE_TASKS` is explicitly unset to prevent
+            // `CancellableFuture::drop()` from trying to remove its own token
+            // from the list of active tasks as this would result in a nested
+            // call to `borrow_mut` and thus a panic. This is mainly to stay on
+            // the safe side: `ACTIVE_TASKS` should not be set anyway, unless
+            // for some reason the executor runs inside another executor.
+            ACTIVE_TASKS.unset(|| {
+                let mut tasks = self.active_tasks.borrow_mut();
+                for task in tasks.drain() {
+                    task.cancel();
+                }
+
+                // Some of the dropped tasks may have scheduled other tasks that
+                // were not yet cancelled, preventing them from being dropped
+                // upon cancellation. This is OK: the scheduled tasks will be
+                // dropped when the work queue is dropped, and they cannot
+                // re-schedule one another since all tasks were cancelled.
+            });
+        });
+    }
+}
+
+impl fmt::Debug for Executor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Executor").finish_non_exhaustive()
+    }
+}
+
+/// Shared executor context.
+///
+/// This contains all executor resources that the spawned tasks may access.
+struct ExecutorContext {
+    /// Work queue.
+    queue: RefCell<Vec<Runnable>>,
+    /// Unique executor identifier inherited by all tasks spawned on this
+    /// executor instance.
+    executor_id: usize,
+}
+
+impl ExecutorContext {
+    /// Creates a new shared executor context.
+    fn new(executor_id: usize) -> Self {
+        Self {
+            queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)),
+            executor_id,
+        }
+    }
+}
+
+/// A `Future` wrapper that removes its cancellation token from the list of
+/// active tasks when dropped.
+struct CancellableFuture<T> {
+    inner: T,
+    cancellation_key: usize,
+}
+
+impl<T> CancellableFuture<T> {
+    /// Creates a new `CancellableFuture`.
+    fn new(fut: T, cancellation_key: usize) -> Self {
+        Self {
+            inner: fut,
+            cancellation_key,
+        }
+    }
+}
+
+impl<T: Future> Future for CancellableFuture<T> {
+    type Output = T::Output;
+
+    #[inline(always)]
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
+    }
+}
+
+impl<T> Drop for CancellableFuture<T> {
+    fn drop(&mut self) {
+        // Remove the task from the list of active tasks while the executor is
+        // running (meaning that `ACTIVE_TASKS` is set). Otherwise do nothing
+        // and let the executor's drop handler do the cleanup.
+        let _ = ACTIVE_TASKS.map(|active_tasks| {
+            // Don't use `borrow_mut()` because this function can be called from
+            // a destructor and should not panic. In the worst case, the cancel
+            // token will be left in the list of active tasks, which prevents
+            // eager task deallocation but does not cause any issue otherwise.
+            if let Ok(mut active_tasks) = active_tasks.try_borrow_mut() {
+                let _cancel_token = active_tasks.try_remove(self.cancellation_key);
+            }
+        });
+    }
+}
+
+/// Schedules a `Runnable` from within the executor's thread.
+///
+/// # Panics
+///
+/// This function will panic if called from outside the thread that runs the
+/// executor, or from another executor instance than the one the task for this
+/// `Runnable` was spawned on.
+fn schedule_task(task: Runnable, executor_id: usize) {
+    EXECUTOR_CONTEXT
+        .map(|context| {
+            // Check that this task was indeed spawned on this executor.
+            assert_eq!(
+                executor_id, context.executor_id,
+                "Tasks must be awaken on the same executor they are spawned on"
+            );
+
+            let mut queue = context.queue.borrow_mut();
+            queue.push(task);
+        })
+        .expect("Tasks may not be awaken outside executor threads");
+}
diff --git a/asynchronix/src/executor/tests.rs b/asynchronix/src/executor/tests.rs
deleted file mode 100644
index 7f63b46..0000000
--- a/asynchronix/src/executor/tests.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use futures_channel::{mpsc, oneshot};
-use futures_util::StreamExt;
-
-use super::*;
-
-/// An object that runs an arbitrary closure when dropped.
-struct RunOnDrop<F: FnOnce()> {
-    drop_fn: Option<F>,
-}
-impl<F: FnOnce()> RunOnDrop<F> {
-    /// Creates a new `RunOnDrop`.
- fn new(drop_fn: F) -> Self { - Self { - drop_fn: Some(drop_fn), - } - } -} -impl Drop for RunOnDrop { - fn drop(&mut self) { - self.drop_fn.take().map(|f| f()); - } -} - -#[test] -fn executor_deadlock() { - const NUM_THREADS: usize = 3; - - let (_sender1, receiver1) = oneshot::channel::<()>(); - let (_sender2, receiver2) = oneshot::channel::<()>(); - - let mut executor = Executor::new(NUM_THREADS); - static LAUNCH_COUNT: AtomicUsize = AtomicUsize::new(0); - static COMPLETION_COUNT: AtomicUsize = AtomicUsize::new(0); - - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver2.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver1.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - - executor.run(); - // Check that the executor returns on deadlock, i.e. none of the task has - // completed. - assert_eq!(LAUNCH_COUNT.load(Ordering::Relaxed), 2); - assert_eq!(COMPLETION_COUNT.load(Ordering::Relaxed), 0); -} - -#[test] -fn executor_deadlock_st() { - const NUM_THREADS: usize = 1; - - let (_sender1, receiver1) = oneshot::channel::<()>(); - let (_sender2, receiver2) = oneshot::channel::<()>(); - - let mut executor = Executor::new(NUM_THREADS); - static LAUNCH_COUNT: AtomicUsize = AtomicUsize::new(0); - static COMPLETION_COUNT: AtomicUsize = AtomicUsize::new(0); - - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver2.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - executor.spawn_and_forget(async move { - LAUNCH_COUNT.fetch_add(1, Ordering::Relaxed); - let _ = receiver1.await; - COMPLETION_COUNT.fetch_add(1, Ordering::Relaxed); - }); - - executor.run(); - // Check that the executor returnes on deadlock, i.e. none of the task has - // completed. - assert_eq!(LAUNCH_COUNT.load(Ordering::Relaxed), 2); - assert_eq!(COMPLETION_COUNT.load(Ordering::Relaxed), 0); -} - -#[test] -fn executor_drop_cycle() { - const NUM_THREADS: usize = 3; - - let (sender1, mut receiver1) = mpsc::channel(2); - let (sender2, mut receiver2) = mpsc::channel(2); - let (sender3, mut receiver3) = mpsc::channel(2); - - let mut executor = Executor::new(NUM_THREADS); - static DROP_COUNT: AtomicUsize = AtomicUsize::new(0); - - // Spawn 3 tasks that wake one another when dropped. - executor.spawn_and_forget({ - let mut sender2 = sender2.clone(); - let mut sender3 = sender3.clone(); - - async move { - let _guard = RunOnDrop::new(move || { - let _ = sender2.try_send(()); - let _ = sender3.try_send(()); - DROP_COUNT.fetch_add(1, Ordering::Relaxed); - }); - let _ = receiver1.next().await; - } - }); - executor.spawn_and_forget({ - let mut sender1 = sender1.clone(); - let mut sender3 = sender3.clone(); - - async move { - let _guard = RunOnDrop::new(move || { - let _ = sender1.try_send(()); - let _ = sender3.try_send(()); - DROP_COUNT.fetch_add(1, Ordering::Relaxed); - }); - let _ = receiver2.next().await; - } - }); - executor.spawn_and_forget({ - let mut sender1 = sender1.clone(); - let mut sender2 = sender2.clone(); - - async move { - let _guard = RunOnDrop::new(move || { - let _ = sender1.try_send(()); - let _ = sender2.try_send(()); - DROP_COUNT.fetch_add(1, Ordering::Relaxed); - }); - let _ = receiver3.next().await; - } - }); - - executor.run(); - - // Make sure that all tasks are eventually dropped even though each task - // wakes the others when dropped. 
-    drop(executor);
-    assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
-}
diff --git a/asynchronix/src/executor/worker.rs b/asynchronix/src/executor/worker.rs
deleted file mode 100644
index b815276..0000000
--- a/asynchronix/src/executor/worker.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::cell::Cell;
-use std::sync::Arc;
-
-use super::task::Runnable;
-
-use super::ExecutorContext;
-use super::LocalQueue;
-
-/// A local worker with access to global executor resources.
-pub(crate) struct Worker {
-    pub(super) local_queue: LocalQueue,
-    pub(super) fast_slot: Cell<Option<Runnable>>,
-    pub(super) executor_context: Arc<ExecutorContext>,
-}
-
-impl Worker {
-    /// Creates a new worker.
-    pub(super) fn new(local_queue: LocalQueue, executor_context: Arc<ExecutorContext>) -> Self {
-        Self {
-            local_queue,
-            fast_slot: Cell::new(None),
-            executor_context,
-        }
-    }
-}
diff --git a/asynchronix/src/macros/scoped_thread_local.rs b/asynchronix/src/macros/scoped_thread_local.rs
index b60b287..587a8f8 100644
--- a/asynchronix/src/macros/scoped_thread_local.rs
+++ b/asynchronix/src/macros/scoped_thread_local.rs
@@ -7,19 +7,18 @@ use std::ptr;
 
 /// Declare a new thread-local storage scoped key of type `ScopedKey`.
 ///
 /// This is based on the `scoped-tls` crate, with slight modifications, such as
-/// the use of the newly available `const` qualifier for TLS.
+/// the addition of a `ScopedLocalKey::unset` method and the use of a `map`
+/// method that returns `Option::None` when the value is not set, rather than
+/// panicking as `with` would.
 macro_rules! scoped_thread_local {
     ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
         $(#[$attrs])*
         $vis static $name: $crate::macros::scoped_thread_local::ScopedLocalKey<$ty>
-            = $crate::macros::scoped_thread_local::ScopedLocalKey {
-                inner: {
-                    thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
-                        std::cell::Cell::new(::std::ptr::null())
-                    });
-                    &FOO
-                },
-                _marker: ::std::marker::PhantomData,
+            = unsafe {
+                ::std::thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
+                    ::std::cell::Cell::new(::std::ptr::null())
+                });
+                $crate::macros::scoped_thread_local::ScopedLocalKey::new(&FOO)
             };
     )
 }
@@ -28,13 +27,24 @@ pub(crate) use scoped_thread_local;
 
 /// Type representing a thread local storage key corresponding to a reference
 /// to the type parameter `T`.
 pub(crate) struct ScopedLocalKey<T> {
-    pub(crate) inner: &'static LocalKey<Cell<*const ()>>,
-    pub(crate) _marker: marker::PhantomData<T>,
+    inner: &'static LocalKey<Cell<*const ()>>,
+    _marker: marker::PhantomData<T>,
 }
 
 unsafe impl<T> Sync for ScopedLocalKey<T> {}
 
 impl<T> ScopedLocalKey<T> {
+    #[doc(hidden)]
+    /// # Safety
+    ///
+    /// Should only be called through the public macro.
+    pub(crate) const unsafe fn new(inner: &'static LocalKey<Cell<*const ()>>) -> Self {
+        Self {
+            inner,
+            _marker: marker::PhantomData,
+        }
+    }
+
     /// Inserts a value into this scoped thread local storage slot for the
     /// duration of a closure.
     pub(crate) fn set<F, R>(&'static self, t: &T, f: F) -> R
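For orientation, this is how such a scoped key is used inside the crate; a minimal sketch assuming the crate-internal `scoped_thread_local!` macro and the `set`/`map` methods shown above, with a made-up `Config` type for illustration:

```rust
scoped_thread_local!(static CURRENT: Config);

struct Config {
    verbose: bool,
}

fn log_enabled() -> bool {
    // `map` returns `None` when no value is set instead of panicking.
    CURRENT.map(|config| config.verbose).unwrap_or(false)
}

fn with_config() {
    let config = Config { verbose: true };

    assert!(!log_enabled()); // no value set yet

    // The value is visible for the exact duration of the closure.
    CURRENT.set(&config, || {
        assert!(log_enabled());
    });

    assert!(!log_enabled()); // unset again
}
```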
diff --git a/asynchronix/src/rpc/generic_server.rs b/asynchronix/src/rpc/generic_server.rs
index 6f54a93..70032c4 100644
--- a/asynchronix/src/rpc/generic_server.rs
+++ b/asynchronix/src/rpc/generic_server.rs
@@ -14,8 +14,8 @@ use super::codegen::simulation::*;
 
 /// Transport-independent server implementation.
 ///
-/// This implementation implements the protobuf services without any
-/// transport-specific management.
+/// This implements the protobuf services without any transport-specific
+/// management.
 pub(crate) struct GenericServer<F> {
     sim_gen: F,
     sim_context: Option<(Simulation, EndpointRegistry, KeyRegistry)>,
diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs
index a8527ca..f22e1fc 100644
--- a/asynchronix/src/simulation/sim_init.rs
+++ b/asynchronix/src/simulation/sim_init.rs
@@ -25,15 +25,22 @@ impl SimInit {
         Self::with_num_threads(num_cpus::get())
     }
 
-    /// Creates a builder for a multithreaded simulation running on the
-    /// specified number of threads.
+    /// Creates a builder for a simulation running on the specified number of
+    /// threads.
+    ///
+    /// Note that the number of worker threads is automatically constrained to
+    /// be between 1 and `usize::BITS` (inclusive).
     pub fn with_num_threads(num_threads: usize) -> Self {
-        // The current executor's implementation caps the number of threads to
-        // 64 on 64-bit systems and 32 on 32-bit systems.
-        let num_threads = num_threads.min(usize::BITS as usize);
+        let num_threads = num_threads.clamp(1, usize::BITS as usize);
+
+        let executor = if num_threads == 1 {
+            Executor::new_single_threaded()
+        } else {
+            Executor::new_multi_threaded(num_threads)
+        };
 
         Self {
-            executor: Executor::new(num_threads),
+            executor,
             scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
             time: SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)),
             clock: Box::new(NoClock::new()),
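With this change the executor flavor is selected purely from the requested worker count. A minimal usage sketch, assuming only the public `SimInit` API shown in the diff above (bench setup elided):

```rust
use asynchronix::simulation::SimInit;

fn build_benches() {
    // One worker selects the new single-threaded executor.
    let _st_bench = SimInit::with_num_threads(1);

    // More than one worker selects the multi-threaded executor; values
    // outside 1..=usize::BITS are silently clamped rather than rejected.
    let _mt_bench = SimInit::with_num_threads(4);
}
```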
--features="rpc grpc-service" env: RUSTFLAGS: --cfg asynchronix_loom @@ -71,12 +86,12 @@ jobs: components: miri - name: Run cargo miri tests (single-threaded executor) - run: cargo miri test --tests --lib --features="rpc grpc-server" + run: cargo miri test --tests --lib --features="rpc grpc-service" env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - name: Run cargo miri tests (multi-threaded executor) - run: cargo miri test --tests --lib --features="rpc grpc-server" + run: cargo miri test --tests --lib --features="rpc grpc-service" env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 @@ -134,7 +149,7 @@ jobs: run: cargo fmt --all -- --check - name: Run cargo clippy - run: cargo clippy --features="rpc grpc-server" + run: cargo clippy --features="rpc grpc-service" docs: name: Docs @@ -147,4 +162,4 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Run cargo doc - run: cargo doc --no-deps --features="rpc grpc-server" --document-private-items + run: cargo doc --no-deps --features="rpc grpc-service" --document-private-items diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 981ce35..2b978dd 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -26,8 +26,10 @@ autotests = false rpc = ["dep:rmp-serde", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"] # This feature forces protobuf/gRPC code (re-)generation. rpc-codegen = ["dep:tonic-build"] -# gRPC server. -grpc-server = ["rpc", "dep:tokio"] +# gRPC service. +grpc-service = ["rpc", "dep:tokio" , "tonic/transport"] +# wasm service. +wasm-service = ["rpc", "dep:wasm-bindgen"] # API-unstable public exports meant for external test/benchmarking; development only. dev-hooks = [] # Logging of performance-related statistics; development only. @@ -58,10 +60,12 @@ prost-types = { version = "0.12", optional = true } rmp-serde = { version = "1.1", optional = true } serde = { version = "1", optional = true } -# gRPC dependencies. +# gRPC service dependencies. tokio = { version = "1.0", features=["net"], optional = true } -tonic = { version = "0.11", optional = true } +tonic = { version = "0.11", default-features = false, features=["codegen", "prost"], optional = true } +# WASM service dependencies. 
+wasm-bindgen = { version = "0.2", optional = true }
 
 [target.'cfg(asynchronix_loom)'.dependencies]
 loom = "0.5"
diff --git a/asynchronix/build.rs b/asynchronix/build.rs
index fb7492c..d2bb66b 100644
--- a/asynchronix/build.rs
+++ b/asynchronix/build.rs
@@ -7,14 +7,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         .build_client(false)
         .out_dir("src/rpc/codegen/");
 
-    #[cfg(all(feature = "rpc-codegen", not(feature = "grpc-server")))]
+    #[cfg(all(feature = "rpc-codegen", not(feature = "grpc-service")))]
     let builder = builder.build_server(false);
 
     #[cfg(feature = "rpc-codegen")]
-    builder.compile(
-        &["simulation.proto", "custom_transport.proto"],
-        &["src/rpc/api/"],
-    )?;
+    builder.compile(&["simulation.proto"], &["src/rpc/api/"])?;
 
     Ok(())
 }
diff --git a/asynchronix/src/rpc.rs b/asynchronix/src/rpc.rs
index d67e3d0..9506bfa 100644
--- a/asynchronix/src/rpc.rs
+++ b/asynchronix/src/rpc.rs
@@ -2,9 +2,12 @@
 
 mod codegen;
 mod endpoint_registry;
-mod generic_server;
-#[cfg(feature = "grpc-server")]
+#[cfg(feature = "grpc-service")]
 pub mod grpc;
 mod key_registry;
+mod simulation_service;
+#[cfg(feature = "wasm-service")]
+pub mod wasm;
 
 pub use endpoint_registry::EndpointRegistry;
+pub use simulation_service::SimulationService;
diff --git a/asynchronix/src/rpc/api/custom_transport.proto b/asynchronix/src/rpc/api/custom_transport.proto
deleted file mode 100644
index 46aefb4..0000000
--- a/asynchronix/src/rpc/api/custom_transport.proto
+++ /dev/null
@@ -1,50 +0,0 @@
-// Additional types for transport implementations which, unlike gRPC, do not
-// support auto-generation from the `Simulation` service description.
-
-syntax = "proto3";
-package custom_transport;
-
-import "simulation.proto";
-
-enum ServerErrorCode {
-  UNKNOWN_REQUEST = 0;
-  EMPTY_REQUEST = 1;
-}
-
-message ServerError {
-  ServerErrorCode code = 1;
-  string message = 2;
-}
-
-message AnyRequest {
-  oneof request { // Expects exactly 1 variant.
-    simulation.InitRequest init_request = 1;
-    simulation.TimeRequest time_request = 2;
-    simulation.StepRequest step_request = 3;
-    simulation.StepUntilRequest step_until_request = 4;
-    simulation.ScheduleEventRequest schedule_event_request = 5;
-    simulation.CancelEventRequest cancel_event_request = 6;
-    simulation.ProcessEventRequest process_event_request = 7;
-    simulation.ProcessQueryRequest process_query_request = 8;
-    simulation.ReadEventsRequest read_events_request = 9;
-    simulation.OpenSinkRequest open_sink_request = 10;
-    simulation.CloseSinkRequest close_sink_request = 11;
-  }
-}
-
-message AnyReply {
-  oneof reply { // Contains exactly 1 variant.
-    simulation.InitReply init_reply = 1;
-    simulation.TimeReply time_reply = 2;
-    simulation.StepReply step_reply = 3;
-    simulation.StepUntilReply step_until_reply = 4;
-    simulation.ScheduleEventReply schedule_event_reply = 5;
-    simulation.CancelEventReply cancel_event_reply = 6;
-    simulation.ProcessEventReply process_event_reply = 7;
-    simulation.ProcessQueryReply process_query_reply = 8;
-    simulation.ReadEventsReply read_events_reply = 9;
-    simulation.OpenSinkReply open_sink_reply = 10;
-    simulation.CloseSinkReply close_sink_reply = 11;
-    ServerError error = 100;
-  }
-}
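The envelope type moves unchanged into `simulation.proto` (next diff). For a custom transport, a client would compile that file itself (e.g. with `prost-build`) and wrap every call in `AnyRequest`; a hedged sketch follows, in which the `codegen::simulation` module path is a hypothetical client-side artifact:

```rust
use prost::Message;

// Hypothetical client-side codegen of `simulation.proto`; the crate keeps
// its own generated module private.
use crate::codegen::simulation::{any_request, AnyRequest, InitRequest};

/// Wraps an `InitRequest` in the `AnyRequest` envelope and encodes it to
/// bytes, ready to be handed to `SimulationService::process_request` or sent
/// over any byte-oriented transport.
fn encode_init_request(request: InitRequest) -> Vec<u8> {
    AnyRequest {
        request: Some(any_request::Request::InitRequest(request)),
    }
    .encode_to_vec()
}
```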
diff --git a/asynchronix/src/rpc/api/simulation.proto b/asynchronix/src/rpc/api/simulation.proto
index b12d593..8aa9f68 100644
--- a/asynchronix/src/rpc/api/simulation.proto
+++ b/asynchronix/src/rpc/api/simulation.proto
@@ -146,6 +146,23 @@ message CloseSinkReply {
   }
 }
 
+// A convenience message type for custom transport implementations.
+message AnyRequest {
+  oneof request { // Expects exactly 1 variant.
+    InitRequest init_request = 1;
+    TimeRequest time_request = 2;
+    StepRequest step_request = 3;
+    StepUntilRequest step_until_request = 4;
+    ScheduleEventRequest schedule_event_request = 5;
+    CancelEventRequest cancel_event_request = 6;
+    ProcessEventRequest process_event_request = 7;
+    ProcessQueryRequest process_query_request = 8;
+    ReadEventsRequest read_events_request = 9;
+    OpenSinkRequest open_sink_request = 10;
+    CloseSinkRequest close_sink_request = 11;
+  }
+}
+
 service Simulation {
   rpc Init(InitRequest) returns (InitReply);
   rpc Time(TimeRequest) returns (TimeReply);
diff --git a/asynchronix/src/rpc/codegen.rs b/asynchronix/src/rpc/codegen.rs
index 3221cbc..c98125f 100644
--- a/asynchronix/src/rpc/codegen.rs
+++ b/asynchronix/src/rpc/codegen.rs
@@ -1,7 +1,6 @@
 #![allow(unreachable_pub)]
 #![allow(clippy::enum_variant_names)]
+#![allow(missing_docs)]
 
-#[rustfmt::skip]
-pub(crate) mod custom_transport;
 #[rustfmt::skip]
 pub(crate) mod simulation;
diff --git a/asynchronix/src/rpc/codegen/custom_transport.rs b/asynchronix/src/rpc/codegen/custom_transport.rs
deleted file mode 100644
index 61eac9d..0000000
--- a/asynchronix/src/rpc/codegen/custom_transport.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-// This file is @generated by prost-build.
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ServerError {
-    #[prost(enumeration = "ServerErrorCode", tag = "1")]
-    pub code: i32,
-    #[prost(string, tag = "2")]
-    pub message: ::prost::alloc::string::String,
-}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct AnyRequest {
-    /// Expects exactly 1 variant.
-    #[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")]
-    pub request: ::core::option::Option<any_request::Request>,
-}
-/// Nested message and enum types in `AnyRequest`.
-pub mod any_request {
-    /// Expects exactly 1 variant.
-    #[allow(clippy::derive_partial_eq_without_eq)]
-    #[derive(Clone, PartialEq, ::prost::Oneof)]
-    pub enum Request {
-        #[prost(message, tag = "1")]
-        InitRequest(super::super::simulation::InitRequest),
-        #[prost(message, tag = "2")]
-        TimeRequest(super::super::simulation::TimeRequest),
-        #[prost(message, tag = "3")]
-        StepRequest(super::super::simulation::StepRequest),
-        #[prost(message, tag = "4")]
-        StepUntilRequest(super::super::simulation::StepUntilRequest),
-        #[prost(message, tag = "5")]
-        ScheduleEventRequest(super::super::simulation::ScheduleEventRequest),
-        #[prost(message, tag = "6")]
-        CancelEventRequest(super::super::simulation::CancelEventRequest),
-        #[prost(message, tag = "7")]
-        ProcessEventRequest(super::super::simulation::ProcessEventRequest),
-        #[prost(message, tag = "8")]
-        ProcessQueryRequest(super::super::simulation::ProcessQueryRequest),
-        #[prost(message, tag = "9")]
-        ReadEventsRequest(super::super::simulation::ReadEventsRequest),
-        #[prost(message, tag = "10")]
-        OpenSinkRequest(super::super::simulation::OpenSinkRequest),
-        #[prost(message, tag = "11")]
-        CloseSinkRequest(super::super::simulation::CloseSinkRequest),
-    }
-}
-#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct AnyReply {
-    /// Contains exactly 1 variant.
-    #[prost(oneof = "any_reply::Reply", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100")]
-    pub reply: ::core::option::Option<any_reply::Reply>,
-}
-/// Nested message and enum types in `AnyReply`.
-pub mod any_reply {
-    /// Contains exactly 1 variant.
-    #[allow(clippy::derive_partial_eq_without_eq)]
-    #[derive(Clone, PartialEq, ::prost::Oneof)]
-    pub enum Reply {
-        #[prost(message, tag = "1")]
-        InitReply(super::super::simulation::InitReply),
-        #[prost(message, tag = "2")]
-        TimeReply(super::super::simulation::TimeReply),
-        #[prost(message, tag = "3")]
-        StepReply(super::super::simulation::StepReply),
-        #[prost(message, tag = "4")]
-        StepUntilReply(super::super::simulation::StepUntilReply),
-        #[prost(message, tag = "5")]
-        ScheduleEventReply(super::super::simulation::ScheduleEventReply),
-        #[prost(message, tag = "6")]
-        CancelEventReply(super::super::simulation::CancelEventReply),
-        #[prost(message, tag = "7")]
-        ProcessEventReply(super::super::simulation::ProcessEventReply),
-        #[prost(message, tag = "8")]
-        ProcessQueryReply(super::super::simulation::ProcessQueryReply),
-        #[prost(message, tag = "9")]
-        ReadEventsReply(super::super::simulation::ReadEventsReply),
-        #[prost(message, tag = "10")]
-        OpenSinkReply(super::super::simulation::OpenSinkReply),
-        #[prost(message, tag = "11")]
-        CloseSinkReply(super::super::simulation::CloseSinkReply),
-        #[prost(message, tag = "100")]
-        Error(super::ServerError),
-    }
-}
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
-#[repr(i32)]
-pub enum ServerErrorCode {
-    UnknownRequest = 0,
-    EmptyRequest = 1,
-}
-impl ServerErrorCode {
-    /// String value of the enum field names used in the ProtoBuf definition.
-    ///
-    /// The values are not transformed in any way and thus are considered stable
-    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
-    pub fn as_str_name(&self) -> &'static str {
-        match self {
-            ServerErrorCode::UnknownRequest => "UNKNOWN_REQUEST",
-            ServerErrorCode::EmptyRequest => "EMPTY_REQUEST",
-        }
-    }
-    /// Creates an enum from field names used in the ProtoBuf definition.
-    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
-        match value {
-            "UNKNOWN_REQUEST" => Some(Self::UnknownRequest),
-            "EMPTY_REQUEST" => Some(Self::EmptyRequest),
-            _ => None,
-        }
-    }
-}
diff --git a/asynchronix/src/rpc/codegen/simulation.rs b/asynchronix/src/rpc/codegen/simulation.rs
index aefb660..672aed1 100644
--- a/asynchronix/src/rpc/codegen/simulation.rs
+++ b/asynchronix/src/rpc/codegen/simulation.rs
@@ -332,6 +332,44 @@ pub mod close_sink_reply {
         Error(super::Error),
     }
 }
+/// A convenience message type for custom transport implementations.
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AnyRequest {
+    /// Expects exactly 1 variant.
+    #[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")]
+    pub request: ::core::option::Option<any_request::Request>,
+}
+/// Nested message and enum types in `AnyRequest`.
+pub mod any_request {
+    /// Expects exactly 1 variant.
+ #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Request { + #[prost(message, tag = "1")] + InitRequest(super::InitRequest), + #[prost(message, tag = "2")] + TimeRequest(super::TimeRequest), + #[prost(message, tag = "3")] + StepRequest(super::StepRequest), + #[prost(message, tag = "4")] + StepUntilRequest(super::StepUntilRequest), + #[prost(message, tag = "5")] + ScheduleEventRequest(super::ScheduleEventRequest), + #[prost(message, tag = "6")] + CancelEventRequest(super::CancelEventRequest), + #[prost(message, tag = "7")] + ProcessEventRequest(super::ProcessEventRequest), + #[prost(message, tag = "8")] + ProcessQueryRequest(super::ProcessQueryRequest), + #[prost(message, tag = "9")] + ReadEventsRequest(super::ReadEventsRequest), + #[prost(message, tag = "10")] + OpenSinkRequest(super::OpenSinkRequest), + #[prost(message, tag = "11")] + CloseSinkRequest(super::CloseSinkRequest), + } +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum ErrorCode { diff --git a/asynchronix/src/rpc/grpc.rs b/asynchronix/src/rpc/grpc.rs index 94809e9..02b4bf5 100644 --- a/asynchronix/src/rpc/grpc.rs +++ b/asynchronix/src/rpc/grpc.rs @@ -1,4 +1,4 @@ -//! GRPC simulation server. +//! gRPC simulation service. use std::net::SocketAddr; use std::sync::Mutex; @@ -10,12 +10,12 @@ use crate::rpc::EndpointRegistry; use crate::simulation::SimInit; use super::codegen::simulation::*; -use super::generic_server::GenericServer; +use super::simulation_service::SimulationService; -/// Runs a GRPC simulation server. +/// Runs a gRPC simulation server. /// /// The first argument is a closure that is called every time the simulation is -/// started by the remote client. It must create a new `SimInit` object +/// (re)started by the remote client. It must create a new `SimInit` object /// complemented by a registry that exposes the public event and query /// interface. 
 pub fn run<F>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
 where
@@ -27,7 +27,7 @@
         .enable_io()
         .build()?;
 
-    let sim_manager = GrpcServer::new(sim_gen);
+    let sim_manager = GrpcSimulationService::new(sim_gen);
 
     rt.block_on(async move {
         Server::builder()
@@ -39,33 +39,27 @@
     })
 }
 
-struct GrpcServer<F>
-where
-    F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
-{
-    inner: Mutex<GenericServer<F>>,
+struct GrpcSimulationService {
+    inner: Mutex<SimulationService>,
 }
 
-impl<F> GrpcServer<F>
-where
-    F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
-{
-    fn new(sim_gen: F) -> Self {
+impl GrpcSimulationService {
+    fn new<F>(sim_gen: F) -> Self
+    where
+        F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
+    {
         Self {
-            inner: Mutex::new(GenericServer::new(sim_gen)),
+            inner: Mutex::new(SimulationService::new(sim_gen)),
         }
     }
 
-    fn inner(&self) -> MutexGuard<'_, GenericServer<F>> {
+    fn inner(&self) -> MutexGuard<'_, SimulationService> {
         self.inner.lock().unwrap()
     }
 }
 
 #[tonic::async_trait]
-impl<F> simulation_server::Simulation for GrpcServer<F>
-where
-    F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
-{
+impl simulation_server::Simulation for GrpcSimulationService {
     async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitReply>, Status> {
         let request = request.into_inner();
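A usage sketch for the public entry point, under the `grpc-service` feature; the bench generator body, along with the chosen address and port, are hypothetical:

```rust
use std::net::SocketAddr;

use asynchronix::rpc::{grpc, EndpointRegistry};
use asynchronix::simulation::SimInit;

// Hypothetical bench generator: called on every (re)start, it must hand
// back a fresh `SimInit` and the registry exposing the bench's endpoints.
fn bench() -> (SimInit, EndpointRegistry) {
    // ...instantiate and connect models, register events and queries...
    todo!()
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr: SocketAddr = "0.0.0.0:41633".parse()?;

    grpc::run(bench, addr)
}
```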
diff --git a/asynchronix/src/rpc/generic_server.rs b/asynchronix/src/rpc/simulation_service.rs
similarity index 89%
rename from asynchronix/src/rpc/generic_server.rs
rename to asynchronix/src/rpc/simulation_service.rs
index 70032c4..9c53001 100644
--- a/asynchronix/src/rpc/generic_server.rs
+++ b/asynchronix/src/rpc/simulation_service.rs
@@ -1,3 +1,5 @@
+use std::error;
+use std::fmt;
 use std::time::Duration;
 
 use bytes::Buf;
@@ -9,86 +11,87 @@ use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId};
 use crate::rpc::EndpointRegistry;
 use crate::simulation::{SimInit, Simulation};
 
-use super::codegen::custom_transport::*;
 use super::codegen::simulation::*;
 
-/// Transport-independent server implementation.
+/// Protobuf-based simulation manager.
 ///
-/// This implements the protobuf services without any transport-specific
-/// management.
-pub(crate) struct GenericServer<F> {
-    sim_gen: F,
+/// A `SimulationService` manages the lifecycle of a simulation: it can create
+/// a [`Simulation`](crate::simulation::Simulation), invoke its methods and
+/// re-instantiate a fresh simulation on demand.
+///
+/// Its methods map the various RPC service methods defined in
+/// `simulation.proto`.
+pub struct SimulationService {
+    sim_gen: Box<dyn FnMut() -> (SimInit, EndpointRegistry) + Send + 'static>,
     sim_context: Option<(Simulation, EndpointRegistry, KeyRegistry)>,
 }
 
-impl<F> GenericServer<F>
-where
-    F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
-{
-    /// Creates a new `GenericServer` without any active simulation.
-    pub(crate) fn new(sim_gen: F) -> Self {
+impl SimulationService {
+    /// Creates a new `SimulationService` without any active simulation.
+    ///
+    /// The argument is a closure that is called every time the simulation is
+    /// (re)started by the remote client. It must create a new `SimInit` object
+    /// complemented by a registry that exposes the public event and query
+    /// interface.
+    pub fn new<F>(sim_gen: F) -> Self
+    where
+        F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
+    {
         Self {
-            sim_gen,
+            sim_gen: Box::new(sim_gen),
             sim_context: None,
         }
     }
 
-    /// Processes an encoded `AnyRequest` message and returns an encoded
-    /// `AnyReply`.
-    #[allow(dead_code)]
-    pub(crate) fn service_request<B>(&mut self, request_buf: B) -> Vec<u8>
+    /// Processes an encoded `AnyRequest` message and returns an encoded reply.
+    pub fn process_request<B>(&mut self, request_buf: B) -> Result<Vec<u8>, InvalidRequest>
     where
         B: Buf,
     {
-        let reply = match AnyRequest::decode(request_buf) {
+        match AnyRequest::decode(request_buf) {
             Ok(AnyRequest { request: Some(req) }) => match req {
                 any_request::Request::InitRequest(request) => {
-                    any_reply::Reply::InitReply(self.init(request))
+                    Ok(self.init(request).encode_to_vec())
                 }
                 any_request::Request::TimeRequest(request) => {
-                    any_reply::Reply::TimeReply(self.time(request))
+                    Ok(self.time(request).encode_to_vec())
                }
                 any_request::Request::StepRequest(request) => {
-                    any_reply::Reply::StepReply(self.step(request))
+                    Ok(self.step(request).encode_to_vec())
                 }
                 any_request::Request::StepUntilRequest(request) => {
-                    any_reply::Reply::StepUntilReply(self.step_until(request))
+                    Ok(self.step_until(request).encode_to_vec())
                 }
                 any_request::Request::ScheduleEventRequest(request) => {
-                    any_reply::Reply::ScheduleEventReply(self.schedule_event(request))
+                    Ok(self.schedule_event(request).encode_to_vec())
                 }
                 any_request::Request::CancelEventRequest(request) => {
-                    any_reply::Reply::CancelEventReply(self.cancel_event(request))
+                    Ok(self.cancel_event(request).encode_to_vec())
                 }
                 any_request::Request::ProcessEventRequest(request) => {
-                    any_reply::Reply::ProcessEventReply(self.process_event(request))
+                    Ok(self.process_event(request).encode_to_vec())
                 }
                 any_request::Request::ProcessQueryRequest(request) => {
-                    any_reply::Reply::ProcessQueryReply(self.process_query(request))
+                    Ok(self.process_query(request).encode_to_vec())
                 }
                 any_request::Request::ReadEventsRequest(request) => {
-                    any_reply::Reply::ReadEventsReply(self.read_events(request))
+                    Ok(self.read_events(request).encode_to_vec())
                 }
                 any_request::Request::OpenSinkRequest(request) => {
-                    any_reply::Reply::OpenSinkReply(self.open_sink(request))
+                    Ok(self.open_sink(request).encode_to_vec())
                 }
                 any_request::Request::CloseSinkRequest(request) => {
-                    any_reply::Reply::CloseSinkReply(self.close_sink(request))
+                    Ok(self.close_sink(request).encode_to_vec())
                 }
             },
-            Ok(AnyRequest { request: None }) => any_reply::Reply::Error(ServerError {
-                code: ServerErrorCode::EmptyRequest as i32,
-                message: "the message did not contain any request".to_string(),
+            Ok(AnyRequest { request: None }) => Err(InvalidRequest {
+                description: "the message did not contain any request".to_string(),
             }),
-            Err(err) => any_reply::Reply::Error(ServerError {
-                code: ServerErrorCode::UnknownRequest as i32,
-                message: format!("bad request: {}", err),
+            Err(err) => Err(InvalidRequest {
+                description: format!("bad request: {}", err),
             }),
-        };
-
-        let reply = AnyReply { reply: Some(reply) };
-
-        reply.encode_to_vec()
+        }
     }
 
     /// Initialize a simulation with the provided time.
@@ -606,6 +609,25 @@
     }
 }
 
+impl fmt::Debug for SimulationService {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SimulationService").finish_non_exhaustive()
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct InvalidRequest {
+    description: String,
+}
+
+impl fmt::Display for InvalidRequest {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str(&self.description)
+    }
+}
+
+impl error::Error for InvalidRequest {}
+
 /// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
 ///
 /// This will fail if the time is outside the protobuf-specified range for
diff --git a/asynchronix/src/rpc/wasm.rs b/asynchronix/src/rpc/wasm.rs
new file mode 100644
index 0000000..5526e6e
--- /dev/null
+++ b/asynchronix/src/rpc/wasm.rs
@@ -0,0 +1,82 @@
+//! WASM simulation service.
+//!
+//! This module provides [`WasmSimulationService`], a thin wrapper over a
+//! [`SimulationService`] that can be used from JavaScript.
+//!
+//! Although it is readily possible to use a
+//! [`Simulation`](crate::simulation::Simulation) object from WASM,
+//! [`WasmSimulationService`] goes further by exposing the complete simulation
+//! API to JavaScript through protobuf.
+//!
+//! Keep in mind that WASM only supports single-threaded execution and
+//! therefore any simulation bench compiled to WASM should instantiate
+//! simulations with either
+//! [`SimInit::new()`](crate::simulation::SimInit::new) or
+//! [`SimInit::with_num_threads(1)`](crate::simulation::SimInit::with_num_threads),
+//! failing which the simulation will panic upon initialization.
+//!
+//! [`WasmSimulationService`] is exported to the JavaScript namespace as
+//! `SimulationService`, and [`WasmSimulationService::process_request`] as
+//! `SimulationService.processRequest`.
+
+use wasm_bindgen::prelude::*;
+
+use super::{EndpointRegistry, SimulationService};
+use crate::simulation::SimInit;
+
+/// A simulation service that can be used from JavaScript.
+///
+/// This would typically be used by implementing a `run` function in Rust and
+/// exporting it to WASM:
+///
+/// ```no_run
+/// #[wasm_bindgen]
+/// pub fn run() -> WasmSimulationService {
+///     WasmSimulationService::new(my_custom_bench_generator)
+/// }
+/// ```
+///
+/// which can then be used on the JS side to create a `SimulationService` as a
+/// JS object, e.g. with:
+///
+/// ```js
+/// const simu = run();
+///
+/// // ...build a protobuf request and encode it as a `Uint8Array`...
+///
+/// const reply = simu.processRequest(myRequest);
+///
+/// // ...decode the protobuf reply...
+/// ```
+#[wasm_bindgen(js_name = SimulationService)]
+#[derive(Debug)]
+pub struct WasmSimulationService(SimulationService);
+
+#[wasm_bindgen(js_class = SimulationService)]
+impl WasmSimulationService {
+    /// Processes a protobuf-encoded `AnyRequest` message and returns a
+    /// protobuf-encoded reply.
+    ///
+    /// For the Protocol Buffer definitions, see the `simulation.proto` file.
+    #[wasm_bindgen(js_name = processRequest)]
+    pub fn process_request(&mut self, request: &[u8]) -> Result<Box<[u8]>, JsError> {
+        self.0
+            .process_request(request)
+            .map(|reply| reply.into_boxed_slice())
+            .map_err(|e| JsError::new(&e.to_string()))
+    }
+}
+
+impl WasmSimulationService {
+    /// Creates a new `SimulationService` without any active simulation.
+    ///
+    /// The argument is a closure that is called every time the simulation is
+    /// (re)started by the remote client. It must create a new `SimInit` object
+    /// complemented by a registry that exposes the public event and query
+    /// interface.
+    pub fn new<F>(sim_gen: F) -> Self
+    where
+        F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
+    {
+        Self(SimulationService::new(sim_gen))
+    }
+}
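For completeness, a hedged sketch of such a WASM entry point, mirroring the `run` pattern from the doc comment above; the bench body is elided and, like `my_custom_bench_generator`, purely hypothetical:

```rust
use asynchronix::rpc::wasm::WasmSimulationService;
use asynchronix::rpc::EndpointRegistry;
use asynchronix::simulation::SimInit;
use wasm_bindgen::prelude::*;

// Hypothetical bench generator. On WASM the bench must stay single-threaded,
// hence `SimInit::with_num_threads(1)` (or `SimInit::new()`).
fn bench() -> (SimInit, EndpointRegistry) {
    // ...instantiate and connect models, build the registry...
    // A real bench would return `(SimInit::with_num_threads(1), registry)`.
    todo!()
}

/// JS-visible entry point returning the protobuf-driven service object.
#[wasm_bindgen]
pub fn run() -> WasmSimulationService {
    WasmSimulationService::new(bench)
}
```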