1
0
forked from ROMEO/nexosim

Merge pull request #7 from asynchronics/fix/ci-failures

Fix/ci failures
This commit is contained in:
Serge Barral 2024-05-07 01:58:05 +02:00 committed by GitHub
commit 8467b35f03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 92 additions and 20 deletions

View File

@ -17,7 +17,7 @@ jobs:
matrix: matrix:
rust: rust:
- stable - stable
- 1.75.0 - 1.77.0
steps: steps:
- name: Checkout sources - name: Checkout sources
uses: actions/checkout@v3 uses: actions/checkout@v3

View File

@ -10,8 +10,10 @@ on:
- 'asynchronix/src/executor/task.rs' - 'asynchronix/src/executor/task.rs'
- 'asynchronix/src/executor/task/**' - 'asynchronix/src/executor/task/**'
- 'asynchronix/src/loom_exports.rs' - 'asynchronix/src/loom_exports.rs'
- 'asynchronix/src/model/ports/broadcaster.rs' - 'asynchronix/src/ports/output/broadcaster.rs'
- 'asynchronix/src/model/ports/broadcaster/**' - 'asynchronix/src/ports/output/broadcaster/**'
- 'asynchronix/src/ports/source/broadcaster.rs'
- 'asynchronix/src/ports/source/broadcaster/**'
- 'asynchronix/src/util/slot.rs' - 'asynchronix/src/util/slot.rs'
- 'asynchronix/src/util/sync_cell.rs' - 'asynchronix/src/util/sync_cell.rs'

View File

@ -9,7 +9,7 @@ name = "asynchronix"
authors = ["Serge Barral <serge.barral@asynchronics.com>"] authors = ["Serge Barral <serge.barral@asynchronics.com>"]
version = "0.2.2" version = "0.2.2"
edition = "2021" edition = "2021"
rust-version = "1.75" rust-version = "1.77.0"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
repository = "https://github.com/asynchronics/asynchronix" repository = "https://github.com/asynchronics/asynchronix"
readme = "../README.md" readme = "../README.md"

View File

@ -1,4 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
// Prevent warnings when checking for flag `asynchronix_loom`.
println!("cargo::rustc-check-cfg=cfg(asynchronix_loom)");
#[cfg(feature = "rpc-codegen")] #[cfg(feature = "rpc-codegen")]
let builder = tonic_build::configure() let builder = tonic_build::configure()
.build_client(false) .build_client(false)

View File

@ -125,13 +125,6 @@ where
S: Fn(Runnable, T) + Send + Sync + 'static, S: Fn(Runnable, T) + Send + Sync + 'static,
T: Clone + Send + Sync + 'static, T: Clone + Send + Sync + 'static,
{ {
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_waker,
Self::wake_by_val,
Self::wake_by_ref,
Self::drop_waker,
);
/// Clones a waker. /// Clones a waker.
unsafe fn clone_waker(ptr: *const ()) -> RawWaker { unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let this = &*(ptr as *const Self); let this = &*(ptr as *const Self);
@ -141,7 +134,7 @@ where
panic!("Attack of the clones: the waker was cloned too many times"); panic!("Attack of the clones: the waker was cloned too many times");
} }
RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) RawWaker::new(ptr, raw_waker_vtable::<F, S, T>())
} }
/// Wakes the task by value. /// Wakes the task by value.
@ -287,6 +280,37 @@ where
} }
} }
/// Returns a reference to the waker's virtual table.
///
/// Unfortunately, Rust will sometimes create multiple memory instances of the
/// virtual table for the same generic parameters, which defeats
/// `Waker::will_wake` as the latter tests the pointers to the virtual tables
/// for equality.
///
/// Forcing the function to be inlined appears to solve this problem, but we may
/// want to investigate more robust methods. Tokio has [switched][1] to a single
/// non-generic virtual table declared as `static`, which then delegates each
/// call with another virtual call. This does ensure that `Waker::will_wake`
/// will always work, but the double indirection is a bit unfortunate and its
/// cost would need to be evaluated.
///
/// [1]: https://github.com/tokio-rs/tokio/pull/5213
#[inline(never)]
fn raw_waker_vtable<F, S, T>() -> &'static RawWakerVTable
where
F: Future + Send + 'static,
F::Output: Send + 'static,
S: Fn(Runnable, T) + Send + Sync + 'static,
T: Clone + Send + Sync + 'static,
{
&RawWakerVTable::new(
Task::<F, S, T>::clone_waker,
Task::<F, S, T>::wake_by_val,
Task::<F, S, T>::wake_by_ref,
Task::<F, S, T>::drop_waker,
)
}
/// Spawns a task. /// Spawns a task.
/// ///
/// An arbitrary tag can be attached to the task, a clone of which will be /// An arbitrary tag can be attached to the task, a clone of which will be

View File

