From af3d68e76f3612d1a34017d092e139f2c0516b77 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Tue, 7 May 2024 01:37:47 +0200 Subject: [PATCH] Force the waker VTable to be uniquely instantiated From Rust 1.78, `Waker::will_wake` tests equality by comparing the VTable pointers rather than the content of the VTable. Unfortunately, this exposes some instability in the code generation which sometimes causes several VTables to be instantiated in memory for the same generic parameters. This can in turn defeat `Waker::will_wake` if e.g. `Waker::clone` and `Waker::wake_by_*` end up with different pointers. The problemt is hopefully addressed by preventing inlining of the VTable generation function. A test has been added to try to detect regression, though the test may not be 100% reliable. --- .github/workflows/loom.yml | 6 ++- asynchronix/src/executor/task.rs | 40 +++++++++++++---- asynchronix/src/executor/task/runnable.rs | 4 +- .../src/executor/task/tests/general.rs | 43 +++++++++++++++++++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index e739386..016e9a0 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -10,8 +10,10 @@ on: - 'asynchronix/src/executor/task.rs' - 'asynchronix/src/executor/task/**' - 'asynchronix/src/loom_exports.rs' - - 'asynchronix/src/model/ports/broadcaster.rs' - - 'asynchronix/src/model/ports/broadcaster/**' + - 'asynchronix/src/ports/output/broadcaster.rs' + - 'asynchronix/src/ports/output/broadcaster/**' + - 'asynchronix/src/ports/source/broadcaster.rs' + - 'asynchronix/src/ports/source/broadcaster/**' - 'asynchronix/src/util/slot.rs' - 'asynchronix/src/util/sync_cell.rs' diff --git a/asynchronix/src/executor/task.rs b/asynchronix/src/executor/task.rs index 3b8d1e3..9236de9 100644 --- a/asynchronix/src/executor/task.rs +++ b/asynchronix/src/executor/task.rs @@ -125,13 +125,6 @@ where S: Fn(Runnable, T) + Send + Sync + 'static, T: Clone + Send + Sync + 'static, { - const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_waker, - Self::wake_by_val, - Self::wake_by_ref, - Self::drop_waker, - ); - /// Clones a waker. unsafe fn clone_waker(ptr: *const ()) -> RawWaker { let this = &*(ptr as *const Self); @@ -141,7 +134,7 @@ where panic!("Attack of the clones: the waker was cloned too many times"); } - RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) + RawWaker::new(ptr, raw_waker_vtable::()) } /// Wakes the task by value. @@ -287,6 +280,37 @@ where } } +/// Returns a reference to the waker's virtual table. +/// +/// Unfortunately, Rust will sometimes create multiple memory instances of the +/// virtual table for the same generic parameters, which defeats +/// `Waker::will_wake` as the latter tests the pointers to the virtual tables +/// for equality. +/// +/// Forcing the function to be inlined appears to solve this problem, but we may +/// want to investigate more robust methods. Tokio has [switched][1] to a single +/// non-generic virtual table declared as `static`, which then delegates each +/// call with another virtual call. This does ensure that `Waker::will_wake` +/// will always work, but the double indirection is a bit unfortunate and its +/// cost would need to be evaluated. +/// +/// [1]: https://github.com/tokio-rs/tokio/pull/5213 +#[inline(never)] +fn raw_waker_vtable() -> &'static RawWakerVTable +where + F: Future + Send + 'static, + F::Output: Send + 'static, + S: Fn(Runnable, T) + Send + Sync + 'static, + T: Clone + Send + Sync + 'static, +{ + &RawWakerVTable::new( + Task::::clone_waker, + Task::::wake_by_val, + Task::::wake_by_ref, + Task::::drop_waker, + ) +} + /// Spawns a task. /// /// An arbitrary tag can be attached to the task, a clone of which will be diff --git a/asynchronix/src/executor/task/runnable.rs b/asynchronix/src/executor/task/runnable.rs index d5162c6..59b719a 100644 --- a/asynchronix/src/executor/task/runnable.rs +++ b/asynchronix/src/executor/task/runnable.rs @@ -11,7 +11,7 @@ use crate::loom_exports::debug_or_loom_assert; use crate::loom_exports::sync::atomic::{self, AtomicU64, Ordering}; use super::util::RunOnDrop; -use super::Task; +use super::{raw_waker_vtable, Task}; use super::{CLOSED, POLLING, REF_MASK, WAKE_MASK}; /// Virtual table for a `Runnable`. @@ -77,7 +77,7 @@ where } // Poll the task. - let raw_waker = RawWaker::new(ptr, &Task::::RAW_WAKER_VTABLE); + let raw_waker = RawWaker::new(ptr, raw_waker_vtable::()); let waker = ManuallyDrop::new(Waker::from_raw(raw_waker)); let cx = &mut Context::from_waker(&waker); diff --git a/asynchronix/src/executor/task/tests/general.rs b/asynchronix/src/executor/task/tests/general.rs index beee857..00c42b7 100644 --- a/asynchronix/src/executor/task/tests/general.rs +++ b/asynchronix/src/executor/task/tests/general.rs @@ -136,6 +136,28 @@ impl Drop for MonitoredFuture { } } +// A future that checks whether the waker cloned from the first call to `poll` +// tests equal with `Waker::will_wake` on the second call to `poll`. +struct WillWakeFuture { + waker: Arc>>, +} +impl Future for WillWakeFuture { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let waker = &mut self.waker.lock().unwrap(); + + match waker.as_ref() { + None => { + **waker = Some(cx.waker().clone()); + + Poll::Pending + } + Some(waker) => Poll::Ready(waker.will_wake(cx.waker())), + } + } +} + #[test] fn task_schedule() { test_prelude!(); @@ -623,3 +645,24 @@ fn task_drop_cycle() { assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3); } + +#[test] +fn task_will_wake() { + test_prelude!(); + + let waker = Arc::new(Mutex::new(None)); + let future = WillWakeFuture { + waker: waker.clone(), + }; + + let (promise, runnable, _cancel_token) = spawn(future, schedule_runnable, ()); + runnable.run(); + + assert!(promise.poll().is_pending()); + + // Wake the future so it is scheduled another time. + waker.lock().unwrap().as_ref().unwrap().wake_by_ref(); + assert!(run_scheduled_runnable()); + + assert_eq!(promise.poll(), Stage::Ready(true)); +}