1
0
forked from ROMEO/nexosim

Merge pull request #5 from asynchronics/feature/better-event-cancellation

Feature/better event cancellation
This commit is contained in:
Serge Barral 2023-07-21 14:52:21 +02:00 committed by GitHub
commit 7c52f4b8b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 626 additions and 807 deletions

View File

@ -88,7 +88,7 @@ pub struct DelayedMultiplier {
impl DelayedMultiplier {
pub fn input(&mut self, value: f64, scheduler: &Scheduler<Self>) {
scheduler
.schedule_in(Duration::from_secs(1), Self::send, 2.0 * value)
.schedule_event_in(Duration::from_secs(1), Self::send, 2.0 * value)
.unwrap();
}
async fn send(&mut self, value: f64) {
@ -157,11 +157,12 @@ the posted events. For these reasons, Asynchronix relies on a fully custom
runtime.
Even though the runtime was largely influenced by Tokio, it features additional
optimization that make its faster than any other multi-threaded Rust executor on
the typically message-passing-heavy workloads seen in discrete-event simulation
(see [benchmark]). Asynchronix also improves over the state of the art with a
very fast custom MPSC channel, which performance has been demonstrated through
[Tachyonix][tachyonix], a general-purpose offshoot of this channel.
optimizations that make its faster than any other multi-threaded Rust executor
on the typically message-passing-heavy workloads seen in discrete-event
simulation (see [benchmark]). Asynchronix also improves over the state of the
art with a very fast custom MPSC channel, which performance has been
demonstrated through [Tachyonix][tachyonix], a general-purpose offshoot of this
channel.
[actor_model]: https://en.wikipedia.org/wiki/Actor_model

View File

@ -31,6 +31,7 @@ diatomic-waker = "0.1"
futures-task = "0.3"
multishot = "0.3"
num_cpus = "1.13"
pin-project-lite = "0.2"
recycle-box = "0.2"
slab = "0.4"
st3 = "0.4"

View File

@ -3,7 +3,7 @@
//! This example demonstrates in particular:
//!
//! * non-trivial state machines,
//! * cancellation of calls scheduled at the current time step using epochs,
//! * cancellation of events,
//! * model initialization,
//! * simulation monitoring with event slots.
//!
@ -37,7 +37,7 @@ use std::time::Duration;
use asynchronix::model::{InitializedModel, Model, Output};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{MonotonicTime, Scheduler, SchedulerKey};
use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
/// Water pump.
pub struct Pump {
@ -79,12 +79,9 @@ pub struct Controller {
brew_time: Duration,
/// Current water sense state.
water_sense: WaterSenseState,
/// Scheduler key, which if present indicates that the machine is current
/// Event key, which if present indicates that the machine is currently
/// brewing -- internal state.
stop_brew_key: Option<SchedulerKey>,
/// An epoch incremented when the scheduled 'stop_brew` callback must be
/// ignored -- internal state.
stop_brew_epoch: u64,
stop_brew_key: Option<EventKey>,
}
impl Controller {
@ -98,22 +95,16 @@ impl Controller {
pump_cmd: Output::default(),
stop_brew_key: None,
water_sense: WaterSenseState::Empty, // will be overridden during init
stop_brew_epoch: 0,
}
}
/// Signals a change in the water sensing state -- input port.
pub async fn water_sense(&mut self, state: WaterSenseState, scheduler: &Scheduler<Self>) {
pub async fn water_sense(&mut self, state: WaterSenseState) {
// Check if the tank just got empty.
if state == WaterSenseState::Empty && self.water_sense == WaterSenseState::NotEmpty {
// If a brew was ongoing, we must cancel it.
if let Some(key) = self.stop_brew_key.take() {
// Try to abort the scheduled call to `stop_brew()`. If this will
// fails, increment the epoch so that the call is ignored.
if scheduler.cancel(key).is_err() {
self.stop_brew_epoch = self.stop_brew_epoch.wrapping_add(1);
};
key.cancel_event();
self.pump_cmd.send(PumpCommand::Off).await;
}
}
@ -136,11 +127,8 @@ impl Controller {
if let Some(key) = self.stop_brew_key.take() {
self.pump_cmd.send(PumpCommand::Off).await;
// Try to abort the scheduled call to `stop_brew()`. If this will
// fails, increment the epoch so that the call is ignored.
if scheduler.cancel(key).is_err() {
self.stop_brew_epoch = self.stop_brew_epoch.wrapping_add(1);
};
// Abort the scheduled call to `stop_brew()`.
key.cancel_event();
return;
}
@ -153,19 +141,14 @@ impl Controller {
// Schedule the `stop_brew()` method and turn on the pump.
self.stop_brew_key = Some(
scheduler
.schedule_in(self.brew_time, Self::stop_brew, self.stop_brew_epoch)
.schedule_keyed_event_in(self.brew_time, Self::stop_brew, ())
.unwrap(),
);
self.pump_cmd.send(PumpCommand::On).await;
}
/// Stops brewing.
async fn stop_brew(&mut self, epoch: u64) {
// Ignore this call if the epoch has been incremented.
if self.stop_brew_epoch != epoch {
return;
}
async fn stop_brew(&mut self) {
if self.stop_brew_key.take().is_some() {
self.pump_cmd.send(PumpCommand::Off).await;
}
@ -190,9 +173,6 @@ pub struct Tank {
volume: f64,
/// State that exists when the mass flow rate is non-zero -- internal state.
dynamic_state: Option<TankDynamicState>,
/// An epoch incremented when the pending call to `set_empty()` must be
/// ignored -- internal state.
set_empty_epoch: u64,
}
impl Tank {
/// Creates a new tank with the specified amount of water [m³].
@ -204,7 +184,6 @@ impl Tank {
Self {
volume: water_volume,
dynamic_state: None,
set_empty_epoch: 0,
water_sense: Output::default(),
}
}
@ -224,11 +203,8 @@ impl Tank {
// If the current flow rate is non-zero, compute the current volume and
// schedule a new update.
if let Some(state) = self.dynamic_state.take() {
// Try to abort the scheduled call to `set_empty()`. If this will
// fails, increment the epoch so that the call is ignored.
if scheduler.cancel(state.set_empty_key).is_err() {
self.set_empty_epoch = self.set_empty_epoch.wrapping_add(1);
}
// Abort the scheduled call to `set_empty()`.
state.set_empty_key.cancel_event();
// Update the volume, saturating at 0 in case of rounding errors.
let time = scheduler.time();
@ -260,11 +236,8 @@ impl Tank {
// If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() {
// Try to abort the scheduled call to `set_empty()`. If this will
// fails, increment the epoch so that the call is ignored.
if scheduler.cancel(state.set_empty_key).is_err() {
self.set_empty_epoch = self.set_empty_epoch.wrapping_add(1);
}
// Abort the scheduled call to `set_empty()`.
state.set_empty_key.cancel_event();
// Update the volume, saturating at 0 in case of rounding errors.
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
@ -301,7 +274,7 @@ impl Tank {
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);
// Schedule the next update.
match scheduler.schedule_in(duration_until_empty, Self::set_empty, self.set_empty_epoch) {
match scheduler.schedule_keyed_event_in(duration_until_empty, Self::set_empty, ()) {
Ok(set_empty_key) => {
let state = TankDynamicState {
last_volume_update: time,
@ -319,12 +292,7 @@ impl Tank {
}
/// Updates the state of the tank to indicate that there is no more water.
async fn set_empty(&mut self, epoch: u64) {
// Ignore this call if the epoch has been incremented.
if epoch != self.set_empty_epoch {
return;
}
async fn set_empty(&mut self) {
self.volume = 0.0;
self.dynamic_state = None;
self.water_sense.send(WaterSenseState::Empty).await;
@ -355,7 +323,7 @@ impl Model for Tank {
/// is non-zero.
struct TankDynamicState {
last_volume_update: MonotonicTime,
set_empty_key: SchedulerKey,
set_empty_key: EventKey,
flow_rate: f64,
}
@ -429,7 +397,7 @@ fn main() {
// Drink too much coffee.
let volume_per_shot = pump_flow_rate * Controller::DEFAULT_BREW_TIME.as_secs_f64();
let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who care about floating-point rounding errors?
let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who cares about floating-point rounding errors?
for _ in 0..(shots_per_tank - 1) {
simu.send_event(Controller::brew_cmd, (), &controller_addr);
assert_eq!(flow_rate.take(), Some(pump_flow_rate));
@ -463,7 +431,7 @@ fn main() {
assert_eq!(flow_rate.take(), Some(0.0));
// Interrupt the brew after 15s by pressing again the brew button.
simu.schedule_in(
simu.schedule_event_in(
Duration::from_secs(15),
Controller::brew_cmd,
(),

View File

@ -174,7 +174,7 @@ impl Driver {
// Schedule the next pulse.
scheduler
.schedule_in(pulse_duration, Self::send_pulse, ())
.schedule_event_in(pulse_duration, Self::send_pulse, ())
.unwrap();
}
}
@ -224,7 +224,7 @@ fn main() {
assert!(position.next().is_none());
// Start the motor in 2s with a PPS of 10Hz.
simu.schedule_in(
simu.schedule_event_in(
Duration::from_secs(2),
Driver::pulse_rate,
10.0,

View File

@ -79,7 +79,7 @@ enum MessageBox<T: ?Sized> {
None,
}
/// A queue slot that a stamp and either a boxed messaged or an empty box.
/// A queue slot with a stamp and either a boxed messaged or an empty box.
struct Slot<T: ?Sized> {
stamp: AtomicUsize,
message: UnsafeCell<MessageBox<T>>,

View File

@ -113,7 +113,7 @@
//! }
//! impl Delay {
//! pub fn input(&mut self, value: f64, scheduler: &Scheduler<Self>) {
//! scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap();
//! scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap();
//! }
//!
//! async fn send(&mut self, value: f64) {
@ -184,7 +184,7 @@
//! # }
//! # impl Delay {
//! # pub fn input(&mut self, value: f64, scheduler: &Scheduler<Self>) {
//! # scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap();
//! # scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap();
//! # }
//! # async fn send(&mut self, value: f64) { // this method can be private
//! # self.output.send(value).await;
@ -242,7 +242,7 @@
//! [`Simulation::send_event()`](simulation::Simulation::send_event) or
//! [`Simulation::send_query()`](simulation::Simulation::send_query),
//! 3. by scheduling events, using for instance
//! [`Simulation::schedule_in()`](simulation::Simulation::schedule_in).
//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in).
//!
//! Simulation outputs can be monitored using
//! [`EventSlot`](simulation::EventSlot)s and
@ -275,7 +275,7 @@
//! # }
//! # impl Delay {
//! # pub fn input(&mut self, value: f64, scheduler: &Scheduler<Self>) {
//! # scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap();
//! # scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap();
//! # }
//! # async fn send(&mut self, value: f64) { // this method can be private
//! # self.output.send(value).await;
@ -354,11 +354,12 @@
//!
//! The first guarantee (and only the first) also extends to events scheduled
//! from a simulation with
//! [`Simulation::schedule_in()`](simulation::Simulation::schedule_in) or
//! [`Simulation::schedule_at()`](simulation::Simulation::schedule_at): if the
//! scheduler contains several events to be delivered at the same time to the
//! same model, these events will always be processed in the order in which they
//! were scheduled.
//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in)
//! or
//! [`Simulation::schedule_event_at()`](simulation::Simulation::schedule_event_at):
//! if the scheduler contains several events to be delivered at the same time to
//! the same model, these events will always be processed in the order in which
//! they were scheduled.
//!
//! [actor_model]: https://en.wikipedia.org/wiki/Actor_model
//! [pony]: https://www.ponylang.io/

View File

@ -73,7 +73,7 @@
//!
//! At the moment, Asynchronix is unfortunately not able to discriminate between
//! such pathological deadlocks and the "expected" deadlock that occurs when all
//! tasks in a given time slice have completed and all models are starved on an
//! events in a given time slice have completed and all models are starved on an
//! empty mailbox. Consequently, blocking method such as [`SimInit::init()`],
//! [`Simulation::step()`], [`Simulation::send_event()`], etc., will return
//! without error after a pathological deadlock, leaving the user responsible
@ -91,8 +91,8 @@
//! There is actually a very simple solution to this problem: since the
//! [`InputFn`](crate::model::InputFn) trait also matches closures of type
//! `FnOnce(&mut impl Model)`, it is enough to invoke
//! [`Simulation::send_event()`] with a closure that connects or disconnects
//! a port, such as:
//! [`Simulation::send_event()`] with a closure that connects or disconnects a
//! port, such as:
//!
//! ```
//! # use asynchronix::model::{Model, Output};
@ -129,15 +129,15 @@ pub use sim_init::SimInit;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use recycle_box::{coerce_box, RecycleBox};
use crate::executor::Executor;
use crate::model::{InputFn, Model, ReplierFn};
use crate::time::{self, CancellationError, MonotonicTime, TearableAtomicTime};
use crate::time::{ScheduledTimeError, SchedulerKey, SchedulerQueue};
use crate::time::{self, MonotonicTime, TearableAtomicTime};
use crate::time::{EventKey, ScheduledTimeError, SchedulerQueue};
use crate::util::futures::SeqFuture;
use crate::util::slot;
use crate::util::sync_cell::SyncCell;
@ -164,9 +164,9 @@ use crate::util::sync_cell::SyncCell;
/// the case of queries, the response is returned.
///
/// Events can also be scheduled at a future simulation time using
/// [`schedule_in()`](Simulation::schedule_in) or
/// [`schedule_at()`](Simulation::schedule_at). These methods queue an event
/// without blocking.
/// [`schedule_event_in()`](Simulation::schedule_event_in) or
/// [`schedule_event_at()`](Simulation::schedule_event_at). These methods queue
/// an event without blocking.
///
/// Finally, the [`Simulation`] instance manages simulation time. Calling
/// [`step()`](Simulation::step) will increment simulation time until that of
@ -201,20 +201,20 @@ impl Simulation {
self.time.read()
}
/// Advances simulation time to that of the next scheduled task, processing
/// that task as well as all other tasks scheduled for the same time.
/// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other event scheduled for the same time.
///
/// This method may block. Once it returns, it is guaranteed that all newly
/// processed tasks (if any) have completed.
/// processed event (if any) have completed.
pub fn step(&mut self) {
self.step_to_next_bounded(MonotonicTime::MAX);
}
/// Iteratively advances the simulation time by the specified duration and
/// processes all tasks scheduled up to the target time.
/// processes all events scheduled up to the target time.
///
/// This method may block. Once it returns, it is guaranteed that (i) all
/// tasks scheduled up to the specified target time have completed and (ii)
/// events scheduled up to the specified target time have completed and (ii)
/// the final simulation time has been incremented by the specified
/// duration.
pub fn step_by(&mut self, duration: Duration) {
@ -223,11 +223,11 @@ impl Simulation {
self.step_until_unchecked(target_time);
}
/// Iteratively advances the simulation time and processes all tasks
/// Iteratively advances the simulation time and processes all events
/// scheduled up to the specified target time.
///
/// This method may block. Once it returns, it is guaranteed that (i) all
/// tasks scheduled up to the specified target time have completed and (ii)
/// events scheduled up to the specified target time have completed and (ii)
/// the final simulation time matches the target time.
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ScheduledTimeError<()>> {
if self.time.read() >= target_time {
@ -244,13 +244,13 @@ impl Simulation {
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
pub fn schedule_in<M, F, T, S>(
pub fn schedule_event_in<M, F, T, S>(
&mut self,
duration: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<SchedulerKey, ScheduledTimeError<T>>
) -> Result<(), ScheduledTimeError<T>>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
@ -261,7 +261,36 @@ impl Simulation {
}
let time = self.time.read() + duration;
let schedule_key = time::schedule_event_at_unchecked(
time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
Ok(())
}
/// Schedules an event at the lapse of the specified duration and returns an
/// event key.
///
/// An error is returned if the specified duration is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
pub fn schedule_keyed_event_in<M, F, T, S>(
&mut self,
duration: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, ScheduledTimeError<T>>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
if duration.is_zero() {
return Err(ScheduledTimeError(arg));
}
let time = self.time.read() + duration;
let event_key = time::schedule_keyed_event_at_unchecked(
time,
func,
arg,
@ -269,7 +298,7 @@ impl Simulation {
&self.scheduler_queue,
);
Ok(schedule_key)
Ok(event_key)
}
/// Schedules an event at a future time.
@ -279,13 +308,13 @@ impl Simulation {
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
pub fn schedule_at<M, F, T, S>(
pub fn schedule_event_at<M, F, T, S>(
&mut self,
time: MonotonicTime,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<SchedulerKey, ScheduledTimeError<T>>
) -> Result<(), ScheduledTimeError<T>>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
@ -294,7 +323,34 @@ impl Simulation {
if self.time.read() >= time {
return Err(ScheduledTimeError(arg));
}
let schedule_key = time::schedule_event_at_unchecked(
time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
Ok(())
}
/// Schedules an event at a future time and returns an event key.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
pub fn schedule_keyed_event_at<M, F, T, S>(
&mut self,
time: MonotonicTime,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, ScheduledTimeError<T>>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
if self.time.read() >= time {
return Err(ScheduledTimeError(arg));
}
let event_key = time::schedule_keyed_event_at_unchecked(
time,
func,
arg,
@ -302,16 +358,7 @@ impl Simulation {
&self.scheduler_queue,
);
Ok(schedule_key)
}
/// Cancels an event with a scheduled time in the future of the current
/// simulation time.
///
/// If the corresponding event was already executed, or if it is scheduled
/// for the current simulation time, an error is returned.
pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> {
time::cancel_scheduled(scheduler_key, &self.scheduler_queue)
Ok(event_key)
}
/// Sends and processes an event, blocking until completion.
@ -387,72 +434,84 @@ impl Simulation {
reply_reader.try_read().map_err(|_| QueryError {})
}
/// Advances simulation time to that of the next scheduled task if its
/// Advances simulation time to that of the next scheduled event if its
/// scheduling time does not exceed the specified bound, processing that
/// task as well as all other tasks scheduled for the same time.
/// event as well as all other events scheduled for the same time.
///
/// If at least one task was found that satisfied the time bound, the
/// If at least one event was found that satisfied the time bound, the
/// corresponding new simulation time is returned.
fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option<MonotonicTime> {
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = match scheduler_queue.peek_key() {
Some(&k) if k.0 <= upper_time_bound => k,
_ => return None,
// Closure returning the next key which time stamp is no older than the
// upper bound, if any. Cancelled events are discarded.
let get_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
loop {
match scheduler_queue.peek() {
Some((&k, t)) if k.0 <= upper_time_bound => {
if !t.is_cancelled() {
break Some(k);
}
// Discard cancelled events.
scheduler_queue.pull();
}
_ => break None,
}
}
};
// Set the simulation time to that of the next scheduled task
// Move to the next scheduled time.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = get_next_key(&mut scheduler_queue)?;
self.time.write(current_key.0);
loop {
let task = scheduler_queue.pull().unwrap().1;
let event = scheduler_queue.pull().unwrap().1;
let mut next_key = scheduler_queue.peek_key();
if next_key != Some(&current_key) {
// Since there are no other tasks targeting the same mailbox
// and the same time, the task is spawned immediately.
self.executor.spawn_and_forget(Box::into_pin(task));
let mut next_key = get_next_key(&mut scheduler_queue);
if next_key != Some(current_key) {
// Since there are no other events targeting the same mailbox
// and the same time, the event is spawned immediately.
self.executor.spawn_and_forget(Box::into_pin(event));
} else {
// To ensure that their relative order of execution is
// preserved, all tasks targeting the same mailbox are
// concatenated into a single future.
let mut task_sequence = SeqFuture::new();
// preserved, all event targeting the same mailbox are executed
// sequentially within a single compound future.
let mut event_sequence = SeqFuture::new();
task_sequence.push(Box::into_pin(task));
event_sequence.push(Box::into_pin(event));
loop {
let task = scheduler_queue.pull().unwrap().1;
task_sequence.push(Box::into_pin(task));
next_key = scheduler_queue.peek_key();
if next_key != Some(&current_key) {
let event = scheduler_queue.pull().unwrap().1;
event_sequence.push(Box::into_pin(event));
next_key = get_next_key(&mut scheduler_queue);
if next_key != Some(current_key) {
break;
}
}
// Spawn a parent task that sequentially polls all sub-tasks.
self.executor.spawn_and_forget(task_sequence);
// Spawn a parent event that sequentially polls all events
// targeting the same mailbox.
self.executor.spawn_and_forget(event_sequence);
}
match next_key {
// If the next task is scheduled at the same time, update the key and continue.
Some(k) if k.0 == current_key.0 => {
current_key = *k;
}
// Otherwise wait until all tasks have completed and return.
current_key = match next_key {
// If the next event is scheduled at the same time, update the
// key and continue.
Some(k) if k.0 == current_key.0 => k,
// Otherwise wait until all events have completed and return.
_ => {
drop(scheduler_queue); // make sure the queue's mutex is unlocked.
drop(scheduler_queue); // make sure the queue's mutex is released.
self.executor.run();
return Some(current_key.0);
}
}
};
}
}
/// Iteratively advances simulation time and processes all tasks scheduled
/// Iteratively advances simulation time and processes all events scheduled
/// up to the specified target time.
///
/// Once the method returns it is guaranteed that (i) all tasks scheduled up
/// to the specified target time have completed and (ii) the final
/// Once the method returns it is guaranteed that (i) all events scheduled
/// up to the specified target time have completed and (ii) the final
/// simulation time matches the target time.
///
/// This method does not check whether the specified time lies in the future
@ -462,7 +521,7 @@ impl Simulation {
match self.step_to_next_bounded(target_time) {
// The target time was reached exactly.
Some(t) if t == target_time => return,
// No tasks are scheduled before or at the target time.
// No events are scheduled before or at the target time.
None => {
// Update the simulation time.
self.time.write(target_time);

View File

@ -31,7 +31,7 @@
//!
//! // Sets an alarm [input port].
//! pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
//! if scheduler.schedule_at(setting, Self::ring, ()).is_err() {
//! if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() {
//! println!("The alarm clock can only be set for a future time");
//! }
//! }
@ -50,5 +50,7 @@ mod scheduler;
pub(crate) use monotonic_time::TearableAtomicTime;
pub use monotonic_time::{MonotonicTime, SystemTimeError};
pub(crate) use scheduler::{cancel_scheduled, schedule_event_at_unchecked, SchedulerQueue};
pub use scheduler::{CancellationError, ScheduledTimeError, Scheduler, SchedulerKey};
pub(crate) use scheduler::{
schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, SchedulerQueue,
};
pub use scheduler::{EventKey, ScheduledTimeError, Scheduler};

View File

@ -3,20 +3,23 @@
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use pin_project_lite::pin_project;
use recycle_box::{coerce_box, RecycleBox};
use crate::channel::{ChannelId, Sender};
use crate::model::{InputFn, Model};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::{self, PriorityQueue};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCellReader;
/// Shorthand for the scheduler queue type.
pub(crate) type SchedulerQueue =
PriorityQueue<(MonotonicTime, ChannelId), Box<dyn Future<Output = ()> + Send>>;
PriorityQueue<(MonotonicTime, ChannelId), Box<dyn EventFuture<Output = ()> + Send>>;
/// A local scheduler for models.
///
@ -65,8 +68,8 @@ pub(crate) type SchedulerQueue =
/// let greeting = format!("Hello, this message was scheduled at:
/// {:?}.", time);
///
/// if let Err(err) = scheduler.schedule_in(delay, Self::send_msg, greeting) {
/// // ^^^^^^^^ scheduled method
/// if let Err(err) = scheduler.schedule_event_in(delay, Self::send_msg, greeting) {
/// // ^^^^^^^^ scheduled method
/// // The duration was zero, so greet right away.
/// let greeting = err.0;
/// self.msg_out.send(greeting).await;
@ -144,19 +147,19 @@ impl<M: Model> Scheduler<M> {
///
/// // Schedule this method again in 1s with an incremented counter.
/// scheduler
/// .schedule_in(Duration::from_secs(1), Self::trigger, counter + 1)
/// .schedule_event_in(Duration::from_secs(1), Self::trigger, counter + 1)
/// .unwrap();
/// }
/// }
///
/// impl Model for PeriodicLogger {}
/// ```
pub fn schedule_in<F, T, S>(
pub fn schedule_event_in<F, T, S>(
&self,
duration: Duration,
func: F,
arg: T,
) -> Result<SchedulerKey, ScheduledTimeError<T>>
) -> Result<(), ScheduledTimeError<T>>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
@ -166,10 +169,70 @@ impl<M: Model> Scheduler<M> {
}
let time = self.time() + duration;
let sender = self.sender.clone();
let schedule_key =
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
Ok(schedule_key)
Ok(())
}
/// Schedules an event at the lapse of the specified duration and returns an
/// event key.
///
/// An error is returned if the specified duration is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
/// use std::future::Future;
/// use asynchronix::model::Model;
/// use asynchronix::time::{EventKey, Scheduler};
///
/// // A model that logs the value of a counter every second after being
/// // triggered the first time until logging is stopped.
/// pub struct PeriodicLogger {
/// event_key: Option<EventKey>
/// }
///
/// impl PeriodicLogger {
/// // Triggers the logging of a timestamp every second [input port].
/// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler<Self>) {
/// self.stop();
/// println!("counter: {}", counter);
///
/// // Schedule this method again in 1s with an incremented counter.
/// let event_key = scheduler
/// .schedule_keyed_event_in(Duration::from_secs(1), Self::trigger, counter + 1)
/// .unwrap();
/// self.event_key = Some(event_key);
/// }
///
/// // Cancels the logging of timestamps.
/// pub fn stop(&mut self) {
/// self.event_key.take().map(|k| k.cancel_event());
/// }
/// }
///
/// impl Model for PeriodicLogger {}
/// ```
pub fn schedule_keyed_event_in<F, T, S>(
&self,
duration: Duration,
func: F,
arg: T,
) -> Result<EventKey, ScheduledTimeError<T>>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
if duration.is_zero() {
return Err(ScheduledTimeError(arg));
}
let time = self.time() + duration;
let sender = self.sender.clone();
let event_key =
schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
Ok(event_key)
}
/// Schedules an event at a future time.
@ -183,7 +246,7 @@ impl<M: Model> Scheduler<M> {
/// use asynchronix::model::Model;
/// use asynchronix::time::{MonotonicTime, Scheduler};
///
/// // An alarm clock model.
/// // An alarm clock.
/// pub struct AlarmClock {
/// msg: String
/// }
@ -196,7 +259,7 @@ impl<M: Model> Scheduler<M> {
///
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
/// if scheduler.schedule_at(setting, Self::ring, ()).is_err() {
/// if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
@ -209,12 +272,12 @@ impl<M: Model> Scheduler<M> {
///
/// impl Model for AlarmClock {}
/// ```
pub fn schedule_at<F, T, S>(
pub fn schedule_event_at<F, T, S>(
&self,
time: MonotonicTime,
func: F,
arg: T,
) -> Result<SchedulerKey, ScheduledTimeError<T>>
) -> Result<(), ScheduledTimeError<T>>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
@ -223,20 +286,77 @@ impl<M: Model> Scheduler<M> {
return Err(ScheduledTimeError(arg));
}
let sender = self.sender.clone();
let schedule_key =
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
Ok(schedule_key)
Ok(())
}
/// Cancels an event with a scheduled time in the future of the current
/// simulation time.
/// Schedules an event at a future time and returns an event key.
///
/// If the corresponding event was already executed, or if it is scheduled
/// for the current simulation time but was not yet executed, an error is
/// returned.
pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> {
cancel_scheduled(scheduler_key, &self.scheduler_queue)
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::Model;
/// use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
///
/// // An alarm clock that can be cancelled.
/// pub struct AlarmClock {
/// msg: String,
/// event_key: Option<EventKey>,
/// }
///
/// impl AlarmClock {
/// // Creates a new alarm clock.
/// pub fn new(msg: String) -> Self {
/// Self {
/// msg,
/// event_key: None
/// }
/// }
///
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
/// self.cancel();
/// match scheduler.schedule_keyed_event_at(setting, Self::ring, ()) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels the current alarm, if any.
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel_event());
/// }
///
/// // Rings the alarm [private input port].
/// fn ring(&mut self) {
/// println!("{}", self.msg);
/// }
/// }
///
/// impl Model for AlarmClock {}
/// ```
pub fn schedule_keyed_event_at<F, T, S>(
&self,
time: MonotonicTime,
func: F,
arg: T,
) -> Result<EventKey, ScheduledTimeError<T>>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
if self.time() >= time {
return Err(ScheduledTimeError(arg));
}
let sender = self.sender.clone();
let event_key =
schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
Ok(event_key)
}
}
@ -246,15 +366,61 @@ impl<M: Model> fmt::Debug for Scheduler<M> {
}
}
/// Unique identifier for a scheduled event.
/// Handle to a scheduled event.
///
/// A `SchedulerKey` can be used to cancel a future event.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub struct SchedulerKey(priority_queue::InsertKey);
/// An `EventKey` can be used to cancel a future event.
#[derive(Clone, Debug)]
pub struct EventKey {
state: Arc<AtomicUsize>,
}
impl SchedulerKey {
pub(crate) fn new(key: priority_queue::InsertKey) -> Self {
Self(key)
impl EventKey {
const IS_PENDING: usize = 0;
const IS_CANCELLED: usize = 1;
const IS_PROCESSED: usize = 2;
/// Creates a key for a pending event.
pub(crate) fn new() -> Self {
Self {
state: Arc::new(AtomicUsize::new(Self::IS_PENDING)),
}
}
/// Checks whether the event was cancelled.
pub(crate) fn event_is_cancelled(&self) -> bool {
self.state.load(Ordering::Relaxed) == Self::IS_CANCELLED
}
/// Marks the event as processed.
///
/// If the event cannot be processed because it was cancelled, `false` is
/// returned.
pub(crate) fn process_event(self) -> bool {
match self.state.compare_exchange(
Self::IS_PENDING,
Self::IS_PROCESSED,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => true,
Err(s) => s == Self::IS_PROCESSED,
}
}
/// Cancels the associated event if possible.
///
/// If the event cannot be cancelled because it was already processed,
/// `false` is returned.
pub fn cancel_event(self) -> bool {
match self.state.compare_exchange(
Self::IS_PENDING,
Self::IS_CANCELLED,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => true,
Err(s) => s == Self::IS_CANCELLED,
}
}
}
@ -274,21 +440,6 @@ impl<T> fmt::Display for ScheduledTimeError<T> {
impl<T: fmt::Debug> Error for ScheduledTimeError<T> {}
/// Error returned when the cancellation of a scheduler event is unsuccessful.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct CancellationError {}
impl fmt::Display for CancellationError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"the scheduler key should belong to an event or command scheduled in the future of the current simulation time"
)
}
}
impl Error for CancellationError {}
/// Schedules an event at a future time.
///
/// This method does not check whether the specified time lies in the future
@ -299,8 +450,7 @@ pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
arg: T,
sender: Sender<M>,
scheduler_queue: &Mutex<SchedulerQueue>,
) -> SchedulerKey
where
) where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
@ -321,26 +471,143 @@ where
)
.await;
};
let fut = Box::new(UnkeyedEventFuture::new(fut));
let mut scheduler_queue = scheduler_queue.lock().unwrap();
let insert_key = scheduler_queue.insert((time, channel_id), Box::new(fut));
SchedulerKey::new(insert_key)
scheduler_queue.insert((time, channel_id), fut);
}
/// Cancels an event or command with a scheduled time in the future of the
/// current simulation time.
/// Schedules an event at a future time, returning an event key.
///
/// If the corresponding event or command was already executed, or if it is
/// scheduled for the current simulation time, an error is returned.
pub(crate) fn cancel_scheduled(
scheduler_key: SchedulerKey,
/// This method does not check whether the specified time lies in the future
/// of the current simulation time.
pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
time: MonotonicTime,
func: F,
arg: T,
sender: Sender<M>,
scheduler_queue: &Mutex<SchedulerQueue>,
) -> Result<(), CancellationError> {
let mut scheduler_queue = scheduler_queue.lock().unwrap();
if scheduler_queue.delete(scheduler_key.0) {
return Ok(());
}
) -> EventKey
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
let channel_id = sender.channel_id();
Err(CancellationError {})
let event_key = EventKey::new();
let local_event_key = event_key.clone();
let fut = async move {
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = async move {
if local_event_key.process_event() {
func.call(model, arg, scheduler).await;
}
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
};
// Implementation note: we end up with two atomic references to the event
// key stored inside the event future: one was moved above to the future
// itself and the other one is created below via cloning and stored
// separately in the `KeyedEventFuture`. This is not ideal as we could
// theoretically spare on atomic reference counting by storing a single
// reference, but this would likely require some tricky `unsafe`, not least
// because the inner future sent to the mailbox outlives the
// `KeyedEventFuture`.
let fut = Box::new(KeyedEventFuture::new(fut, event_key.clone()));
let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), fut);
event_key
}
/// The future of an event which scheduling may be cancelled by the user.
pub(crate) trait EventFuture: Future {
/// Whether the scheduling of this event was cancelled.
fn is_cancelled(&self) -> bool;
}
pin_project! {
/// Future associated to a regular event that cannot be cancelled.
pub(crate) struct UnkeyedEventFuture<F> {
#[pin]
fut: F,
}
}
impl<F> UnkeyedEventFuture<F> {
/// Creates a new `EventFuture`.
pub(crate) fn new(fut: F) -> Self {
Self { fut }
}
}
impl<F> Future for UnkeyedEventFuture<F>
where
F: Future,
{
type Output = F::Output;
#[inline(always)]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
impl<F> EventFuture for UnkeyedEventFuture<F>
where
F: Future,
{
fn is_cancelled(&self) -> bool {
false
}
}
pin_project! {
/// Future associated to a keyed event that can be cancelled.
pub(crate) struct KeyedEventFuture<F> {
event_key: EventKey,
#[pin]
fut: F,
}
}
impl<F> KeyedEventFuture<F> {
/// Creates a new `EventFuture`.
pub(crate) fn new(fut: F, event_key: EventKey) -> Self {
Self { event_key, fut }
}
}
impl<F> Future for KeyedEventFuture<F>
where
F: Future,
{
type Output = F::Output;
#[inline(always)]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
impl<F> EventFuture for KeyedEventFuture<F>
where
F: Future,
{
fn is_cancelled(&self) -> bool {
self.event_key.event_is_cancelled()
}
}

View File

@ -4,6 +4,8 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::task::{Context, Poll};
/// An owned future which sequentially polls a collection of futures.
@ -51,3 +53,39 @@ impl<F: Future + Unpin> Future for SeqFuture<F> {
Poll::Pending
}
}
trait RevocableFuture: Future {
fn is_revoked() -> bool;
}
struct NeverRevokedFuture<F> {
inner: F,
}
impl<F: Future> NeverRevokedFuture<F> {
fn new(fut: F) -> Self {
Self { inner: fut }
}
}
impl<T: Future> Future for NeverRevokedFuture<T> {
type Output = T::Output;
#[inline(always)]
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) }
}
}
impl<T: Future> RevocableFuture for NeverRevokedFuture<T> {
fn is_revoked() -> bool {
false
}
}
struct ConcurrentlyRevocableFuture<F> {
inner: F,
is_revoked: Arc<AtomicBool>,
}

View File

@ -1,52 +1,60 @@
//! Associative priority queue.
#![allow(unused)]
use std::cmp::{Eq, Ord, Ordering, PartialOrd};
use std::collections::BinaryHeap;
use std::mem;
/// A key-value pair ordered by keys in inverse order, with epoch-based ordering
/// for equal keys.
struct Item<K, V>
where
K: Ord,
{
key: K,
value: V,
epoch: u64,
}
/// An associative container optimized for extraction of the value with the
/// lowest key and deletion of arbitrary key-value pairs.
impl<K, V> Ord for Item<K, V>
where
K: Ord,
{
fn cmp(&self, other: &Self) -> Ordering {
self.key
.cmp(&other.key)
.then_with(|| self.epoch.cmp(&other.epoch))
.reverse()
}
}
impl<K, V> PartialOrd for Item<K, V>
where
K: Ord,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<K, V> Eq for Item<K, V> where K: Ord {}
impl<K, V> PartialEq for Item<K, V>
where
K: Ord,
{
fn eq(&self, other: &Self) -> bool {
(self.key == other.key) && (self.epoch == other.epoch)
}
}
/// An associative container optimized for extraction of the key-value pair with
/// the lowest key, based on a binary heap.
///
/// This implementation has the same theoretical complexity for insert and pull
/// operations as a conventional array-based binary heap but does differ from
/// the latter in some important aspects:
///
/// - elements can be deleted in *O*(log(*N*)) time rather than *O*(*N*) time
/// using a unique index returned at insertion time.
/// - same-key elements are guaranteed to be pulled in FIFO order,
///
/// Under the hood, the priority queue relies on a binary heap cross-indexed
/// with values stored in a slab allocator. Each item of the binary heap
/// contains an index pointing to the associated slab-allocated node, as well as
/// the user-provided key. Each slab node contains the value associated to the
/// key and a back-pointing index to the binary heap. The heap items also
/// contain a unique epoch which allows same-key nodes to be sorted by insertion
/// order. The epoch is used as well to build unique indices that enable
/// efficient deletion of arbitrary key-value pairs.
///
/// The slab-based design is what makes *O*(log(*N*)) deletion possible, but it
/// does come with some trade-offs:
///
/// - its memory footprint is higher because it needs 2 extra pointer-sized
/// indices for each element to cross-index the heap and the slab,
/// - its computational footprint is higher because of the extra cost associated
/// with random slab access; that being said, array-based binary heaps are not
/// extremely cache-friendly to start with so unless the slab becomes very
/// fragmented, this is not expected to introduce more than a reasonable
/// constant-factor penalty compared to a conventional binary heap.
///
/// The computational penalty is partially offset by the fact that the value
/// never needs to be moved from the moment it is inserted until it is pulled.
///
/// Note that the `Copy` bound on they keys could be lifted but this would make
/// the implementation slightly less efficient unless `unsafe` is used.
/// The insertion order of equal keys is preserved, with FIFO ordering.
pub(crate) struct PriorityQueue<K, V>
where
K: Copy + Clone + Ord,
K: Ord,
{
heap: Vec<Item<K>>,
slab: Vec<Node<V>>,
first_free_node: Option<usize>,
heap: BinaryHeap<Item<K, V>>,
next_epoch: u64,
}
@ -54,81 +62,23 @@ impl<K: Copy + Ord, V> PriorityQueue<K, V> {
/// Creates an empty `PriorityQueue`.
pub(crate) fn new() -> Self {
Self {
heap: Vec::new(),
slab: Vec::new(),
first_free_node: None,
heap: BinaryHeap::new(),
next_epoch: 0,
}
}
/// Creates an empty `PriorityQueue` with at least the specified capacity.
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self {
heap: Vec::with_capacity(capacity),
slab: Vec::with_capacity(capacity),
first_free_node: None,
next_epoch: 0,
}
}
/// Returns the number of key-value pairs in the priority queue.
pub(crate) fn len(&self) -> usize {
self.heap.len()
}
/// Inserts a new key-value pair and returns a unique insertion key.
/// Inserts a new key-value pair.
///
/// This operation has *O*(log(*N*)) amortized worse-case theoretical
/// complexity and *O*(1) amortized theoretical complexity for a
/// sufficiently random heap.
pub(crate) fn insert(&mut self, key: K, value: V) -> InsertKey {
// Build a unique key from the user-provided key and a unique epoch.
pub(crate) fn insert(&mut self, key: K, value: V) {
// Build an element from the user-provided key-value and a unique epoch.
let epoch = self.next_epoch;
assert_ne!(epoch, u64::MAX);
self.next_epoch += 1;
let unique_key = UniqueKey { key, epoch };
// Add a new node to the slab, either by re-using a free node or by
// appending a new one.
let slab_idx = match self.first_free_node {
Some(idx) => {
self.first_free_node = self.slab[idx].unwrap_next_free_node();
self.slab[idx] = Node::HeapNode(HeapNode {
value,
heap_idx: 0, // temporary value overridden in `sift_up`
});
idx
}
None => {
let idx = self.slab.len();
self.slab.push(Node::HeapNode(HeapNode {
value,
heap_idx: 0, // temporary value overridden in `sift_up`
}));
idx
}
};
// Add a new node at the bottom of the heap.
let heap_idx = self.heap.len();
self.heap.push(Item {
key: unique_key, // temporary value overridden in `sift_up`
slab_idx: 0, // temporary value overridden in `sift_up`
});
// Sift up the new node.
self.sift_up(
Item {
key: unique_key,
slab_idx,
},
heap_idx,
);
InsertKey { slab_idx, epoch }
let item = Item { key, value, epoch };
self.heap.push(item);
}
/// Pulls the value with the lowest key.
@ -138,26 +88,7 @@ impl<K: Copy + Ord, V> PriorityQueue<K, V> {
///
/// This operation has *O*(log(N)) non-amortized theoretical complexity.
pub(crate) fn pull(&mut self) -> Option<(K, V)> {
let item = self.heap.first()?;
let top_slab_idx = item.slab_idx;
let key = item.key.key;
// Free the top node, extracting its value.
let value = mem::replace(
&mut self.slab[top_slab_idx],
Node::FreeNode(FreeNode {
next: self.first_free_node,
}),
)
.unwrap_value();
self.first_free_node = Some(top_slab_idx);
// Sift the last node at the bottom of the heap from the top of the heap.
let last_item = self.heap.pop().unwrap();
if last_item.slab_idx != top_slab_idx {
self.sift_down(last_item, 0);
}
let Item { key, value, .. } = self.heap.pop()?;
Some((key, value))
}
@ -165,497 +96,48 @@ impl<K: Copy + Ord, V> PriorityQueue<K, V> {
/// Peeks a reference to the key-value pair with the lowest key, leaving it
/// in the queue.
///
/// If there are several equal lowest keys, a reference to the key-value
/// pair which was inserted first is returned.
/// If there are several equal lowest keys, references to the key-value pair
/// which was inserted first is returned.
///
/// This operation has *O*(1) non-amortized theoretical complexity.
pub(crate) fn peek(&self) -> Option<(&K, &V)> {
let item = self.heap.first()?;
let top_slab_idx = item.slab_idx;
let key = &item.key.key;
let value = self.slab[top_slab_idx].unwrap_value_ref();
let Item {
ref key, ref value, ..
} = self.heap.peek()?;
Some((key, value))
}
/// Peeks a reference to the lowest key, leaving it in the queue.
///
/// If there are several equal lowest keys, a reference to the key which was
/// inserted first is returned.
///
/// This operation has *O*(1) non-amortized theoretical complexity.
pub(crate) fn peek_key(&self) -> Option<&K> {
let item = self.heap.first()?;
Some(&item.key.key)
}
/// Delete the key-value pair associated to the provided insertion key if it
/// is still in the queue.
///
/// Using an insertion key returned from another `PriorityQueue` is a logic
/// error and could result in the deletion of an arbitrary key-value pair.
///
/// This method returns `true` if the pair was indeed in the queue and
/// `false` otherwise.
///
/// This operation has guaranteed *O*(log(*N*)) theoretical complexity.
pub(crate) fn delete(&mut self, insert_key: InsertKey) -> bool {
// Check that (i) there is a node at this index, (ii) this node is in
// the heap and (iii) this node has the correct epoch.
let slab_idx = insert_key.slab_idx;
let heap_idx = if let Some(Node::HeapNode(node)) = self.slab.get(slab_idx) {
let heap_idx = node.heap_idx;
if self.heap[heap_idx].key.epoch != insert_key.epoch {
return false;
}
heap_idx
} else {
return false;
};
// If the last item of the heap is not the one to be deleted, sift it up
// or down as appropriate starting from the vacant spot.
let last_item = self.heap.pop().unwrap();
if let Some(item) = self.heap.get(heap_idx) {
if last_item.key < item.key {
self.sift_up(last_item, heap_idx);
} else {
self.sift_down(last_item, heap_idx);
}
}
// Free the deleted node in the slab.
self.slab[slab_idx] = Node::FreeNode(FreeNode {
next: self.first_free_node,
});
self.first_free_node = Some(slab_idx);
true
}
/// Take a heap item and, starting at `heap_idx`, move it up the heap while
/// a parent has a larger key.
#[inline]
fn sift_up(&mut self, item: Item<K>, heap_idx: usize) {
let mut child_heap_idx = heap_idx;
let key = &item.key;
while child_heap_idx != 0 {
let parent_heap_idx = (child_heap_idx - 1) / 2;
// Stop when the key is larger or equal to the parent's.
if key >= &self.heap[parent_heap_idx].key {
break;
}
// Move the parent down one level.
self.heap[child_heap_idx] = self.heap[parent_heap_idx];
let parent_slab_idx = self.heap[parent_heap_idx].slab_idx;
*self.slab[parent_slab_idx].unwrap_heap_index_mut() = child_heap_idx;
// Stop when the key is larger or equal to the parent's.
if key >= &self.heap[parent_heap_idx].key {
break;
}
// Make the former parent the new child.
child_heap_idx = parent_heap_idx;
}
// Move the original item to the current child.
self.heap[child_heap_idx] = item;
*self.slab[item.slab_idx].unwrap_heap_index_mut() = child_heap_idx;
}
/// Take a heap item and, starting at `heap_idx`, move it down the heap
/// while a child has a smaller key.
#[inline]
fn sift_down(&mut self, item: Item<K>, heap_idx: usize) {
let mut parent_heap_idx = heap_idx;
let mut child_heap_idx = 2 * parent_heap_idx + 1;
let key = &item.key;
while child_heap_idx < self.heap.len() {
// If the sibling exists and has a smaller key, make it the
// candidate for swapping.
if let Some(other_child) = self.heap.get(child_heap_idx + 1) {
child_heap_idx += (self.heap[child_heap_idx].key > other_child.key) as usize;
}
// Stop when the key is smaller or equal to the child with the smallest key.
if key <= &self.heap[child_heap_idx].key {
break;
}
// Move the child up one level.
self.heap[parent_heap_idx] = self.heap[child_heap_idx];
let child_slab_idx = self.heap[child_heap_idx].slab_idx;
*self.slab[child_slab_idx].unwrap_heap_index_mut() = parent_heap_idx;
// Make the child the new parent.
parent_heap_idx = child_heap_idx;
child_heap_idx = 2 * parent_heap_idx + 1;
}
// Move the original item to the current parent.
self.heap[parent_heap_idx] = item;
*self.slab[item.slab_idx].unwrap_heap_index_mut() = parent_heap_idx;
}
}
/// Data related to a single key-value pair stored in the heap.
#[derive(Copy, Clone)]
struct Item<K: Copy> {
// A unique key by which the heap is sorted.
key: UniqueKey<K>,
// An index pointing to the corresponding node in the slab.
slab_idx: usize,
}
/// Data related to a single key-value pair stored in the slab.
enum Node<V> {
FreeNode(FreeNode),
HeapNode(HeapNode<V>),
}
impl<V> Node<V> {
/// Unwraps the `FreeNode::next` field.
fn unwrap_next_free_node(&self) -> Option<usize> {
match self {
Self::FreeNode(n) => n.next,
_ => panic!("the node was expected to be a free node"),
}
}
/// Unwraps the `HeapNode::value` field.
fn unwrap_value(self) -> V {
match self {
Self::HeapNode(n) => n.value,
_ => panic!("the node was expected to be a heap node"),
}
}
/// Unwraps the `HeapNode::value` field.
fn unwrap_value_ref(&self) -> &V {
match self {
Self::HeapNode(n) => &n.value,
_ => panic!("the node was expected to be a heap node"),
}
}
/// Unwraps a mutable reference to the `HeapNode::heap_idx` field.
fn unwrap_heap_index_mut(&mut self) -> &mut usize {
match self {
Self::HeapNode(n) => &mut n.heap_idx,
_ => panic!("the node was expected to be a heap node"),
}
}
}
/// A node that is no longer in the binary heap.
struct FreeNode {
// An index pointing to the next free node, if any.
next: Option<usize>,
}
/// A node currently in the binary heap.
struct HeapNode<V> {
// The value associated to this node.
value: V,
// Index of the node in the heap.
heap_idx: usize,
}
/// A unique insertion key that can be used for key-value pair deletion.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub(crate) struct InsertKey {
// An index pointing to a node in the slab.
slab_idx: usize,
// The epoch when the node was inserted.
epoch: u64,
}
/// A unique key made of the user-provided key complemented by a unique epoch.
///
/// Implementation note: `UniqueKey` automatically derives `PartialOrd`, which
/// implies that lexicographic order between `key` and `epoch` must be preserved
/// to make sure that `key` has a higher sorting priority than `epoch`.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct UniqueKey<K: Copy + Clone> {
/// The user-provided key.
key: K,
/// A unique epoch that indicates the insertion date.
epoch: u64,
}
#[cfg(all(test, not(asynchronix_loom)))]
mod tests {
use std::fmt::Debug;
use super::*;
enum Op<K, V> {
Insert(K, V),
InsertAndMark(K, V),
Pull(Option<(K, V)>),
DeleteMarked(bool),
}
fn check<K: Copy + Clone + Ord + Debug, V: Eq + Debug>(
operations: impl Iterator<Item = Op<K, V>>,
) {
let mut queue = PriorityQueue::new();
let mut marked = None;
for op in operations {
match op {
Op::Insert(key, value) => {
queue.insert(key, value);
}
Op::InsertAndMark(key, value) => {
marked = Some(queue.insert(key, value));
}
Op::Pull(kv) => {
assert_eq!(queue.pull(), kv);
}
Op::DeleteMarked(success) => {
assert_eq!(
queue.delete(marked.take().expect("no item was marked for deletion")),
success
)
}
}
}
}
#[test]
fn priority_queue_smoke() {
let operations = [
Op::Insert(5, 'a'),
Op::Insert(2, 'b'),
Op::Insert(3, 'c'),
Op::Insert(4, 'd'),
Op::Insert(9, 'e'),
Op::Insert(1, 'f'),
Op::Insert(8, 'g'),
Op::Insert(0, 'h'),
Op::Insert(7, 'i'),
Op::Insert(6, 'j'),
Op::Pull(Some((0, 'h'))),
Op::Pull(Some((1, 'f'))),
Op::Pull(Some((2, 'b'))),
Op::Pull(Some((3, 'c'))),
Op::Pull(Some((4, 'd'))),
Op::Pull(Some((5, 'a'))),
Op::Pull(Some((6, 'j'))),
Op::Pull(Some((7, 'i'))),
Op::Pull(Some((8, 'g'))),
Op::Pull(Some((9, 'e'))),
];
fn priority_smoke() {
let mut q = PriorityQueue::new();
check(operations.into_iter());
}
q.insert(5, 'e');
q.insert(2, 'y');
q.insert(1, 'a');
q.insert(3, 'c');
q.insert(2, 'z');
q.insert(4, 'd');
q.insert(2, 'x');
#[test]
fn priority_queue_interleaved() {
let operations = [
Op::Insert(2, 'a'),
Op::Insert(7, 'b'),
Op::Insert(5, 'c'),
Op::Pull(Some((2, 'a'))),
Op::Insert(4, 'd'),
Op::Pull(Some((4, 'd'))),
Op::Insert(8, 'e'),
Op::Insert(2, 'f'),
Op::Pull(Some((2, 'f'))),
Op::Pull(Some((5, 'c'))),
Op::Pull(Some((7, 'b'))),
Op::Insert(5, 'g'),
Op::Insert(3, 'h'),
Op::Pull(Some((3, 'h'))),
Op::Pull(Some((5, 'g'))),
Op::Pull(Some((8, 'e'))),
Op::Pull(None),
];
check(operations.into_iter());
}
#[test]
fn priority_queue_equal_keys() {
let operations = [
Op::Insert(4, 'a'),
Op::Insert(1, 'b'),
Op::Insert(3, 'c'),
Op::Pull(Some((1, 'b'))),
Op::Insert(4, 'd'),
Op::Insert(8, 'e'),
Op::Insert(3, 'f'),
Op::Pull(Some((3, 'c'))),
Op::Pull(Some((3, 'f'))),
Op::Pull(Some((4, 'a'))),
Op::Insert(8, 'g'),
Op::Pull(Some((4, 'd'))),
Op::Pull(Some((8, 'e'))),
Op::Pull(Some((8, 'g'))),
Op::Pull(None),
];
check(operations.into_iter());
}
#[test]
fn priority_queue_delete_valid() {
let operations = [
Op::Insert(8, 'a'),
Op::Insert(1, 'b'),
Op::Insert(3, 'c'),
Op::InsertAndMark(3, 'd'),
Op::Insert(2, 'e'),
Op::Pull(Some((1, 'b'))),
Op::Insert(4, 'f'),
Op::DeleteMarked(true),
Op::Insert(5, 'g'),
Op::Pull(Some((2, 'e'))),
Op::Pull(Some((3, 'c'))),
Op::Pull(Some((4, 'f'))),
Op::Pull(Some((5, 'g'))),
Op::Pull(Some((8, 'a'))),
Op::Pull(None),
];
check(operations.into_iter());
}
#[test]
fn priority_queue_delete_invalid() {
let operations = [
Op::Insert(0, 'a'),
Op::Insert(7, 'b'),
Op::InsertAndMark(2, 'c'),
Op::Insert(4, 'd'),
Op::Pull(Some((0, 'a'))),
Op::Insert(2, 'e'),
Op::Pull(Some((2, 'c'))),
Op::Insert(4, 'f'),
Op::DeleteMarked(false),
Op::Pull(Some((2, 'e'))),
Op::Pull(Some((4, 'd'))),
Op::Pull(Some((4, 'f'))),
Op::Pull(Some((7, 'b'))),
Op::Pull(None),
];
check(operations.into_iter());
}
#[test]
fn priority_queue_fuzz() {
use std::cell::Cell;
use std::collections::BTreeMap;
use crate::util::rng::Rng;
// Number of fuzzing operations.
const ITER: usize = if cfg!(miri) { 1000 } else { 10_000_000 };
// Inclusive upper bound for randomly generated keys.
const MAX_KEY: u64 = 99;
// Probabilistic weight of each of the 4 operations.
//
// The weight for pull values should probably stay close to the sum of
// the two insertion weights to prevent queue size runaway.
const INSERT_WEIGHT: u64 = 5;
const INSERT_AND_MARK_WEIGHT: u64 = 1;
const PULL_WEIGHT: u64 = INSERT_WEIGHT + INSERT_AND_MARK_WEIGHT;
const DELETE_MARKED_WEIGHT: u64 = 1;
// Defines 4 basic operations on the priority queue, each of them being
// performed on both the tested implementation and on a shadow queue
// implemented with a `BTreeMap`. Any mismatch between the outcomes of
// pull and delete operations between the two queues triggers a panic.
let epoch: Cell<usize> = Cell::new(0);
let marked: Cell<Option<InsertKey>> = Cell::new(None);
let shadow_marked: Cell<Option<(u64, usize)>> = Cell::new(None);
let insert_fn = |queue: &mut PriorityQueue<u64, u64>,
shadow_queue: &mut BTreeMap<(u64, usize), u64>,
key,
value| {
queue.insert(key, value);
shadow_queue.insert((key, epoch.get()), value);
epoch.set(epoch.get() + 1);
};
let insert_and_mark_fn = |queue: &mut PriorityQueue<u64, u64>,
shadow_queue: &mut BTreeMap<(u64, usize), u64>,
key,
value| {
marked.set(Some(queue.insert(key, value)));
shadow_queue.insert((key, epoch.get()), value);
shadow_marked.set(Some((key, epoch.get())));
epoch.set(epoch.get() + 1);
};
let pull_fn = |queue: &mut PriorityQueue<u64, u64>,
shadow_queue: &mut BTreeMap<(u64, usize), u64>| {
let value = queue.pull();
let shadow_value = match shadow_queue.iter().next() {
Some((&unique_key, &value)) => {
shadow_queue.remove(&unique_key);
Some((unique_key.0, value))
}
None => None,
};
assert_eq!(value, shadow_value);
};
let delete_marked_fn =
|queue: &mut PriorityQueue<u64, u64>,
shadow_queue: &mut BTreeMap<(u64, usize), u64>| {
let success = match marked.take() {
Some(delete_key) => Some(queue.delete(delete_key)),
None => None,
};
let shadow_success = match shadow_marked.take() {
Some(delete_key) => Some(shadow_queue.remove(&delete_key).is_some()),
None => None,
};
assert_eq!(success, shadow_success);
};
// Fuzz away.
let mut queue = PriorityQueue::new();
let mut shadow_queue = BTreeMap::new();
let rng = Rng::new(12345);
const TOTAL_WEIGHT: u64 =
INSERT_WEIGHT + INSERT_AND_MARK_WEIGHT + PULL_WEIGHT + DELETE_MARKED_WEIGHT;
for _ in 0..ITER {
// Randomly choose one of the 4 possible operations, respecting the
// probability weights.
let mut op = rng.gen_bounded(TOTAL_WEIGHT);
if op < INSERT_WEIGHT {
let key = rng.gen_bounded(MAX_KEY + 1);
let val = rng.gen();
insert_fn(&mut queue, &mut shadow_queue, key, val);
continue;
}
op -= INSERT_WEIGHT;
if op < INSERT_AND_MARK_WEIGHT {
let key = rng.gen_bounded(MAX_KEY + 1);
let val = rng.gen();
insert_and_mark_fn(&mut queue, &mut shadow_queue, key, val);
continue;
}
op -= INSERT_AND_MARK_WEIGHT;
if op < PULL_WEIGHT {
pull_fn(&mut queue, &mut shadow_queue);
continue;
}
delete_marked_fn(&mut queue, &mut shadow_queue);
}
assert_eq!(q.peek(), Some((&1, &'a')));
assert_eq!(q.pull(), Some((1, 'a')));
assert_eq!(q.peek(), Some((&2, &'y')));
assert_eq!(q.pull(), Some((2, 'y')));
assert_eq!(q.peek(), Some((&2, &'z')));
assert_eq!(q.pull(), Some((2, 'z')));
assert_eq!(q.peek(), Some((&2, &'x')));
assert_eq!(q.pull(), Some((2, 'x')));
assert_eq!(q.peek(), Some((&3, &'c')));
assert_eq!(q.pull(), Some((3, 'c')));
assert_eq!(q.peek(), Some((&4, &'d')));
assert_eq!(q.pull(), Some((4, 'd')));
assert_eq!(q.peek(), Some((&5, &'e')));
assert_eq!(q.pull(), Some((5, 'e')));
}
}