diff --git a/README.md b/README.md index 371d9e0..945a9ea 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ pub struct DelayedMultiplier { impl DelayedMultiplier { pub fn input(&mut self, value: f64, scheduler: &Scheduler) { scheduler - .schedule_in(Duration::from_secs(1), Self::send, 2.0 * value) + .schedule_event_in(Duration::from_secs(1), Self::send, 2.0 * value) .unwrap(); } async fn send(&mut self, value: f64) { @@ -157,11 +157,12 @@ the posted events. For these reasons, Asynchronix relies on a fully custom runtime. Even though the runtime was largely influenced by Tokio, it features additional -optimization that make its faster than any other multi-threaded Rust executor on -the typically message-passing-heavy workloads seen in discrete-event simulation -(see [benchmark]). Asynchronix also improves over the state of the art with a -very fast custom MPSC channel, which performance has been demonstrated through -[Tachyonix][tachyonix], a general-purpose offshoot of this channel. +optimizations that make its faster than any other multi-threaded Rust executor +on the typically message-passing-heavy workloads seen in discrete-event +simulation (see [benchmark]). Asynchronix also improves over the state of the +art with a very fast custom MPSC channel, which performance has been +demonstrated through [Tachyonix][tachyonix], a general-purpose offshoot of this +channel. [actor_model]: https://en.wikipedia.org/wiki/Actor_model diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 981228a..bd39917 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -31,6 +31,7 @@ diatomic-waker = "0.1" futures-task = "0.3" multishot = "0.3" num_cpus = "1.13" +pin-project-lite = "0.2" recycle-box = "0.2" slab = "0.4" st3 = "0.4" diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index b504632..30a3fb4 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -3,7 +3,7 @@ //! This example demonstrates in particular: //! //! * non-trivial state machines, -//! * cancellation of calls scheduled at the current time step using epochs, +//! * cancellation of events, //! * model initialization, //! * simulation monitoring with event slots. //! @@ -37,7 +37,7 @@ use std::time::Duration; use asynchronix::model::{InitializedModel, Model, Output}; use asynchronix::simulation::{Mailbox, SimInit}; -use asynchronix::time::{MonotonicTime, Scheduler, SchedulerKey}; +use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; /// Water pump. pub struct Pump { @@ -79,12 +79,9 @@ pub struct Controller { brew_time: Duration, /// Current water sense state. water_sense: WaterSenseState, - /// Scheduler key, which if present indicates that the machine is current + /// Event key, which if present indicates that the machine is currently /// brewing -- internal state. - stop_brew_key: Option, - /// An epoch incremented when the scheduled 'stop_brew` callback must be - /// ignored -- internal state. - stop_brew_epoch: u64, + stop_brew_key: Option, } impl Controller { @@ -98,22 +95,16 @@ impl Controller { pump_cmd: Output::default(), stop_brew_key: None, water_sense: WaterSenseState::Empty, // will be overridden during init - stop_brew_epoch: 0, } } /// Signals a change in the water sensing state -- input port. - pub async fn water_sense(&mut self, state: WaterSenseState, scheduler: &Scheduler) { + pub async fn water_sense(&mut self, state: WaterSenseState) { // Check if the tank just got empty. if state == WaterSenseState::Empty && self.water_sense == WaterSenseState::NotEmpty { // If a brew was ongoing, we must cancel it. if let Some(key) = self.stop_brew_key.take() { - // Try to abort the scheduled call to `stop_brew()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(key).is_err() { - self.stop_brew_epoch = self.stop_brew_epoch.wrapping_add(1); - }; - + key.cancel_event(); self.pump_cmd.send(PumpCommand::Off).await; } } @@ -136,11 +127,8 @@ impl Controller { if let Some(key) = self.stop_brew_key.take() { self.pump_cmd.send(PumpCommand::Off).await; - // Try to abort the scheduled call to `stop_brew()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(key).is_err() { - self.stop_brew_epoch = self.stop_brew_epoch.wrapping_add(1); - }; + // Abort the scheduled call to `stop_brew()`. + key.cancel_event(); return; } @@ -153,19 +141,14 @@ impl Controller { // Schedule the `stop_brew()` method and turn on the pump. self.stop_brew_key = Some( scheduler - .schedule_in(self.brew_time, Self::stop_brew, self.stop_brew_epoch) + .schedule_keyed_event_in(self.brew_time, Self::stop_brew, ()) .unwrap(), ); self.pump_cmd.send(PumpCommand::On).await; } /// Stops brewing. - async fn stop_brew(&mut self, epoch: u64) { - // Ignore this call if the epoch has been incremented. - if self.stop_brew_epoch != epoch { - return; - } - + async fn stop_brew(&mut self) { if self.stop_brew_key.take().is_some() { self.pump_cmd.send(PumpCommand::Off).await; } @@ -190,9 +173,6 @@ pub struct Tank { volume: f64, /// State that exists when the mass flow rate is non-zero -- internal state. dynamic_state: Option, - /// An epoch incremented when the pending call to `set_empty()` must be - /// ignored -- internal state. - set_empty_epoch: u64, } impl Tank { /// Creates a new tank with the specified amount of water [m³]. @@ -204,7 +184,6 @@ impl Tank { Self { volume: water_volume, dynamic_state: None, - set_empty_epoch: 0, water_sense: Output::default(), } } @@ -224,11 +203,8 @@ impl Tank { // If the current flow rate is non-zero, compute the current volume and // schedule a new update. if let Some(state) = self.dynamic_state.take() { - // Try to abort the scheduled call to `set_empty()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(state.set_empty_key).is_err() { - self.set_empty_epoch = self.set_empty_epoch.wrapping_add(1); - } + // Abort the scheduled call to `set_empty()`. + state.set_empty_key.cancel_event(); // Update the volume, saturating at 0 in case of rounding errors. let time = scheduler.time(); @@ -260,11 +236,8 @@ impl Tank { // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { - // Try to abort the scheduled call to `set_empty()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(state.set_empty_key).is_err() { - self.set_empty_epoch = self.set_empty_epoch.wrapping_add(1); - } + // Abort the scheduled call to `set_empty()`. + state.set_empty_key.cancel_event(); // Update the volume, saturating at 0 in case of rounding errors. let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); @@ -301,7 +274,7 @@ impl Tank { let duration_until_empty = Duration::from_secs_f64(duration_until_empty); // Schedule the next update. - match scheduler.schedule_in(duration_until_empty, Self::set_empty, self.set_empty_epoch) { + match scheduler.schedule_keyed_event_in(duration_until_empty, Self::set_empty, ()) { Ok(set_empty_key) => { let state = TankDynamicState { last_volume_update: time, @@ -319,12 +292,7 @@ impl Tank { } /// Updates the state of the tank to indicate that there is no more water. - async fn set_empty(&mut self, epoch: u64) { - // Ignore this call if the epoch has been incremented. - if epoch != self.set_empty_epoch { - return; - } - + async fn set_empty(&mut self) { self.volume = 0.0; self.dynamic_state = None; self.water_sense.send(WaterSenseState::Empty).await; @@ -355,7 +323,7 @@ impl Model for Tank { /// is non-zero. struct TankDynamicState { last_volume_update: MonotonicTime, - set_empty_key: SchedulerKey, + set_empty_key: EventKey, flow_rate: f64, } @@ -429,7 +397,7 @@ fn main() { // Drink too much coffee. let volume_per_shot = pump_flow_rate * Controller::DEFAULT_BREW_TIME.as_secs_f64(); - let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who care about floating-point rounding errors? + let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who cares about floating-point rounding errors? for _ in 0..(shots_per_tank - 1) { simu.send_event(Controller::brew_cmd, (), &controller_addr); assert_eq!(flow_rate.take(), Some(pump_flow_rate)); @@ -463,7 +431,7 @@ fn main() { assert_eq!(flow_rate.take(), Some(0.0)); // Interrupt the brew after 15s by pressing again the brew button. - simu.schedule_in( + simu.schedule_event_in( Duration::from_secs(15), Controller::brew_cmd, (), diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 216197d..a232d88 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -174,7 +174,7 @@ impl Driver { // Schedule the next pulse. scheduler - .schedule_in(pulse_duration, Self::send_pulse, ()) + .schedule_event_in(pulse_duration, Self::send_pulse, ()) .unwrap(); } } @@ -224,7 +224,7 @@ fn main() { assert!(position.next().is_none()); // Start the motor in 2s with a PPS of 10Hz. - simu.schedule_in( + simu.schedule_event_in( Duration::from_secs(2), Driver::pulse_rate, 10.0, diff --git a/asynchronix/src/channel/queue.rs b/asynchronix/src/channel/queue.rs index d7beffa..6b1e233 100644 --- a/asynchronix/src/channel/queue.rs +++ b/asynchronix/src/channel/queue.rs @@ -79,7 +79,7 @@ enum MessageBox { None, } -/// A queue slot that a stamp and either a boxed messaged or an empty box. +/// A queue slot with a stamp and either a boxed messaged or an empty box. struct Slot { stamp: AtomicUsize, message: UnsafeCell>, diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index dffe725..bd88eef 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -113,7 +113,7 @@ //! } //! impl Delay { //! pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap(); +//! scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap(); //! } //! //! async fn send(&mut self, value: f64) { @@ -184,7 +184,7 @@ //! # } //! # impl Delay { //! # pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! # scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap(); +//! # scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -242,7 +242,7 @@ //! [`Simulation::send_event()`](simulation::Simulation::send_event) or //! [`Simulation::send_query()`](simulation::Simulation::send_query), //! 3. by scheduling events, using for instance -//! [`Simulation::schedule_in()`](simulation::Simulation::schedule_in). +//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in). //! //! Simulation outputs can be monitored using //! [`EventSlot`](simulation::EventSlot)s and @@ -275,7 +275,7 @@ //! # } //! # impl Delay { //! # pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! # scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap(); +//! # scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -354,11 +354,12 @@ //! //! The first guarantee (and only the first) also extends to events scheduled //! from a simulation with -//! [`Simulation::schedule_in()`](simulation::Simulation::schedule_in) or -//! [`Simulation::schedule_at()`](simulation::Simulation::schedule_at): if the -//! scheduler contains several events to be delivered at the same time to the -//! same model, these events will always be processed in the order in which they -//! were scheduled. +//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in) +//! or +//! [`Simulation::schedule_event_at()`](simulation::Simulation::schedule_event_at): +//! if the scheduler contains several events to be delivered at the same time to +//! the same model, these events will always be processed in the order in which +//! they were scheduled. //! //! [actor_model]: https://en.wikipedia.org/wiki/Actor_model //! [pony]: https://www.ponylang.io/ diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index e63488f..8c5b957 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -73,7 +73,7 @@ //! //! At the moment, Asynchronix is unfortunately not able to discriminate between //! such pathological deadlocks and the "expected" deadlock that occurs when all -//! tasks in a given time slice have completed and all models are starved on an +//! events in a given time slice have completed and all models are starved on an //! empty mailbox. Consequently, blocking method such as [`SimInit::init()`], //! [`Simulation::step()`], [`Simulation::send_event()`], etc., will return //! without error after a pathological deadlock, leaving the user responsible @@ -91,8 +91,8 @@ //! There is actually a very simple solution to this problem: since the //! [`InputFn`](crate::model::InputFn) trait also matches closures of type //! `FnOnce(&mut impl Model)`, it is enough to invoke -//! [`Simulation::send_event()`] with a closure that connects or disconnects -//! a port, such as: +//! [`Simulation::send_event()`] with a closure that connects or disconnects a +//! port, such as: //! //! ``` //! # use asynchronix::model::{Model, Output}; @@ -129,15 +129,15 @@ pub use sim_init::SimInit; use std::error::Error; use std::fmt; use std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use recycle_box::{coerce_box, RecycleBox}; use crate::executor::Executor; use crate::model::{InputFn, Model, ReplierFn}; -use crate::time::{self, CancellationError, MonotonicTime, TearableAtomicTime}; -use crate::time::{ScheduledTimeError, SchedulerKey, SchedulerQueue}; +use crate::time::{self, MonotonicTime, TearableAtomicTime}; +use crate::time::{EventKey, ScheduledTimeError, SchedulerQueue}; use crate::util::futures::SeqFuture; use crate::util::slot; use crate::util::sync_cell::SyncCell; @@ -164,9 +164,9 @@ use crate::util::sync_cell::SyncCell; /// the case of queries, the response is returned. /// /// Events can also be scheduled at a future simulation time using -/// [`schedule_in()`](Simulation::schedule_in) or -/// [`schedule_at()`](Simulation::schedule_at). These methods queue an event -/// without blocking. +/// [`schedule_event_in()`](Simulation::schedule_event_in) or +/// [`schedule_event_at()`](Simulation::schedule_event_at). These methods queue +/// an event without blocking. /// /// Finally, the [`Simulation`] instance manages simulation time. Calling /// [`step()`](Simulation::step) will increment simulation time until that of @@ -201,20 +201,20 @@ impl Simulation { self.time.read() } - /// Advances simulation time to that of the next scheduled task, processing - /// that task as well as all other tasks scheduled for the same time. + /// Advances simulation time to that of the next scheduled event, processing + /// that event as well as all other event scheduled for the same time. /// /// This method may block. Once it returns, it is guaranteed that all newly - /// processed tasks (if any) have completed. + /// processed event (if any) have completed. pub fn step(&mut self) { self.step_to_next_bounded(MonotonicTime::MAX); } /// Iteratively advances the simulation time by the specified duration and - /// processes all tasks scheduled up to the target time. + /// processes all events scheduled up to the target time. /// /// This method may block. Once it returns, it is guaranteed that (i) all - /// tasks scheduled up to the specified target time have completed and (ii) + /// events scheduled up to the specified target time have completed and (ii) /// the final simulation time has been incremented by the specified /// duration. pub fn step_by(&mut self, duration: Duration) { @@ -223,11 +223,11 @@ impl Simulation { self.step_until_unchecked(target_time); } - /// Iteratively advances the simulation time and processes all tasks + /// Iteratively advances the simulation time and processes all events /// scheduled up to the specified target time. /// /// This method may block. Once it returns, it is guaranteed that (i) all - /// tasks scheduled up to the specified target time have completed and (ii) + /// events scheduled up to the specified target time have completed and (ii) /// the final simulation time matches the target time. pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ScheduledTimeError<()>> { if self.time.read() >= target_time { @@ -244,13 +244,13 @@ impl Simulation { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_in( + pub fn schedule_event_in( &mut self, duration: Duration, func: F, arg: T, address: impl Into>, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -261,7 +261,36 @@ impl Simulation { } let time = self.time.read() + duration; - let schedule_key = time::schedule_event_at_unchecked( + time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules an event at the lapse of the specified duration and returns an + /// event key. + /// + /// An error is returned if the specified duration is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + pub fn schedule_keyed_event_in( + &mut self, + duration: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if duration.is_zero() { + return Err(ScheduledTimeError(arg)); + } + let time = self.time.read() + duration; + + let event_key = time::schedule_keyed_event_at_unchecked( time, func, arg, @@ -269,7 +298,7 @@ impl Simulation { &self.scheduler_queue, ); - Ok(schedule_key) + Ok(event_key) } /// Schedules an event at a future time. @@ -279,13 +308,13 @@ impl Simulation { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_at( + pub fn schedule_event_at( &mut self, time: MonotonicTime, func: F, arg: T, address: impl Into>, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -294,7 +323,34 @@ impl Simulation { if self.time.read() >= time { return Err(ScheduledTimeError(arg)); } - let schedule_key = time::schedule_event_at_unchecked( + time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules an event at a future time and returns an event key. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + pub fn schedule_keyed_event_at( + &mut self, + time: MonotonicTime, + func: F, + arg: T, + address: impl Into>, + ) -> Result> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if self.time.read() >= time { + return Err(ScheduledTimeError(arg)); + } + let event_key = time::schedule_keyed_event_at_unchecked( time, func, arg, @@ -302,16 +358,7 @@ impl Simulation { &self.scheduler_queue, ); - Ok(schedule_key) - } - - /// Cancels an event with a scheduled time in the future of the current - /// simulation time. - /// - /// If the corresponding event was already executed, or if it is scheduled - /// for the current simulation time, an error is returned. - pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> { - time::cancel_scheduled(scheduler_key, &self.scheduler_queue) + Ok(event_key) } /// Sends and processes an event, blocking until completion. @@ -387,72 +434,84 @@ impl Simulation { reply_reader.try_read().map_err(|_| QueryError {}) } - /// Advances simulation time to that of the next scheduled task if its + /// Advances simulation time to that of the next scheduled event if its /// scheduling time does not exceed the specified bound, processing that - /// task as well as all other tasks scheduled for the same time. + /// event as well as all other events scheduled for the same time. /// - /// If at least one task was found that satisfied the time bound, the + /// If at least one event was found that satisfied the time bound, the /// corresponding new simulation time is returned. fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - let mut current_key = match scheduler_queue.peek_key() { - Some(&k) if k.0 <= upper_time_bound => k, - _ => return None, + // Closure returning the next key which time stamp is no older than the + // upper bound, if any. Cancelled events are discarded. + let get_next_key = |scheduler_queue: &mut MutexGuard| { + loop { + match scheduler_queue.peek() { + Some((&k, t)) if k.0 <= upper_time_bound => { + if !t.is_cancelled() { + break Some(k); + } + // Discard cancelled events. + scheduler_queue.pull(); + } + _ => break None, + } + } }; - // Set the simulation time to that of the next scheduled task + // Move to the next scheduled time. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let mut current_key = get_next_key(&mut scheduler_queue)?; self.time.write(current_key.0); loop { - let task = scheduler_queue.pull().unwrap().1; + let event = scheduler_queue.pull().unwrap().1; - let mut next_key = scheduler_queue.peek_key(); - if next_key != Some(¤t_key) { - // Since there are no other tasks targeting the same mailbox - // and the same time, the task is spawned immediately. - self.executor.spawn_and_forget(Box::into_pin(task)); + let mut next_key = get_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { + // Since there are no other events targeting the same mailbox + // and the same time, the event is spawned immediately. + self.executor.spawn_and_forget(Box::into_pin(event)); } else { // To ensure that their relative order of execution is - // preserved, all tasks targeting the same mailbox are - // concatenated into a single future. - let mut task_sequence = SeqFuture::new(); + // preserved, all event targeting the same mailbox are executed + // sequentially within a single compound future. + let mut event_sequence = SeqFuture::new(); - task_sequence.push(Box::into_pin(task)); + event_sequence.push(Box::into_pin(event)); loop { - let task = scheduler_queue.pull().unwrap().1; - task_sequence.push(Box::into_pin(task)); - next_key = scheduler_queue.peek_key(); - if next_key != Some(¤t_key) { + let event = scheduler_queue.pull().unwrap().1; + event_sequence.push(Box::into_pin(event)); + next_key = get_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { break; } } - // Spawn a parent task that sequentially polls all sub-tasks. - self.executor.spawn_and_forget(task_sequence); + // Spawn a parent event that sequentially polls all events + // targeting the same mailbox. + self.executor.spawn_and_forget(event_sequence); } - match next_key { - // If the next task is scheduled at the same time, update the key and continue. - Some(k) if k.0 == current_key.0 => { - current_key = *k; - } - // Otherwise wait until all tasks have completed and return. + current_key = match next_key { + // If the next event is scheduled at the same time, update the + // key and continue. + Some(k) if k.0 == current_key.0 => k, + // Otherwise wait until all events have completed and return. _ => { - drop(scheduler_queue); // make sure the queue's mutex is unlocked. + drop(scheduler_queue); // make sure the queue's mutex is released. self.executor.run(); return Some(current_key.0); } - } + }; } } - /// Iteratively advances simulation time and processes all tasks scheduled + /// Iteratively advances simulation time and processes all events scheduled /// up to the specified target time. /// - /// Once the method returns it is guaranteed that (i) all tasks scheduled up - /// to the specified target time have completed and (ii) the final + /// Once the method returns it is guaranteed that (i) all events scheduled + /// up to the specified target time have completed and (ii) the final /// simulation time matches the target time. /// /// This method does not check whether the specified time lies in the future @@ -462,7 +521,7 @@ impl Simulation { match self.step_to_next_bounded(target_time) { // The target time was reached exactly. Some(t) if t == target_time => return, - // No tasks are scheduled before or at the target time. + // No events are scheduled before or at the target time. None => { // Update the simulation time. self.time.write(target_time); diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index 19497b8..56dbb58 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -31,7 +31,7 @@ //! //! // Sets an alarm [input port]. //! pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { -//! if scheduler.schedule_at(setting, Self::ring, ()).is_err() { +//! if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() { //! println!("The alarm clock can only be set for a future time"); //! } //! } @@ -50,5 +50,7 @@ mod scheduler; pub(crate) use monotonic_time::TearableAtomicTime; pub use monotonic_time::{MonotonicTime, SystemTimeError}; -pub(crate) use scheduler::{cancel_scheduled, schedule_event_at_unchecked, SchedulerQueue}; -pub use scheduler::{CancellationError, ScheduledTimeError, Scheduler, SchedulerKey}; +pub(crate) use scheduler::{ + schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, SchedulerQueue, +}; +pub use scheduler::{EventKey, ScheduledTimeError, Scheduler}; diff --git a/asynchronix/src/time/scheduler.rs b/asynchronix/src/time/scheduler.rs index 142fcf1..99ee365 100644 --- a/asynchronix/src/time/scheduler.rs +++ b/asynchronix/src/time/scheduler.rs @@ -3,20 +3,23 @@ use std::error::Error; use std::fmt; use std::future::Future; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use std::time::Duration; +use pin_project_lite::pin_project; use recycle_box::{coerce_box, RecycleBox}; use crate::channel::{ChannelId, Sender}; use crate::model::{InputFn, Model}; use crate::time::{MonotonicTime, TearableAtomicTime}; -use crate::util::priority_queue::{self, PriorityQueue}; +use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCellReader; /// Shorthand for the scheduler queue type. pub(crate) type SchedulerQueue = - PriorityQueue<(MonotonicTime, ChannelId), Box + Send>>; + PriorityQueue<(MonotonicTime, ChannelId), Box + Send>>; /// A local scheduler for models. /// @@ -65,8 +68,8 @@ pub(crate) type SchedulerQueue = /// let greeting = format!("Hello, this message was scheduled at: /// {:?}.", time); /// -/// if let Err(err) = scheduler.schedule_in(delay, Self::send_msg, greeting) { -/// // ^^^^^^^^ scheduled method +/// if let Err(err) = scheduler.schedule_event_in(delay, Self::send_msg, greeting) { +/// // ^^^^^^^^ scheduled method /// // The duration was zero, so greet right away. /// let greeting = err.0; /// self.msg_out.send(greeting).await; @@ -144,19 +147,19 @@ impl Scheduler { /// /// // Schedule this method again in 1s with an incremented counter. /// scheduler - /// .schedule_in(Duration::from_secs(1), Self::trigger, counter + 1) + /// .schedule_event_in(Duration::from_secs(1), Self::trigger, counter + 1) /// .unwrap(); /// } /// } /// /// impl Model for PeriodicLogger {} /// ``` - pub fn schedule_in( + pub fn schedule_event_in( &self, duration: Duration, func: F, arg: T, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, @@ -166,10 +169,70 @@ impl Scheduler { } let time = self.time() + duration; let sender = self.sender.clone(); - let schedule_key = - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - Ok(schedule_key) + Ok(()) + } + + /// Schedules an event at the lapse of the specified duration and returns an + /// event key. + /// + /// An error is returned if the specified duration is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use std::future::Future; + /// use asynchronix::model::Model; + /// use asynchronix::time::{EventKey, Scheduler}; + /// + /// // A model that logs the value of a counter every second after being + /// // triggered the first time until logging is stopped. + /// pub struct PeriodicLogger { + /// event_key: Option + /// } + /// + /// impl PeriodicLogger { + /// // Triggers the logging of a timestamp every second [input port]. + /// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler) { + /// self.stop(); + /// println!("counter: {}", counter); + /// + /// // Schedule this method again in 1s with an incremented counter. + /// let event_key = scheduler + /// .schedule_keyed_event_in(Duration::from_secs(1), Self::trigger, counter + 1) + /// .unwrap(); + /// self.event_key = Some(event_key); + /// } + /// + /// // Cancels the logging of timestamps. + /// pub fn stop(&mut self) { + /// self.event_key.take().map(|k| k.cancel_event()); + /// } + /// } + /// + /// impl Model for PeriodicLogger {} + /// ``` + pub fn schedule_keyed_event_in( + &self, + duration: Duration, + func: F, + arg: T, + ) -> Result> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if duration.is_zero() { + return Err(ScheduledTimeError(arg)); + } + let time = self.time() + duration; + let sender = self.sender.clone(); + let event_key = + schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(event_key) } /// Schedules an event at a future time. @@ -183,7 +246,7 @@ impl Scheduler { /// use asynchronix::model::Model; /// use asynchronix::time::{MonotonicTime, Scheduler}; /// - /// // An alarm clock model. + /// // An alarm clock. /// pub struct AlarmClock { /// msg: String /// } @@ -196,7 +259,7 @@ impl Scheduler { /// /// // Sets an alarm [input port]. /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { - /// if scheduler.schedule_at(setting, Self::ring, ()).is_err() { + /// if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() { /// println!("The alarm clock can only be set for a future time"); /// } /// } @@ -209,12 +272,12 @@ impl Scheduler { /// /// impl Model for AlarmClock {} /// ``` - pub fn schedule_at( + pub fn schedule_event_at( &self, time: MonotonicTime, func: F, arg: T, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, @@ -223,20 +286,77 @@ impl Scheduler { return Err(ScheduledTimeError(arg)); } let sender = self.sender.clone(); - let schedule_key = - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - Ok(schedule_key) + Ok(()) } - /// Cancels an event with a scheduled time in the future of the current - /// simulation time. + /// Schedules an event at a future time and returns an event key. /// - /// If the corresponding event was already executed, or if it is scheduled - /// for the current simulation time but was not yet executed, an error is - /// returned. - pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> { - cancel_scheduled(scheduler_key, &self.scheduler_queue) + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::Model; + /// use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; + /// + /// // An alarm clock that can be cancelled. + /// pub struct AlarmClock { + /// msg: String, + /// event_key: Option, + /// } + /// + /// impl AlarmClock { + /// // Creates a new alarm clock. + /// pub fn new(msg: String) -> Self { + /// Self { + /// msg, + /// event_key: None + /// } + /// } + /// + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { + /// self.cancel(); + /// match scheduler.schedule_keyed_event_at(setting, Self::ring, ()) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels the current alarm, if any. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel_event()); + /// } + /// + /// // Rings the alarm [private input port]. + /// fn ring(&mut self) { + /// println!("{}", self.msg); + /// } + /// } + /// + /// impl Model for AlarmClock {} + /// ``` + pub fn schedule_keyed_event_at( + &self, + time: MonotonicTime, + func: F, + arg: T, + ) -> Result> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if self.time() >= time { + return Err(ScheduledTimeError(arg)); + } + let sender = self.sender.clone(); + let event_key = + schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(event_key) } } @@ -246,15 +366,61 @@ impl fmt::Debug for Scheduler { } } -/// Unique identifier for a scheduled event. +/// Handle to a scheduled event. /// -/// A `SchedulerKey` can be used to cancel a future event. -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] -pub struct SchedulerKey(priority_queue::InsertKey); +/// An `EventKey` can be used to cancel a future event. +#[derive(Clone, Debug)] +pub struct EventKey { + state: Arc, +} -impl SchedulerKey { - pub(crate) fn new(key: priority_queue::InsertKey) -> Self { - Self(key) +impl EventKey { + const IS_PENDING: usize = 0; + const IS_CANCELLED: usize = 1; + const IS_PROCESSED: usize = 2; + + /// Creates a key for a pending event. + pub(crate) fn new() -> Self { + Self { + state: Arc::new(AtomicUsize::new(Self::IS_PENDING)), + } + } + + /// Checks whether the event was cancelled. + pub(crate) fn event_is_cancelled(&self) -> bool { + self.state.load(Ordering::Relaxed) == Self::IS_CANCELLED + } + + /// Marks the event as processed. + /// + /// If the event cannot be processed because it was cancelled, `false` is + /// returned. + pub(crate) fn process_event(self) -> bool { + match self.state.compare_exchange( + Self::IS_PENDING, + Self::IS_PROCESSED, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => true, + Err(s) => s == Self::IS_PROCESSED, + } + } + + /// Cancels the associated event if possible. + /// + /// If the event cannot be cancelled because it was already processed, + /// `false` is returned. + pub fn cancel_event(self) -> bool { + match self.state.compare_exchange( + Self::IS_PENDING, + Self::IS_CANCELLED, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => true, + Err(s) => s == Self::IS_CANCELLED, + } } } @@ -274,21 +440,6 @@ impl fmt::Display for ScheduledTimeError { impl Error for ScheduledTimeError {} -/// Error returned when the cancellation of a scheduler event is unsuccessful. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct CancellationError {} - -impl fmt::Display for CancellationError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "the scheduler key should belong to an event or command scheduled in the future of the current simulation time" - ) - } -} - -impl Error for CancellationError {} - /// Schedules an event at a future time. /// /// This method does not check whether the specified time lies in the future @@ -299,8 +450,7 @@ pub(crate) fn schedule_event_at_unchecked( arg: T, sender: Sender, scheduler_queue: &Mutex, -) -> SchedulerKey -where +) where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, @@ -321,26 +471,143 @@ where ) .await; }; + let fut = Box::new(UnkeyedEventFuture::new(fut)); let mut scheduler_queue = scheduler_queue.lock().unwrap(); - let insert_key = scheduler_queue.insert((time, channel_id), Box::new(fut)); - - SchedulerKey::new(insert_key) + scheduler_queue.insert((time, channel_id), fut); } -/// Cancels an event or command with a scheduled time in the future of the -/// current simulation time. +/// Schedules an event at a future time, returning an event key. /// -/// If the corresponding event or command was already executed, or if it is -/// scheduled for the current simulation time, an error is returned. -pub(crate) fn cancel_scheduled( - scheduler_key: SchedulerKey, +/// This method does not check whether the specified time lies in the future +/// of the current simulation time. +pub(crate) fn schedule_keyed_event_at_unchecked( + time: MonotonicTime, + func: F, + arg: T, + sender: Sender, scheduler_queue: &Mutex, -) -> Result<(), CancellationError> { - let mut scheduler_queue = scheduler_queue.lock().unwrap(); - if scheduler_queue.delete(scheduler_key.0) { - return Ok(()); - } +) -> EventKey +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let channel_id = sender.channel_id(); - Err(CancellationError {}) + let event_key = EventKey::new(); + let local_event_key = event_key.clone(); + + let fut = async move { + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = async move { + if local_event_key.process_event() { + func.call(model, arg, scheduler).await; + } + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; + }; + + // Implementation note: we end up with two atomic references to the event + // key stored inside the event future: one was moved above to the future + // itself and the other one is created below via cloning and stored + // separately in the `KeyedEventFuture`. This is not ideal as we could + // theoretically spare on atomic reference counting by storing a single + // reference, but this would likely require some tricky `unsafe`, not least + // because the inner future sent to the mailbox outlives the + // `KeyedEventFuture`. + let fut = Box::new(KeyedEventFuture::new(fut, event_key.clone())); + + let mut scheduler_queue = scheduler_queue.lock().unwrap(); + scheduler_queue.insert((time, channel_id), fut); + + event_key +} + +/// The future of an event which scheduling may be cancelled by the user. +pub(crate) trait EventFuture: Future { + /// Whether the scheduling of this event was cancelled. + fn is_cancelled(&self) -> bool; +} + +pin_project! { + /// Future associated to a regular event that cannot be cancelled. + pub(crate) struct UnkeyedEventFuture { + #[pin] + fut: F, + } +} + +impl UnkeyedEventFuture { + /// Creates a new `EventFuture`. + pub(crate) fn new(fut: F) -> Self { + Self { fut } + } +} + +impl Future for UnkeyedEventFuture +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl EventFuture for UnkeyedEventFuture +where + F: Future, +{ + fn is_cancelled(&self) -> bool { + false + } +} + +pin_project! { + /// Future associated to a keyed event that can be cancelled. + pub(crate) struct KeyedEventFuture { + event_key: EventKey, + #[pin] + fut: F, + } +} + +impl KeyedEventFuture { + /// Creates a new `EventFuture`. + pub(crate) fn new(fut: F, event_key: EventKey) -> Self { + Self { event_key, fut } + } +} + +impl Future for KeyedEventFuture +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl EventFuture for KeyedEventFuture +where + F: Future, +{ + fn is_cancelled(&self) -> bool { + self.event_key.event_is_cancelled() + } } diff --git a/asynchronix/src/util/futures.rs b/asynchronix/src/util/futures.rs index 22200a3..026e563 100644 --- a/asynchronix/src/util/futures.rs +++ b/asynchronix/src/util/futures.rs @@ -4,6 +4,8 @@ use std::future::Future; use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::task::{Context, Poll}; /// An owned future which sequentially polls a collection of futures. @@ -51,3 +53,39 @@ impl Future for SeqFuture { Poll::Pending } } + +trait RevocableFuture: Future { + fn is_revoked() -> bool; +} + +struct NeverRevokedFuture { + inner: F, +} + +impl NeverRevokedFuture { + fn new(fut: F) -> Self { + Self { inner: fut } + } +} +impl Future for NeverRevokedFuture { + type Output = T::Output; + + #[inline(always)] + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) } + } +} + +impl RevocableFuture for NeverRevokedFuture { + fn is_revoked() -> bool { + false + } +} + +struct ConcurrentlyRevocableFuture { + inner: F, + is_revoked: Arc, +} diff --git a/asynchronix/src/util/priority_queue.rs b/asynchronix/src/util/priority_queue.rs index bc6ccd2..b57dca1 100644 --- a/asynchronix/src/util/priority_queue.rs +++ b/asynchronix/src/util/priority_queue.rs @@ -1,52 +1,60 @@ //! Associative priority queue. -#![allow(unused)] +use std::cmp::{Eq, Ord, Ordering, PartialOrd}; +use std::collections::BinaryHeap; -use std::mem; +/// A key-value pair ordered by keys in inverse order, with epoch-based ordering +/// for equal keys. +struct Item +where + K: Ord, +{ + key: K, + value: V, + epoch: u64, +} -/// An associative container optimized for extraction of the value with the -/// lowest key and deletion of arbitrary key-value pairs. +impl Ord for Item +where + K: Ord, +{ + fn cmp(&self, other: &Self) -> Ordering { + self.key + .cmp(&other.key) + .then_with(|| self.epoch.cmp(&other.epoch)) + .reverse() + } +} + +impl PartialOrd for Item +where + K: Ord, +{ + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Eq for Item where K: Ord {} + +impl PartialEq for Item +where + K: Ord, +{ + fn eq(&self, other: &Self) -> bool { + (self.key == other.key) && (self.epoch == other.epoch) + } +} + +/// An associative container optimized for extraction of the key-value pair with +/// the lowest key, based on a binary heap. /// -/// This implementation has the same theoretical complexity for insert and pull -/// operations as a conventional array-based binary heap but does differ from -/// the latter in some important aspects: -/// -/// - elements can be deleted in *O*(log(*N*)) time rather than *O*(*N*) time -/// using a unique index returned at insertion time. -/// - same-key elements are guaranteed to be pulled in FIFO order, -/// -/// Under the hood, the priority queue relies on a binary heap cross-indexed -/// with values stored in a slab allocator. Each item of the binary heap -/// contains an index pointing to the associated slab-allocated node, as well as -/// the user-provided key. Each slab node contains the value associated to the -/// key and a back-pointing index to the binary heap. The heap items also -/// contain a unique epoch which allows same-key nodes to be sorted by insertion -/// order. The epoch is used as well to build unique indices that enable -/// efficient deletion of arbitrary key-value pairs. -/// -/// The slab-based design is what makes *O*(log(*N*)) deletion possible, but it -/// does come with some trade-offs: -/// -/// - its memory footprint is higher because it needs 2 extra pointer-sized -/// indices for each element to cross-index the heap and the slab, -/// - its computational footprint is higher because of the extra cost associated -/// with random slab access; that being said, array-based binary heaps are not -/// extremely cache-friendly to start with so unless the slab becomes very -/// fragmented, this is not expected to introduce more than a reasonable -/// constant-factor penalty compared to a conventional binary heap. -/// -/// The computational penalty is partially offset by the fact that the value -/// never needs to be moved from the moment it is inserted until it is pulled. -/// -/// Note that the `Copy` bound on they keys could be lifted but this would make -/// the implementation slightly less efficient unless `unsafe` is used. +/// The insertion order of equal keys is preserved, with FIFO ordering. pub(crate) struct PriorityQueue where - K: Copy + Clone + Ord, + K: Ord, { - heap: Vec>, - slab: Vec>, - first_free_node: Option, + heap: BinaryHeap>, next_epoch: u64, } @@ -54,81 +62,23 @@ impl PriorityQueue { /// Creates an empty `PriorityQueue`. pub(crate) fn new() -> Self { Self { - heap: Vec::new(), - slab: Vec::new(), - first_free_node: None, + heap: BinaryHeap::new(), next_epoch: 0, } } - /// Creates an empty `PriorityQueue` with at least the specified capacity. - pub(crate) fn with_capacity(capacity: usize) -> Self { - Self { - heap: Vec::with_capacity(capacity), - slab: Vec::with_capacity(capacity), - first_free_node: None, - next_epoch: 0, - } - } - - /// Returns the number of key-value pairs in the priority queue. - pub(crate) fn len(&self) -> usize { - self.heap.len() - } - - /// Inserts a new key-value pair and returns a unique insertion key. + /// Inserts a new key-value pair. /// /// This operation has *O*(log(*N*)) amortized worse-case theoretical /// complexity and *O*(1) amortized theoretical complexity for a /// sufficiently random heap. - pub(crate) fn insert(&mut self, key: K, value: V) -> InsertKey { - // Build a unique key from the user-provided key and a unique epoch. + pub(crate) fn insert(&mut self, key: K, value: V) { + // Build an element from the user-provided key-value and a unique epoch. let epoch = self.next_epoch; assert_ne!(epoch, u64::MAX); self.next_epoch += 1; - let unique_key = UniqueKey { key, epoch }; - - // Add a new node to the slab, either by re-using a free node or by - // appending a new one. - let slab_idx = match self.first_free_node { - Some(idx) => { - self.first_free_node = self.slab[idx].unwrap_next_free_node(); - - self.slab[idx] = Node::HeapNode(HeapNode { - value, - heap_idx: 0, // temporary value overridden in `sift_up` - }); - - idx - } - None => { - let idx = self.slab.len(); - self.slab.push(Node::HeapNode(HeapNode { - value, - heap_idx: 0, // temporary value overridden in `sift_up` - })); - - idx - } - }; - - // Add a new node at the bottom of the heap. - let heap_idx = self.heap.len(); - self.heap.push(Item { - key: unique_key, // temporary value overridden in `sift_up` - slab_idx: 0, // temporary value overridden in `sift_up` - }); - - // Sift up the new node. - self.sift_up( - Item { - key: unique_key, - slab_idx, - }, - heap_idx, - ); - - InsertKey { slab_idx, epoch } + let item = Item { key, value, epoch }; + self.heap.push(item); } /// Pulls the value with the lowest key. @@ -138,26 +88,7 @@ impl PriorityQueue { /// /// This operation has *O*(log(N)) non-amortized theoretical complexity. pub(crate) fn pull(&mut self) -> Option<(K, V)> { - let item = self.heap.first()?; - let top_slab_idx = item.slab_idx; - let key = item.key.key; - - // Free the top node, extracting its value. - let value = mem::replace( - &mut self.slab[top_slab_idx], - Node::FreeNode(FreeNode { - next: self.first_free_node, - }), - ) - .unwrap_value(); - - self.first_free_node = Some(top_slab_idx); - - // Sift the last node at the bottom of the heap from the top of the heap. - let last_item = self.heap.pop().unwrap(); - if last_item.slab_idx != top_slab_idx { - self.sift_down(last_item, 0); - } + let Item { key, value, .. } = self.heap.pop()?; Some((key, value)) } @@ -165,497 +96,48 @@ impl PriorityQueue { /// Peeks a reference to the key-value pair with the lowest key, leaving it /// in the queue. /// - /// If there are several equal lowest keys, a reference to the key-value - /// pair which was inserted first is returned. + /// If there are several equal lowest keys, references to the key-value pair + /// which was inserted first is returned. /// /// This operation has *O*(1) non-amortized theoretical complexity. pub(crate) fn peek(&self) -> Option<(&K, &V)> { - let item = self.heap.first()?; - let top_slab_idx = item.slab_idx; - let key = &item.key.key; - let value = self.slab[top_slab_idx].unwrap_value_ref(); + let Item { + ref key, ref value, .. + } = self.heap.peek()?; Some((key, value)) } - - /// Peeks a reference to the lowest key, leaving it in the queue. - /// - /// If there are several equal lowest keys, a reference to the key which was - /// inserted first is returned. - /// - /// This operation has *O*(1) non-amortized theoretical complexity. - pub(crate) fn peek_key(&self) -> Option<&K> { - let item = self.heap.first()?; - - Some(&item.key.key) - } - - /// Delete the key-value pair associated to the provided insertion key if it - /// is still in the queue. - /// - /// Using an insertion key returned from another `PriorityQueue` is a logic - /// error and could result in the deletion of an arbitrary key-value pair. - /// - /// This method returns `true` if the pair was indeed in the queue and - /// `false` otherwise. - /// - /// This operation has guaranteed *O*(log(*N*)) theoretical complexity. - pub(crate) fn delete(&mut self, insert_key: InsertKey) -> bool { - // Check that (i) there is a node at this index, (ii) this node is in - // the heap and (iii) this node has the correct epoch. - let slab_idx = insert_key.slab_idx; - let heap_idx = if let Some(Node::HeapNode(node)) = self.slab.get(slab_idx) { - let heap_idx = node.heap_idx; - if self.heap[heap_idx].key.epoch != insert_key.epoch { - return false; - } - heap_idx - } else { - return false; - }; - - // If the last item of the heap is not the one to be deleted, sift it up - // or down as appropriate starting from the vacant spot. - let last_item = self.heap.pop().unwrap(); - if let Some(item) = self.heap.get(heap_idx) { - if last_item.key < item.key { - self.sift_up(last_item, heap_idx); - } else { - self.sift_down(last_item, heap_idx); - } - } - - // Free the deleted node in the slab. - self.slab[slab_idx] = Node::FreeNode(FreeNode { - next: self.first_free_node, - }); - self.first_free_node = Some(slab_idx); - - true - } - - /// Take a heap item and, starting at `heap_idx`, move it up the heap while - /// a parent has a larger key. - #[inline] - fn sift_up(&mut self, item: Item, heap_idx: usize) { - let mut child_heap_idx = heap_idx; - let key = &item.key; - - while child_heap_idx != 0 { - let parent_heap_idx = (child_heap_idx - 1) / 2; - - // Stop when the key is larger or equal to the parent's. - if key >= &self.heap[parent_heap_idx].key { - break; - } - - // Move the parent down one level. - self.heap[child_heap_idx] = self.heap[parent_heap_idx]; - let parent_slab_idx = self.heap[parent_heap_idx].slab_idx; - *self.slab[parent_slab_idx].unwrap_heap_index_mut() = child_heap_idx; - - // Stop when the key is larger or equal to the parent's. - if key >= &self.heap[parent_heap_idx].key { - break; - } - // Make the former parent the new child. - child_heap_idx = parent_heap_idx; - } - - // Move the original item to the current child. - self.heap[child_heap_idx] = item; - *self.slab[item.slab_idx].unwrap_heap_index_mut() = child_heap_idx; - } - - /// Take a heap item and, starting at `heap_idx`, move it down the heap - /// while a child has a smaller key. - #[inline] - fn sift_down(&mut self, item: Item, heap_idx: usize) { - let mut parent_heap_idx = heap_idx; - let mut child_heap_idx = 2 * parent_heap_idx + 1; - let key = &item.key; - - while child_heap_idx < self.heap.len() { - // If the sibling exists and has a smaller key, make it the - // candidate for swapping. - if let Some(other_child) = self.heap.get(child_heap_idx + 1) { - child_heap_idx += (self.heap[child_heap_idx].key > other_child.key) as usize; - } - - // Stop when the key is smaller or equal to the child with the smallest key. - if key <= &self.heap[child_heap_idx].key { - break; - } - - // Move the child up one level. - self.heap[parent_heap_idx] = self.heap[child_heap_idx]; - let child_slab_idx = self.heap[child_heap_idx].slab_idx; - *self.slab[child_slab_idx].unwrap_heap_index_mut() = parent_heap_idx; - - // Make the child the new parent. - parent_heap_idx = child_heap_idx; - child_heap_idx = 2 * parent_heap_idx + 1; - } - - // Move the original item to the current parent. - self.heap[parent_heap_idx] = item; - *self.slab[item.slab_idx].unwrap_heap_index_mut() = parent_heap_idx; - } -} - -/// Data related to a single key-value pair stored in the heap. -#[derive(Copy, Clone)] -struct Item { - // A unique key by which the heap is sorted. - key: UniqueKey, - // An index pointing to the corresponding node in the slab. - slab_idx: usize, -} - -/// Data related to a single key-value pair stored in the slab. -enum Node { - FreeNode(FreeNode), - HeapNode(HeapNode), -} - -impl Node { - /// Unwraps the `FreeNode::next` field. - fn unwrap_next_free_node(&self) -> Option { - match self { - Self::FreeNode(n) => n.next, - _ => panic!("the node was expected to be a free node"), - } - } - - /// Unwraps the `HeapNode::value` field. - fn unwrap_value(self) -> V { - match self { - Self::HeapNode(n) => n.value, - _ => panic!("the node was expected to be a heap node"), - } - } - - /// Unwraps the `HeapNode::value` field. - fn unwrap_value_ref(&self) -> &V { - match self { - Self::HeapNode(n) => &n.value, - _ => panic!("the node was expected to be a heap node"), - } - } - - /// Unwraps a mutable reference to the `HeapNode::heap_idx` field. - fn unwrap_heap_index_mut(&mut self) -> &mut usize { - match self { - Self::HeapNode(n) => &mut n.heap_idx, - _ => panic!("the node was expected to be a heap node"), - } - } -} - -/// A node that is no longer in the binary heap. -struct FreeNode { - // An index pointing to the next free node, if any. - next: Option, -} - -/// A node currently in the binary heap. -struct HeapNode { - // The value associated to this node. - value: V, - // Index of the node in the heap. - heap_idx: usize, -} - -/// A unique insertion key that can be used for key-value pair deletion. -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] -pub(crate) struct InsertKey { - // An index pointing to a node in the slab. - slab_idx: usize, - // The epoch when the node was inserted. - epoch: u64, -} - -/// A unique key made of the user-provided key complemented by a unique epoch. -/// -/// Implementation note: `UniqueKey` automatically derives `PartialOrd`, which -/// implies that lexicographic order between `key` and `epoch` must be preserved -/// to make sure that `key` has a higher sorting priority than `epoch`. -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] -struct UniqueKey { - /// The user-provided key. - key: K, - /// A unique epoch that indicates the insertion date. - epoch: u64, } #[cfg(all(test, not(asynchronix_loom)))] mod tests { - use std::fmt::Debug; - use super::*; - enum Op { - Insert(K, V), - InsertAndMark(K, V), - Pull(Option<(K, V)>), - DeleteMarked(bool), - } - - fn check( - operations: impl Iterator>, - ) { - let mut queue = PriorityQueue::new(); - let mut marked = None; - - for op in operations { - match op { - Op::Insert(key, value) => { - queue.insert(key, value); - } - Op::InsertAndMark(key, value) => { - marked = Some(queue.insert(key, value)); - } - Op::Pull(kv) => { - assert_eq!(queue.pull(), kv); - } - Op::DeleteMarked(success) => { - assert_eq!( - queue.delete(marked.take().expect("no item was marked for deletion")), - success - ) - } - } - } - } - #[test] - fn priority_queue_smoke() { - let operations = [ - Op::Insert(5, 'a'), - Op::Insert(2, 'b'), - Op::Insert(3, 'c'), - Op::Insert(4, 'd'), - Op::Insert(9, 'e'), - Op::Insert(1, 'f'), - Op::Insert(8, 'g'), - Op::Insert(0, 'h'), - Op::Insert(7, 'i'), - Op::Insert(6, 'j'), - Op::Pull(Some((0, 'h'))), - Op::Pull(Some((1, 'f'))), - Op::Pull(Some((2, 'b'))), - Op::Pull(Some((3, 'c'))), - Op::Pull(Some((4, 'd'))), - Op::Pull(Some((5, 'a'))), - Op::Pull(Some((6, 'j'))), - Op::Pull(Some((7, 'i'))), - Op::Pull(Some((8, 'g'))), - Op::Pull(Some((9, 'e'))), - ]; + fn priority_smoke() { + let mut q = PriorityQueue::new(); - check(operations.into_iter()); - } + q.insert(5, 'e'); + q.insert(2, 'y'); + q.insert(1, 'a'); + q.insert(3, 'c'); + q.insert(2, 'z'); + q.insert(4, 'd'); + q.insert(2, 'x'); - #[test] - fn priority_queue_interleaved() { - let operations = [ - Op::Insert(2, 'a'), - Op::Insert(7, 'b'), - Op::Insert(5, 'c'), - Op::Pull(Some((2, 'a'))), - Op::Insert(4, 'd'), - Op::Pull(Some((4, 'd'))), - Op::Insert(8, 'e'), - Op::Insert(2, 'f'), - Op::Pull(Some((2, 'f'))), - Op::Pull(Some((5, 'c'))), - Op::Pull(Some((7, 'b'))), - Op::Insert(5, 'g'), - Op::Insert(3, 'h'), - Op::Pull(Some((3, 'h'))), - Op::Pull(Some((5, 'g'))), - Op::Pull(Some((8, 'e'))), - Op::Pull(None), - ]; - - check(operations.into_iter()); - } - - #[test] - fn priority_queue_equal_keys() { - let operations = [ - Op::Insert(4, 'a'), - Op::Insert(1, 'b'), - Op::Insert(3, 'c'), - Op::Pull(Some((1, 'b'))), - Op::Insert(4, 'd'), - Op::Insert(8, 'e'), - Op::Insert(3, 'f'), - Op::Pull(Some((3, 'c'))), - Op::Pull(Some((3, 'f'))), - Op::Pull(Some((4, 'a'))), - Op::Insert(8, 'g'), - Op::Pull(Some((4, 'd'))), - Op::Pull(Some((8, 'e'))), - Op::Pull(Some((8, 'g'))), - Op::Pull(None), - ]; - - check(operations.into_iter()); - } - - #[test] - fn priority_queue_delete_valid() { - let operations = [ - Op::Insert(8, 'a'), - Op::Insert(1, 'b'), - Op::Insert(3, 'c'), - Op::InsertAndMark(3, 'd'), - Op::Insert(2, 'e'), - Op::Pull(Some((1, 'b'))), - Op::Insert(4, 'f'), - Op::DeleteMarked(true), - Op::Insert(5, 'g'), - Op::Pull(Some((2, 'e'))), - Op::Pull(Some((3, 'c'))), - Op::Pull(Some((4, 'f'))), - Op::Pull(Some((5, 'g'))), - Op::Pull(Some((8, 'a'))), - Op::Pull(None), - ]; - - check(operations.into_iter()); - } - - #[test] - fn priority_queue_delete_invalid() { - let operations = [ - Op::Insert(0, 'a'), - Op::Insert(7, 'b'), - Op::InsertAndMark(2, 'c'), - Op::Insert(4, 'd'), - Op::Pull(Some((0, 'a'))), - Op::Insert(2, 'e'), - Op::Pull(Some((2, 'c'))), - Op::Insert(4, 'f'), - Op::DeleteMarked(false), - Op::Pull(Some((2, 'e'))), - Op::Pull(Some((4, 'd'))), - Op::Pull(Some((4, 'f'))), - Op::Pull(Some((7, 'b'))), - Op::Pull(None), - ]; - - check(operations.into_iter()); - } - - #[test] - fn priority_queue_fuzz() { - use std::cell::Cell; - use std::collections::BTreeMap; - - use crate::util::rng::Rng; - - // Number of fuzzing operations. - const ITER: usize = if cfg!(miri) { 1000 } else { 10_000_000 }; - - // Inclusive upper bound for randomly generated keys. - const MAX_KEY: u64 = 99; - - // Probabilistic weight of each of the 4 operations. - // - // The weight for pull values should probably stay close to the sum of - // the two insertion weights to prevent queue size runaway. - const INSERT_WEIGHT: u64 = 5; - const INSERT_AND_MARK_WEIGHT: u64 = 1; - const PULL_WEIGHT: u64 = INSERT_WEIGHT + INSERT_AND_MARK_WEIGHT; - const DELETE_MARKED_WEIGHT: u64 = 1; - - // Defines 4 basic operations on the priority queue, each of them being - // performed on both the tested implementation and on a shadow queue - // implemented with a `BTreeMap`. Any mismatch between the outcomes of - // pull and delete operations between the two queues triggers a panic. - let epoch: Cell = Cell::new(0); - let marked: Cell> = Cell::new(None); - let shadow_marked: Cell> = Cell::new(None); - - let insert_fn = |queue: &mut PriorityQueue, - shadow_queue: &mut BTreeMap<(u64, usize), u64>, - key, - value| { - queue.insert(key, value); - shadow_queue.insert((key, epoch.get()), value); - epoch.set(epoch.get() + 1); - }; - - let insert_and_mark_fn = |queue: &mut PriorityQueue, - shadow_queue: &mut BTreeMap<(u64, usize), u64>, - key, - value| { - marked.set(Some(queue.insert(key, value))); - shadow_queue.insert((key, epoch.get()), value); - shadow_marked.set(Some((key, epoch.get()))); - epoch.set(epoch.get() + 1); - }; - - let pull_fn = |queue: &mut PriorityQueue, - shadow_queue: &mut BTreeMap<(u64, usize), u64>| { - let value = queue.pull(); - let shadow_value = match shadow_queue.iter().next() { - Some((&unique_key, &value)) => { - shadow_queue.remove(&unique_key); - Some((unique_key.0, value)) - } - None => None, - }; - assert_eq!(value, shadow_value); - }; - - let delete_marked_fn = - |queue: &mut PriorityQueue, - shadow_queue: &mut BTreeMap<(u64, usize), u64>| { - let success = match marked.take() { - Some(delete_key) => Some(queue.delete(delete_key)), - None => None, - }; - let shadow_success = match shadow_marked.take() { - Some(delete_key) => Some(shadow_queue.remove(&delete_key).is_some()), - None => None, - }; - assert_eq!(success, shadow_success); - }; - - // Fuzz away. - let mut queue = PriorityQueue::new(); - let mut shadow_queue = BTreeMap::new(); - - let rng = Rng::new(12345); - const TOTAL_WEIGHT: u64 = - INSERT_WEIGHT + INSERT_AND_MARK_WEIGHT + PULL_WEIGHT + DELETE_MARKED_WEIGHT; - - for _ in 0..ITER { - // Randomly choose one of the 4 possible operations, respecting the - // probability weights. - let mut op = rng.gen_bounded(TOTAL_WEIGHT); - if op < INSERT_WEIGHT { - let key = rng.gen_bounded(MAX_KEY + 1); - let val = rng.gen(); - insert_fn(&mut queue, &mut shadow_queue, key, val); - continue; - } - op -= INSERT_WEIGHT; - if op < INSERT_AND_MARK_WEIGHT { - let key = rng.gen_bounded(MAX_KEY + 1); - let val = rng.gen(); - insert_and_mark_fn(&mut queue, &mut shadow_queue, key, val); - continue; - } - op -= INSERT_AND_MARK_WEIGHT; - if op < PULL_WEIGHT { - pull_fn(&mut queue, &mut shadow_queue); - continue; - } - delete_marked_fn(&mut queue, &mut shadow_queue); - } + assert_eq!(q.peek(), Some((&1, &'a'))); + assert_eq!(q.pull(), Some((1, 'a'))); + assert_eq!(q.peek(), Some((&2, &'y'))); + assert_eq!(q.pull(), Some((2, 'y'))); + assert_eq!(q.peek(), Some((&2, &'z'))); + assert_eq!(q.pull(), Some((2, 'z'))); + assert_eq!(q.peek(), Some((&2, &'x'))); + assert_eq!(q.pull(), Some((2, 'x'))); + assert_eq!(q.peek(), Some((&3, &'c'))); + assert_eq!(q.pull(), Some((3, 'c'))); + assert_eq!(q.peek(), Some((&4, &'d'))); + assert_eq!(q.pull(), Some((4, 'd'))); + assert_eq!(q.peek(), Some((&5, &'e'))); + assert_eq!(q.pull(), Some((5, 'e'))); } }