@ -25,7 +25,7 @@ struct VTable {
/// but not currently scheduled (no `Runnable` exist) then the future is /// but not currently scheduled (no `Runnable` exist) then the future is
/// dropped immediately. Otherwise, the future will be dropped at a later /// dropped immediately. Otherwise, the future will be dropped at a later
/// time by the scheduled `Runnable` once it runs. /// time by the scheduled `Runnable` once it runs.
unsafe fn cancel<F: Future, S, T>(ptr: *const ()) unsafe fn cancel<F, S, T>(ptr: *const ())
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,
@ -123,7 +123,7 @@ where
} }
/// Drops the token without cancelling the task. /// Drops the token without cancelling the task.
unsafe fn drop<F: Future, S, T>(ptr: *const ()) unsafe fn drop<F, S, T>(ptr: *const ())
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,
@ -180,7 +180,7 @@ impl CancelToken {
/// allocator, /// allocator,
/// - the reference count has been incremented to account for this new task /// - the reference count has been incremented to account for this new task
/// reference. /// reference.
pub(super) unsafe fn new_unchecked<F: Future, S, T>(task: *const Task<F, S, T>) -> Self pub(super) unsafe fn new_unchecked<F, S, T>(task: *const Task<F, S, T>) -> Self
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,

View File

@ -20,7 +20,7 @@ struct VTable<U: Send + 'static> {
} }
/// Retrieves the output of the task if ready. /// Retrieves the output of the task if ready.
unsafe fn poll<F: Future, S, T>(ptr: *const ()) -> Stage<F::Output> unsafe fn poll<F, S, T>(ptr: *const ()) -> Stage<F::Output>
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,
@ -62,7 +62,7 @@ where
} }
/// Drops the promise. /// Drops the promise.
unsafe fn drop<F: Future, S, T>(ptr: *const ()) unsafe fn drop<F, S, T>(ptr: *const ())
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,

View File

@ -11,7 +11,7 @@ use crate::loom_exports::debug_or_loom_assert;
use crate::loom_exports::sync::atomic::{self, AtomicU64, Ordering}; use crate::loom_exports::sync::atomic::{self, AtomicU64, Ordering};
use super::util::RunOnDrop; use super::util::RunOnDrop;
use super::Task; use super::{raw_waker_vtable, Task};
use super::{CLOSED, POLLING, REF_MASK, WAKE_MASK}; use super::{CLOSED, POLLING, REF_MASK, WAKE_MASK};
/// Virtual table for a `Runnable`. /// Virtual table for a `Runnable`.
@ -22,7 +22,7 @@ struct VTable {
} }
/// Polls the inner future. /// Polls the inner future.
unsafe fn run<F: Future, S, T>(ptr: *const ()) unsafe fn run<F, S, T>(ptr: *const ())
where where
F: Future + Send + 'static, F: Future + Send + 'static,
F::Output: Send + 'static, F::Output: Send + 'static,
@ -77,7 +77,7 @@ where
} }
// Poll the task. // Poll the task.
let raw_waker = RawWaker::new(ptr, &Task::<F, S, T>::RAW_WAKER_VTABLE); let raw_waker = RawWaker::new(ptr, raw_waker_vtable::<F, S, T>());
let waker = ManuallyDrop::new(Waker::from_raw(raw_waker)); let waker = ManuallyDrop::new(Waker::from_raw(raw_waker));
let cx = &mut Context::from_waker(&waker); let cx = &mut Context::from_waker(&waker);

View File

@ -136,6 +136,28 @@ impl<F: Future> Drop for MonitoredFuture<F> {
} }
} }
// A future that checks whether the waker cloned from the first call to `poll`
// tests equal with `Waker::will_wake` on the second call to `poll`.
struct WillWakeFuture {
waker: Arc<Mutex<Option<std::task::Waker>>>,
}
impl Future for WillWakeFuture {
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let waker = &mut self.waker.lock().unwrap();
match waker.as_ref() {
None => {
**waker = Some(cx.waker().clone());
Poll::Pending
}
Some(waker) => Poll::Ready(waker.will_wake(cx.waker())),
}
}
}
#[test] #[test]
fn task_schedule() { fn task_schedule() {
test_prelude!(); test_prelude!();
@ -623,3 +645,24 @@ fn task_drop_cycle() {
assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3); assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
} }
#[test]
fn task_will_wake() {
test_prelude!();
let waker = Arc::new(Mutex::new(None));
let future = WillWakeFuture {
waker: waker.clone(),
};
let (promise, runnable, _cancel_token) = spawn(future, schedule_runnable, ());
runnable.run();
assert!(promise.poll().is_pending());
// Wake the future so it is scheduled another time.
waker.lock().unwrap().as_ref().unwrap().wake_by_ref();
assert!(run_scheduled_runnable());
assert_eq!(promise.poll(), Stage::Ready(true));
}