diff --git a/README.md b/README.md index 9eb774b..945a9ea 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ pub struct DelayedMultiplier { impl DelayedMultiplier { pub fn input(&mut self, value: f64, scheduler: &Scheduler) { scheduler - .schedule_in(Duration::from_secs(1), Self::send, 2.0 * value) + .schedule_event_in(Duration::from_secs(1), Self::send, 2.0 * value) .unwrap(); } async fn send(&mut self, value: f64) { diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 981228a..bd39917 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -31,6 +31,7 @@ diatomic-waker = "0.1" futures-task = "0.3" multishot = "0.3" num_cpus = "1.13" +pin-project-lite = "0.2" recycle-box = "0.2" slab = "0.4" st3 = "0.4" diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index b504632..30a3fb4 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -3,7 +3,7 @@ //! This example demonstrates in particular: //! //! * non-trivial state machines, -//! * cancellation of calls scheduled at the current time step using epochs, +//! * cancellation of events, //! * model initialization, //! * simulation monitoring with event slots. //! @@ -37,7 +37,7 @@ use std::time::Duration; use asynchronix::model::{InitializedModel, Model, Output}; use asynchronix::simulation::{Mailbox, SimInit}; -use asynchronix::time::{MonotonicTime, Scheduler, SchedulerKey}; +use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; /// Water pump. pub struct Pump { @@ -79,12 +79,9 @@ pub struct Controller { brew_time: Duration, /// Current water sense state. water_sense: WaterSenseState, - /// Scheduler key, which if present indicates that the machine is current + /// Event key, which if present indicates that the machine is currently /// brewing -- internal state. - stop_brew_key: Option, - /// An epoch incremented when the scheduled 'stop_brew` callback must be - /// ignored -- internal state. - stop_brew_epoch: u64, + stop_brew_key: Option, } impl Controller { @@ -98,22 +95,16 @@ impl Controller { pump_cmd: Output::default(), stop_brew_key: None, water_sense: WaterSenseState::Empty, // will be overridden during init - stop_brew_epoch: 0, } } /// Signals a change in the water sensing state -- input port. - pub async fn water_sense(&mut self, state: WaterSenseState, scheduler: &Scheduler) { + pub async fn water_sense(&mut self, state: WaterSenseState) { // Check if the tank just got empty. if state == WaterSenseState::Empty && self.water_sense == WaterSenseState::NotEmpty { // If a brew was ongoing, we must cancel it. if let Some(key) = self.stop_brew_key.take() { - // Try to abort the scheduled call to `stop_brew()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(key).is_err() { - self.stop_brew_epoch = self.stop_brew_epoch.wrapping_add(1); - }; - + key.cancel_event(); self.pump_cmd.send(PumpCommand::Off).await; } } @@ -136,11 +127,8 @@ impl Controller { if let Some(key) = self.stop_brew_key.take() { self.pump_cmd.send(PumpCommand::Off).await; - // Try to abort the scheduled call to `stop_brew()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(key).is_err() { - self.stop_brew_epoch = self.stop_brew_epoch.wrapping_add(1); - }; + // Abort the scheduled call to `stop_brew()`. + key.cancel_event(); return; } @@ -153,19 +141,14 @@ impl Controller { // Schedule the `stop_brew()` method and turn on the pump. self.stop_brew_key = Some( scheduler - .schedule_in(self.brew_time, Self::stop_brew, self.stop_brew_epoch) + .schedule_keyed_event_in(self.brew_time, Self::stop_brew, ()) .unwrap(), ); self.pump_cmd.send(PumpCommand::On).await; } /// Stops brewing. - async fn stop_brew(&mut self, epoch: u64) { - // Ignore this call if the epoch has been incremented. - if self.stop_brew_epoch != epoch { - return; - } - + async fn stop_brew(&mut self) { if self.stop_brew_key.take().is_some() { self.pump_cmd.send(PumpCommand::Off).await; } @@ -190,9 +173,6 @@ pub struct Tank { volume: f64, /// State that exists when the mass flow rate is non-zero -- internal state. dynamic_state: Option, - /// An epoch incremented when the pending call to `set_empty()` must be - /// ignored -- internal state. - set_empty_epoch: u64, } impl Tank { /// Creates a new tank with the specified amount of water [m³]. @@ -204,7 +184,6 @@ impl Tank { Self { volume: water_volume, dynamic_state: None, - set_empty_epoch: 0, water_sense: Output::default(), } } @@ -224,11 +203,8 @@ impl Tank { // If the current flow rate is non-zero, compute the current volume and // schedule a new update. if let Some(state) = self.dynamic_state.take() { - // Try to abort the scheduled call to `set_empty()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(state.set_empty_key).is_err() { - self.set_empty_epoch = self.set_empty_epoch.wrapping_add(1); - } + // Abort the scheduled call to `set_empty()`. + state.set_empty_key.cancel_event(); // Update the volume, saturating at 0 in case of rounding errors. let time = scheduler.time(); @@ -260,11 +236,8 @@ impl Tank { // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { - // Try to abort the scheduled call to `set_empty()`. If this will - // fails, increment the epoch so that the call is ignored. - if scheduler.cancel(state.set_empty_key).is_err() { - self.set_empty_epoch = self.set_empty_epoch.wrapping_add(1); - } + // Abort the scheduled call to `set_empty()`. + state.set_empty_key.cancel_event(); // Update the volume, saturating at 0 in case of rounding errors. let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); @@ -301,7 +274,7 @@ impl Tank { let duration_until_empty = Duration::from_secs_f64(duration_until_empty); // Schedule the next update. - match scheduler.schedule_in(duration_until_empty, Self::set_empty, self.set_empty_epoch) { + match scheduler.schedule_keyed_event_in(duration_until_empty, Self::set_empty, ()) { Ok(set_empty_key) => { let state = TankDynamicState { last_volume_update: time, @@ -319,12 +292,7 @@ impl Tank { } /// Updates the state of the tank to indicate that there is no more water. - async fn set_empty(&mut self, epoch: u64) { - // Ignore this call if the epoch has been incremented. - if epoch != self.set_empty_epoch { - return; - } - + async fn set_empty(&mut self) { self.volume = 0.0; self.dynamic_state = None; self.water_sense.send(WaterSenseState::Empty).await; @@ -355,7 +323,7 @@ impl Model for Tank { /// is non-zero. struct TankDynamicState { last_volume_update: MonotonicTime, - set_empty_key: SchedulerKey, + set_empty_key: EventKey, flow_rate: f64, } @@ -429,7 +397,7 @@ fn main() { // Drink too much coffee. let volume_per_shot = pump_flow_rate * Controller::DEFAULT_BREW_TIME.as_secs_f64(); - let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who care about floating-point rounding errors? + let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who cares about floating-point rounding errors? for _ in 0..(shots_per_tank - 1) { simu.send_event(Controller::brew_cmd, (), &controller_addr); assert_eq!(flow_rate.take(), Some(pump_flow_rate)); @@ -463,7 +431,7 @@ fn main() { assert_eq!(flow_rate.take(), Some(0.0)); // Interrupt the brew after 15s by pressing again the brew button. - simu.schedule_in( + simu.schedule_event_in( Duration::from_secs(15), Controller::brew_cmd, (), diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 216197d..a232d88 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -174,7 +174,7 @@ impl Driver { // Schedule the next pulse. scheduler - .schedule_in(pulse_duration, Self::send_pulse, ()) + .schedule_event_in(pulse_duration, Self::send_pulse, ()) .unwrap(); } } @@ -224,7 +224,7 @@ fn main() { assert!(position.next().is_none()); // Start the motor in 2s with a PPS of 10Hz. - simu.schedule_in( + simu.schedule_event_in( Duration::from_secs(2), Driver::pulse_rate, 10.0, diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index dffe725..bd88eef 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -113,7 +113,7 @@ //! } //! impl Delay { //! pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap(); +//! scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap(); //! } //! //! async fn send(&mut self, value: f64) { @@ -184,7 +184,7 @@ //! # } //! # impl Delay { //! # pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! # scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap(); +//! # scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -242,7 +242,7 @@ //! [`Simulation::send_event()`](simulation::Simulation::send_event) or //! [`Simulation::send_query()`](simulation::Simulation::send_query), //! 3. by scheduling events, using for instance -//! [`Simulation::schedule_in()`](simulation::Simulation::schedule_in). +//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in). //! //! Simulation outputs can be monitored using //! [`EventSlot`](simulation::EventSlot)s and @@ -275,7 +275,7 @@ //! # } //! # impl Delay { //! # pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! # scheduler.schedule_in(Duration::from_secs(1), Self::send, value).unwrap(); +//! # scheduler.schedule_event_in(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -354,11 +354,12 @@ //! //! The first guarantee (and only the first) also extends to events scheduled //! from a simulation with -//! [`Simulation::schedule_in()`](simulation::Simulation::schedule_in) or -//! [`Simulation::schedule_at()`](simulation::Simulation::schedule_at): if the -//! scheduler contains several events to be delivered at the same time to the -//! same model, these events will always be processed in the order in which they -//! were scheduled. +//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in) +//! or +//! [`Simulation::schedule_event_at()`](simulation::Simulation::schedule_event_at): +//! if the scheduler contains several events to be delivered at the same time to +//! the same model, these events will always be processed in the order in which +//! they were scheduled. //! //! [actor_model]: https://en.wikipedia.org/wiki/Actor_model //! [pony]: https://www.ponylang.io/ diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index e63488f..8c5b957 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -73,7 +73,7 @@ //! //! At the moment, Asynchronix is unfortunately not able to discriminate between //! such pathological deadlocks and the "expected" deadlock that occurs when all -//! tasks in a given time slice have completed and all models are starved on an +//! events in a given time slice have completed and all models are starved on an //! empty mailbox. Consequently, blocking method such as [`SimInit::init()`], //! [`Simulation::step()`], [`Simulation::send_event()`], etc., will return //! without error after a pathological deadlock, leaving the user responsible @@ -91,8 +91,8 @@ //! There is actually a very simple solution to this problem: since the //! [`InputFn`](crate::model::InputFn) trait also matches closures of type //! `FnOnce(&mut impl Model)`, it is enough to invoke -//! [`Simulation::send_event()`] with a closure that connects or disconnects -//! a port, such as: +//! [`Simulation::send_event()`] with a closure that connects or disconnects a +//! port, such as: //! //! ``` //! # use asynchronix::model::{Model, Output}; @@ -129,15 +129,15 @@ pub use sim_init::SimInit; use std::error::Error; use std::fmt; use std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use recycle_box::{coerce_box, RecycleBox}; use crate::executor::Executor; use crate::model::{InputFn, Model, ReplierFn}; -use crate::time::{self, CancellationError, MonotonicTime, TearableAtomicTime}; -use crate::time::{ScheduledTimeError, SchedulerKey, SchedulerQueue}; +use crate::time::{self, MonotonicTime, TearableAtomicTime}; +use crate::time::{EventKey, ScheduledTimeError, SchedulerQueue}; use crate::util::futures::SeqFuture; use crate::util::slot; use crate::util::sync_cell::SyncCell; @@ -164,9 +164,9 @@ use crate::util::sync_cell::SyncCell; /// the case of queries, the response is returned. /// /// Events can also be scheduled at a future simulation time using -/// [`schedule_in()`](Simulation::schedule_in) or -/// [`schedule_at()`](Simulation::schedule_at). These methods queue an event -/// without blocking. +/// [`schedule_event_in()`](Simulation::schedule_event_in) or +/// [`schedule_event_at()`](Simulation::schedule_event_at). These methods queue +/// an event without blocking. /// /// Finally, the [`Simulation`] instance manages simulation time. Calling /// [`step()`](Simulation::step) will increment simulation time until that of @@ -201,20 +201,20 @@ impl Simulation { self.time.read() } - /// Advances simulation time to that of the next scheduled task, processing - /// that task as well as all other tasks scheduled for the same time. + /// Advances simulation time to that of the next scheduled event, processing + /// that event as well as all other event scheduled for the same time. /// /// This method may block. Once it returns, it is guaranteed that all newly - /// processed tasks (if any) have completed. + /// processed event (if any) have completed. pub fn step(&mut self) { self.step_to_next_bounded(MonotonicTime::MAX); } /// Iteratively advances the simulation time by the specified duration and - /// processes all tasks scheduled up to the target time. + /// processes all events scheduled up to the target time. /// /// This method may block. Once it returns, it is guaranteed that (i) all - /// tasks scheduled up to the specified target time have completed and (ii) + /// events scheduled up to the specified target time have completed and (ii) /// the final simulation time has been incremented by the specified /// duration. pub fn step_by(&mut self, duration: Duration) { @@ -223,11 +223,11 @@ impl Simulation { self.step_until_unchecked(target_time); } - /// Iteratively advances the simulation time and processes all tasks + /// Iteratively advances the simulation time and processes all events /// scheduled up to the specified target time. /// /// This method may block. Once it returns, it is guaranteed that (i) all - /// tasks scheduled up to the specified target time have completed and (ii) + /// events scheduled up to the specified target time have completed and (ii) /// the final simulation time matches the target time. pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ScheduledTimeError<()>> { if self.time.read() >= target_time { @@ -244,13 +244,13 @@ impl Simulation { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_in( + pub fn schedule_event_in( &mut self, duration: Duration, func: F, arg: T, address: impl Into>, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -261,7 +261,36 @@ impl Simulation { } let time = self.time.read() + duration; - let schedule_key = time::schedule_event_at_unchecked( + time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules an event at the lapse of the specified duration and returns an + /// event key. + /// + /// An error is returned if the specified duration is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + pub fn schedule_keyed_event_in( + &mut self, + duration: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if duration.is_zero() { + return Err(ScheduledTimeError(arg)); + } + let time = self.time.read() + duration; + + let event_key = time::schedule_keyed_event_at_unchecked( time, func, arg, @@ -269,7 +298,7 @@ impl Simulation { &self.scheduler_queue, ); - Ok(schedule_key) + Ok(event_key) } /// Schedules an event at a future time. @@ -279,13 +308,13 @@ impl Simulation { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_at( + pub fn schedule_event_at( &mut self, time: MonotonicTime, func: F, arg: T, address: impl Into>, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -294,7 +323,34 @@ impl Simulation { if self.time.read() >= time { return Err(ScheduledTimeError(arg)); } - let schedule_key = time::schedule_event_at_unchecked( + time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules an event at a future time and returns an event key. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + pub fn schedule_keyed_event_at( + &mut self, + time: MonotonicTime, + func: F, + arg: T, + address: impl Into>, + ) -> Result> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if self.time.read() >= time { + return Err(ScheduledTimeError(arg)); + } + let event_key = time::schedule_keyed_event_at_unchecked( time, func, arg, @@ -302,16 +358,7 @@ impl Simulation { &self.scheduler_queue, ); - Ok(schedule_key) - } - - /// Cancels an event with a scheduled time in the future of the current - /// simulation time. - /// - /// If the corresponding event was already executed, or if it is scheduled - /// for the current simulation time, an error is returned. - pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> { - time::cancel_scheduled(scheduler_key, &self.scheduler_queue) + Ok(event_key) } /// Sends and processes an event, blocking until completion. @@ -387,72 +434,84 @@ impl Simulation { reply_reader.try_read().map_err(|_| QueryError {}) } - /// Advances simulation time to that of the next scheduled task if its + /// Advances simulation time to that of the next scheduled event if its /// scheduling time does not exceed the specified bound, processing that - /// task as well as all other tasks scheduled for the same time. + /// event as well as all other events scheduled for the same time. /// - /// If at least one task was found that satisfied the time bound, the + /// If at least one event was found that satisfied the time bound, the /// corresponding new simulation time is returned. fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - let mut current_key = match scheduler_queue.peek_key() { - Some(&k) if k.0 <= upper_time_bound => k, - _ => return None, + // Closure returning the next key which time stamp is no older than the + // upper bound, if any. Cancelled events are discarded. + let get_next_key = |scheduler_queue: &mut MutexGuard| { + loop { + match scheduler_queue.peek() { + Some((&k, t)) if k.0 <= upper_time_bound => { + if !t.is_cancelled() { + break Some(k); + } + // Discard cancelled events. + scheduler_queue.pull(); + } + _ => break None, + } + } }; - // Set the simulation time to that of the next scheduled task + // Move to the next scheduled time. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let mut current_key = get_next_key(&mut scheduler_queue)?; self.time.write(current_key.0); loop { - let task = scheduler_queue.pull().unwrap().1; + let event = scheduler_queue.pull().unwrap().1; - let mut next_key = scheduler_queue.peek_key(); - if next_key != Some(¤t_key) { - // Since there are no other tasks targeting the same mailbox - // and the same time, the task is spawned immediately. - self.executor.spawn_and_forget(Box::into_pin(task)); + let mut next_key = get_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { + // Since there are no other events targeting the same mailbox + // and the same time, the event is spawned immediately. + self.executor.spawn_and_forget(Box::into_pin(event)); } else { // To ensure that their relative order of execution is - // preserved, all tasks targeting the same mailbox are - // concatenated into a single future. - let mut task_sequence = SeqFuture::new(); + // preserved, all event targeting the same mailbox are executed + // sequentially within a single compound future. + let mut event_sequence = SeqFuture::new(); - task_sequence.push(Box::into_pin(task)); + event_sequence.push(Box::into_pin(event)); loop { - let task = scheduler_queue.pull().unwrap().1; - task_sequence.push(Box::into_pin(task)); - next_key = scheduler_queue.peek_key(); - if next_key != Some(¤t_key) { + let event = scheduler_queue.pull().unwrap().1; + event_sequence.push(Box::into_pin(event)); + next_key = get_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { break; } } - // Spawn a parent task that sequentially polls all sub-tasks. - self.executor.spawn_and_forget(task_sequence); + // Spawn a parent event that sequentially polls all events + // targeting the same mailbox. + self.executor.spawn_and_forget(event_sequence); } - match next_key { - // If the next task is scheduled at the same time, update the key and continue. - Some(k) if k.0 == current_key.0 => { - current_key = *k; - } - // Otherwise wait until all tasks have completed and return. + current_key = match next_key { + // If the next event is scheduled at the same time, update the + // key and continue. + Some(k) if k.0 == current_key.0 => k, + // Otherwise wait until all events have completed and return. _ => { - drop(scheduler_queue); // make sure the queue's mutex is unlocked. + drop(scheduler_queue); // make sure the queue's mutex is released. self.executor.run(); return Some(current_key.0); } - } + }; } } - /// Iteratively advances simulation time and processes all tasks scheduled + /// Iteratively advances simulation time and processes all events scheduled /// up to the specified target time. /// - /// Once the method returns it is guaranteed that (i) all tasks scheduled up - /// to the specified target time have completed and (ii) the final + /// Once the method returns it is guaranteed that (i) all events scheduled + /// up to the specified target time have completed and (ii) the final /// simulation time matches the target time. /// /// This method does not check whether the specified time lies in the future @@ -462,7 +521,7 @@ impl Simulation { match self.step_to_next_bounded(target_time) { // The target time was reached exactly. Some(t) if t == target_time => return, - // No tasks are scheduled before or at the target time. + // No events are scheduled before or at the target time. None => { // Update the simulation time. self.time.write(target_time); diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index 19497b8..56dbb58 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -31,7 +31,7 @@ //! //! // Sets an alarm [input port]. //! pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { -//! if scheduler.schedule_at(setting, Self::ring, ()).is_err() { +//! if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() { //! println!("The alarm clock can only be set for a future time"); //! } //! } @@ -50,5 +50,7 @@ mod scheduler; pub(crate) use monotonic_time::TearableAtomicTime; pub use monotonic_time::{MonotonicTime, SystemTimeError}; -pub(crate) use scheduler::{cancel_scheduled, schedule_event_at_unchecked, SchedulerQueue}; -pub use scheduler::{CancellationError, ScheduledTimeError, Scheduler, SchedulerKey}; +pub(crate) use scheduler::{ + schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, SchedulerQueue, +}; +pub use scheduler::{EventKey, ScheduledTimeError, Scheduler}; diff --git a/asynchronix/src/time/scheduler.rs b/asynchronix/src/time/scheduler.rs index 142fcf1..99ee365 100644 --- a/asynchronix/src/time/scheduler.rs +++ b/asynchronix/src/time/scheduler.rs @@ -3,20 +3,23 @@ use std::error::Error; use std::fmt; use std::future::Future; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use std::time::Duration; +use pin_project_lite::pin_project; use recycle_box::{coerce_box, RecycleBox}; use crate::channel::{ChannelId, Sender}; use crate::model::{InputFn, Model}; use crate::time::{MonotonicTime, TearableAtomicTime}; -use crate::util::priority_queue::{self, PriorityQueue}; +use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCellReader; /// Shorthand for the scheduler queue type. pub(crate) type SchedulerQueue = - PriorityQueue<(MonotonicTime, ChannelId), Box + Send>>; + PriorityQueue<(MonotonicTime, ChannelId), Box + Send>>; /// A local scheduler for models. /// @@ -65,8 +68,8 @@ pub(crate) type SchedulerQueue = /// let greeting = format!("Hello, this message was scheduled at: /// {:?}.", time); /// -/// if let Err(err) = scheduler.schedule_in(delay, Self::send_msg, greeting) { -/// // ^^^^^^^^ scheduled method +/// if let Err(err) = scheduler.schedule_event_in(delay, Self::send_msg, greeting) { +/// // ^^^^^^^^ scheduled method /// // The duration was zero, so greet right away. /// let greeting = err.0; /// self.msg_out.send(greeting).await; @@ -144,19 +147,19 @@ impl Scheduler { /// /// // Schedule this method again in 1s with an incremented counter. /// scheduler - /// .schedule_in(Duration::from_secs(1), Self::trigger, counter + 1) + /// .schedule_event_in(Duration::from_secs(1), Self::trigger, counter + 1) /// .unwrap(); /// } /// } /// /// impl Model for PeriodicLogger {} /// ``` - pub fn schedule_in( + pub fn schedule_event_in( &self, duration: Duration, func: F, arg: T, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, @@ -166,10 +169,70 @@ impl Scheduler { } let time = self.time() + duration; let sender = self.sender.clone(); - let schedule_key = - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - Ok(schedule_key) + Ok(()) + } + + /// Schedules an event at the lapse of the specified duration and returns an + /// event key. + /// + /// An error is returned if the specified duration is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use std::future::Future; + /// use asynchronix::model::Model; + /// use asynchronix::time::{EventKey, Scheduler}; + /// + /// // A model that logs the value of a counter every second after being + /// // triggered the first time until logging is stopped. + /// pub struct PeriodicLogger { + /// event_key: Option + /// } + /// + /// impl PeriodicLogger { + /// // Triggers the logging of a timestamp every second [input port]. + /// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler) { + /// self.stop(); + /// println!("counter: {}", counter); + /// + /// // Schedule this method again in 1s with an incremented counter. + /// let event_key = scheduler + /// .schedule_keyed_event_in(Duration::from_secs(1), Self::trigger, counter + 1) + /// .unwrap(); + /// self.event_key = Some(event_key); + /// } + /// + /// // Cancels the logging of timestamps. + /// pub fn stop(&mut self) { + /// self.event_key.take().map(|k| k.cancel_event()); + /// } + /// } + /// + /// impl Model for PeriodicLogger {} + /// ``` + pub fn schedule_keyed_event_in( + &self, + duration: Duration, + func: F, + arg: T, + ) -> Result> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if duration.is_zero() { + return Err(ScheduledTimeError(arg)); + } + let time = self.time() + duration; + let sender = self.sender.clone(); + let event_key = + schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(event_key) } /// Schedules an event at a future time. @@ -183,7 +246,7 @@ impl Scheduler { /// use asynchronix::model::Model; /// use asynchronix::time::{MonotonicTime, Scheduler}; /// - /// // An alarm clock model. + /// // An alarm clock. /// pub struct AlarmClock { /// msg: String /// } @@ -196,7 +259,7 @@ impl Scheduler { /// /// // Sets an alarm [input port]. /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { - /// if scheduler.schedule_at(setting, Self::ring, ()).is_err() { + /// if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() { /// println!("The alarm clock can only be set for a future time"); /// } /// } @@ -209,12 +272,12 @@ impl Scheduler { /// /// impl Model for AlarmClock {} /// ``` - pub fn schedule_at( + pub fn schedule_event_at( &self, time: MonotonicTime, func: F, arg: T, - ) -> Result> + ) -> Result<(), ScheduledTimeError> where F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, @@ -223,20 +286,77 @@ impl Scheduler { return Err(ScheduledTimeError(arg)); } let sender = self.sender.clone(); - let schedule_key = - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - Ok(schedule_key) + Ok(()) } - /// Cancels an event with a scheduled time in the future of the current - /// simulation time. + /// Schedules an event at a future time and returns an event key. /// - /// If the corresponding event was already executed, or if it is scheduled - /// for the current simulation time but was not yet executed, an error is - /// returned. - pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> { - cancel_scheduled(scheduler_key, &self.scheduler_queue) + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::Model; + /// use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; + /// + /// // An alarm clock that can be cancelled. + /// pub struct AlarmClock { + /// msg: String, + /// event_key: Option, + /// } + /// + /// impl AlarmClock { + /// // Creates a new alarm clock. + /// pub fn new(msg: String) -> Self { + /// Self { + /// msg, + /// event_key: None + /// } + /// } + /// + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { + /// self.cancel(); + /// match scheduler.schedule_keyed_event_at(setting, Self::ring, ()) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels the current alarm, if any. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel_event()); + /// } + /// + /// // Rings the alarm [private input port]. + /// fn ring(&mut self) { + /// println!("{}", self.msg); + /// } + /// } + /// + /// impl Model for AlarmClock {} + /// ``` + pub fn schedule_keyed_event_at( + &self, + time: MonotonicTime, + func: F, + arg: T, + ) -> Result> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + { + if self.time() >= time { + return Err(ScheduledTimeError(arg)); + } + let sender = self.sender.clone(); + let event_key = + schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(event_key) } } @@ -246,15 +366,61 @@ impl fmt::Debug for Scheduler { } } -/// Unique identifier for a scheduled event. +/// Handle to a scheduled event. /// -/// A `SchedulerKey` can be used to cancel a future event. -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] -pub struct SchedulerKey(priority_queue::InsertKey); +/// An `EventKey` can be used to cancel a future event. +#[derive(Clone, Debug)] +pub struct EventKey { + state: Arc, +} -impl SchedulerKey { - pub(crate) fn new(key: priority_queue::InsertKey) -> Self { - Self(key) +impl EventKey { + const IS_PENDING: usize = 0; + const IS_CANCELLED: usize = 1; + const IS_PROCESSED: usize = 2; + + /// Creates a key for a pending event. + pub(crate) fn new() -> Self { + Self { + state: Arc::new(AtomicUsize::new(Self::IS_PENDING)), + } + } + + /// Checks whether the event was cancelled. + pub(crate) fn event_is_cancelled(&self) -> bool { + self.state.load(Ordering::Relaxed) == Self::IS_CANCELLED + } + + /// Marks the event as processed. + /// + /// If the event cannot be processed because it was cancelled, `false` is + /// returned. + pub(crate) fn process_event(self) -> bool { + match self.state.compare_exchange( + Self::IS_PENDING, + Self::IS_PROCESSED, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => true, + Err(s) => s == Self::IS_PROCESSED, + } + } + + /// Cancels the associated event if possible. + /// + /// If the event cannot be cancelled because it was already processed, + /// `false` is returned. + pub fn cancel_event(self) -> bool { + match self.state.compare_exchange( + Self::IS_PENDING, + Self::IS_CANCELLED, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => true, + Err(s) => s == Self::IS_CANCELLED, + } } } @@ -274,21 +440,6 @@ impl fmt::Display for ScheduledTimeError { impl Error for ScheduledTimeError {} -/// Error returned when the cancellation of a scheduler event is unsuccessful. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct CancellationError {} - -impl fmt::Display for CancellationError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "the scheduler key should belong to an event or command scheduled in the future of the current simulation time" - ) - } -} - -impl Error for CancellationError {} - /// Schedules an event at a future time. /// /// This method does not check whether the specified time lies in the future @@ -299,8 +450,7 @@ pub(crate) fn schedule_event_at_unchecked( arg: T, sender: Sender, scheduler_queue: &Mutex, -) -> SchedulerKey -where +) where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, @@ -321,26 +471,143 @@ where ) .await; }; + let fut = Box::new(UnkeyedEventFuture::new(fut)); let mut scheduler_queue = scheduler_queue.lock().unwrap(); - let insert_key = scheduler_queue.insert((time, channel_id), Box::new(fut)); - - SchedulerKey::new(insert_key) + scheduler_queue.insert((time, channel_id), fut); } -/// Cancels an event or command with a scheduled time in the future of the -/// current simulation time. +/// Schedules an event at a future time, returning an event key. /// -/// If the corresponding event or command was already executed, or if it is -/// scheduled for the current simulation time, an error is returned. -pub(crate) fn cancel_scheduled( - scheduler_key: SchedulerKey, +/// This method does not check whether the specified time lies in the future +/// of the current simulation time. +pub(crate) fn schedule_keyed_event_at_unchecked( + time: MonotonicTime, + func: F, + arg: T, + sender: Sender, scheduler_queue: &Mutex, -) -> Result<(), CancellationError> { - let mut scheduler_queue = scheduler_queue.lock().unwrap(); - if scheduler_queue.delete(scheduler_key.0) { - return Ok(()); - } +) -> EventKey +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let channel_id = sender.channel_id(); - Err(CancellationError {}) + let event_key = EventKey::new(); + let local_event_key = event_key.clone(); + + let fut = async move { + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = async move { + if local_event_key.process_event() { + func.call(model, arg, scheduler).await; + } + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; + }; + + // Implementation note: we end up with two atomic references to the event + // key stored inside the event future: one was moved above to the future + // itself and the other one is created below via cloning and stored + // separately in the `KeyedEventFuture`. This is not ideal as we could + // theoretically spare on atomic reference counting by storing a single + // reference, but this would likely require some tricky `unsafe`, not least + // because the inner future sent to the mailbox outlives the + // `KeyedEventFuture`. + let fut = Box::new(KeyedEventFuture::new(fut, event_key.clone())); + + let mut scheduler_queue = scheduler_queue.lock().unwrap(); + scheduler_queue.insert((time, channel_id), fut); + + event_key +} + +/// The future of an event which scheduling may be cancelled by the user. +pub(crate) trait EventFuture: Future { + /// Whether the scheduling of this event was cancelled. + fn is_cancelled(&self) -> bool; +} + +pin_project! { + /// Future associated to a regular event that cannot be cancelled. + pub(crate) struct UnkeyedEventFuture { + #[pin] + fut: F, + } +} + +impl UnkeyedEventFuture { + /// Creates a new `EventFuture`. + pub(crate) fn new(fut: F) -> Self { + Self { fut } + } +} + +impl Future for UnkeyedEventFuture +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl EventFuture for UnkeyedEventFuture +where + F: Future, +{ + fn is_cancelled(&self) -> bool { + false + } +} + +pin_project! { + /// Future associated to a keyed event that can be cancelled. + pub(crate) struct KeyedEventFuture { + event_key: EventKey, + #[pin] + fut: F, + } +} + +impl KeyedEventFuture { + /// Creates a new `EventFuture`. + pub(crate) fn new(fut: F, event_key: EventKey) -> Self { + Self { event_key, fut } + } +} + +impl Future for KeyedEventFuture +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl EventFuture for KeyedEventFuture +where + F: Future, +{ + fn is_cancelled(&self) -> bool { + self.event_key.event_is_cancelled() + } } diff --git a/asynchronix/src/util/futures.rs b/asynchronix/src/util/futures.rs index 22200a3..026e563 100644 --- a/asynchronix/src/util/futures.rs +++ b/asynchronix/src/util/futures.rs @@ -4,6 +4,8 @@ use std::future::Future; use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::task::{Context, Poll}; /// An owned future which sequentially polls a collection of futures. @@ -51,3 +53,39 @@ impl Future for SeqFuture { Poll::Pending } } + +trait RevocableFuture: Future { + fn is_revoked() -> bool; +} + +struct NeverRevokedFuture { + inner: F, +} + +impl NeverRevokedFuture { + fn new(fut: F) -> Self { + Self { inner: fut } + } +} +impl Future for NeverRevokedFuture { + type Output = T::Output; + + #[inline(always)] + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll(cx) } + } +} + +impl RevocableFuture for NeverRevokedFuture { + fn is_revoked() -> bool { + false + } +} + +struct ConcurrentlyRevocableFuture { + inner: F, + is_revoked: Arc, +}