diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c952a0..b0b9912 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: matrix: rust: - stable - - 1.75.0 + - 1.77.0 steps: - name: Checkout sources uses: actions/checkout@v3 diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index e739386..016e9a0 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -10,8 +10,10 @@ on: - 'asynchronix/src/executor/task.rs' - 'asynchronix/src/executor/task/**' - 'asynchronix/src/loom_exports.rs' - - 'asynchronix/src/model/ports/broadcaster.rs' - - 'asynchronix/src/model/ports/broadcaster/**' + - 'asynchronix/src/ports/output/broadcaster.rs' + - 'asynchronix/src/ports/output/broadcaster/**' + - 'asynchronix/src/ports/source/broadcaster.rs' + - 'asynchronix/src/ports/source/broadcaster/**' - 'asynchronix/src/util/slot.rs' - 'asynchronix/src/util/sync_cell.rs' diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 079c113..981ce35 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -9,7 +9,7 @@ name = "asynchronix" authors = ["Serge Barral "] version = "0.2.2" edition = "2021" -rust-version = "1.75" +rust-version = "1.77.0" license = "MIT OR Apache-2.0" repository = "https://github.com/asynchronics/asynchronix" readme = "../README.md" diff --git a/asynchronix/build.rs b/asynchronix/build.rs index ce89ca8..fb7492c 100644 --- a/asynchronix/build.rs +++ b/asynchronix/build.rs @@ -1,4 +1,7 @@ fn main() -> Result<(), Box> { + // Prevent warnings when checking for flag `asynchronix_loom`. + println!("cargo::rustc-check-cfg=cfg(asynchronix_loom)"); + #[cfg(feature = "rpc-codegen")] let builder = tonic_build::configure() .build_client(false) diff --git a/asynchronix/src/executor/task.rs b/asynchronix/src/executor/task.rs index 3b8d1e3..9236de9 100644 --- a/asynchronix/src/executor/task.rs +++ b/asynchronix/src/executor/task.rs @@ -125,13 +125,6 @@ where S: Fn(Runnable, T) + Send + Sync + 'static, T: Clone + Send + Sync + 'static, { - const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_waker, - Self::wake_by_val, - Self::wake_by_ref, - Self::drop_waker, - ); - /// Clones a waker. unsafe fn clone_waker(ptr: *const ()) -> RawWaker { let this = &*(ptr as *const Self); @@ -141,7 +134,7 @@ where panic!("Attack of the clones: the waker was cloned too many times"); } - RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) + RawWaker::new(ptr, raw_waker_vtable::()) } /// Wakes the task by value. @@ -287,6 +280,37 @@ where } } +/// Returns a reference to the waker's virtual table. +/// +/// Unfortunately, Rust will sometimes create multiple memory instances of the +/// virtual table for the same generic parameters, which defeats +/// `Waker::will_wake` as the latter tests the pointers to the virtual tables +/// for equality. +/// +/// Forcing the function to be inlined appears to solve this problem, but we may +/// want to investigate more robust methods. Tokio has [switched][1] to a single +/// non-generic virtual table declared as `static`, which then delegates each +/// call with another virtual call. This does ensure that `Waker::will_wake` +/// will always work, but the double indirection is a bit unfortunate and its +/// cost would need to be evaluated. +/// +/// [1]: https://github.com/tokio-rs/tokio/pull/5213 +#[inline(never)] +fn raw_waker_vtable() -> &'static RawWakerVTable +where + F: Future + Send + 'static, + F::Output: Send + 'static, + S: Fn(Runnable, T) + Send + Sync + 'static, + T: Clone + Send + Sync + 'static, +{ + &RawWakerVTable::new( + Task::::clone_waker, + Task::::wake_by_val, + Task::::wake_by_ref, + Task::::drop_waker, + ) +} + /// Spawns a task. /// /// An arbitrary tag can be attached to the task, a clone of which will be diff --git a/asynchronix/src/executor/task/cancel_token.rs b/asynchronix/src/executor/task/cancel_token.rs index 2bc2b13..6d1511f 100644 --- a/asynchronix/src/executor/task/cancel_token.rs +++ b/asynchronix/src/executor/task/cancel_token.rs @@ -25,7 +25,7 @@ struct VTable { /// but not currently scheduled (no `Runnable` exist) then the future is /// dropped immediately. Otherwise, the future will be dropped at a later /// time by the scheduled `Runnable` once it runs. -unsafe fn cancel(ptr: *const ()) +unsafe fn cancel(ptr: *const ()) where F: Future + Send + 'static, F::Output: Send + 'static, @@ -123,7 +123,7 @@ where } /// Drops the token without cancelling the task. -unsafe fn drop(ptr: *const ()) +unsafe fn drop(ptr: *const ()) where F: Future + Send + 'static, F::Output: Send + 'static, @@ -180,7 +180,7 @@ impl CancelToken { /// allocator, /// - the reference count has been incremented to account for this new task /// reference. - pub(super) unsafe fn new_unchecked(task: *const Task) -> Self + pub(super) unsafe fn new_unchecked(task: *const Task) -> Self where F: Future + Send + 'static, F::Output: Send + 'static, diff --git a/asynchronix/src/executor/task/promise.rs b/asynchronix/src/executor/task/promise.rs index 7504d26..47d56f9 100644 --- a/asynchronix/src/executor/task/promise.rs +++ b/asynchronix/src/executor/task/promise.rs @@ -20,7 +20,7 @@ struct VTable { } /// Retrieves the output of the task if ready. -unsafe fn poll(ptr: *const ()) -> Stage +unsafe fn poll(ptr: *const ()) -> Stage where F: Future + Send + 'static, F::Output: Send + 'static, @@ -62,7 +62,7 @@ where } /// Drops the promise. -unsafe fn drop(ptr: *const ()) +unsafe fn drop(ptr: *const ()) where F: Future + Send + 'static, F::Output: Send + 'static, diff --git a/asynchronix/src/executor/task/runnable.rs b/asynchronix/src/executor/task/runnable.rs index 792af3b..59b719a 100644 --- a/asynchronix/src/executor/task/runnable.rs +++ b/asynchronix/src/executor/task/runnable.rs @@ -11,7 +11,7 @@ use crate::loom_exports::debug_or_loom_assert; use crate::loom_exports::sync::atomic::{self, AtomicU64, Ordering}; use super::util::RunOnDrop; -use super::Task; +use super::{raw_waker_vtable, Task}; use super::{CLOSED, POLLING, REF_MASK, WAKE_MASK}; /// Virtual table for a `Runnable`. @@ -22,7 +22,7 @@ struct VTable { } /// Polls the inner future. -unsafe fn run(ptr: *const ()) +unsafe fn run(ptr: *const ()) where F: Future + Send + 'static, F::Output: Send + 'static, @@ -77,7 +77,7 @@ where } // Poll the task. - let raw_waker = RawWaker::new(ptr, &Task::::RAW_WAKER_VTABLE); + let raw_waker = RawWaker::new(ptr, raw_waker_vtable::()); let waker = ManuallyDrop::new(Waker::from_raw(raw_waker)); let cx = &mut Context::from_waker(&waker); diff --git a/asynchronix/src/executor/task/tests/general.rs b/asynchronix/src/executor/task/tests/general.rs index beee857..00c42b7 100644 --- a/asynchronix/src/executor/task/tests/general.rs +++ b/asynchronix/src/executor/task/tests/general.rs @@ -136,6 +136,28 @@ impl Drop for MonitoredFuture { } } +// A future that checks whether the waker cloned from the first call to `poll` +// tests equal with `Waker::will_wake` on the second call to `poll`. +struct WillWakeFuture { + waker: Arc>>, +} +impl Future for WillWakeFuture { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let waker = &mut self.waker.lock().unwrap(); + + match waker.as_ref() { + None => { + **waker = Some(cx.waker().clone()); + + Poll::Pending + } + Some(waker) => Poll::Ready(waker.will_wake(cx.waker())), + } + } +} + #[test] fn task_schedule() { test_prelude!(); @@ -623,3 +645,24 @@ fn task_drop_cycle() { assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3); } + +#[test] +fn task_will_wake() { + test_prelude!(); + + let waker = Arc::new(Mutex::new(None)); + let future = WillWakeFuture { + waker: waker.clone(), + }; + + let (promise, runnable, _cancel_token) = spawn(future, schedule_runnable, ()); + runnable.run(); + + assert!(promise.poll().is_pending()); + + // Wake the future so it is scheduled another time. + waker.lock().unwrap().as_ref().unwrap().wake_by_ref(); + assert!(run_scheduled_runnable()); + + assert_eq!(promise.poll(), Stage::Ready(true)); +}