From b0f7e690392d63ed7417a791ad0390c4719bd500 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Fri, 28 Jul 2023 16:14:36 +0200 Subject: [PATCH] Implement periodic events --- asynchronix/examples/espresso_machine.rs | 8 +- asynchronix/src/lib.rs | 12 +- asynchronix/src/simulation.rs | 309 +++++-- asynchronix/src/time.rs | 6 +- asynchronix/src/time/scheduler.rs | 1028 ++++++++++++++++------ 5 files changed, 1008 insertions(+), 355 deletions(-) diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index 30a3fb4..6e9af84 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -104,7 +104,7 @@ impl Controller { if state == WaterSenseState::Empty && self.water_sense == WaterSenseState::NotEmpty { // If a brew was ongoing, we must cancel it. if let Some(key) = self.stop_brew_key.take() { - key.cancel_event(); + key.cancel(); self.pump_cmd.send(PumpCommand::Off).await; } } @@ -128,7 +128,7 @@ impl Controller { self.pump_cmd.send(PumpCommand::Off).await; // Abort the scheduled call to `stop_brew()`. - key.cancel_event(); + key.cancel(); return; } @@ -204,7 +204,7 @@ impl Tank { // schedule a new update. if let Some(state) = self.dynamic_state.take() { // Abort the scheduled call to `set_empty()`. - state.set_empty_key.cancel_event(); + state.set_empty_key.cancel(); // Update the volume, saturating at 0 in case of rounding errors. let time = scheduler.time(); @@ -237,7 +237,7 @@ impl Tank { // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { // Abort the scheduled call to `set_empty()`. - state.set_empty_key.cancel_event(); + state.set_empty_key.cancel(); // Update the volume, saturating at 0 in case of rounding errors. let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index bd88eef..2ce3bac 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -353,13 +353,11 @@ //! process `M1` before `M3`. //! //! The first guarantee (and only the first) also extends to events scheduled -//! from a simulation with -//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in) -//! or -//! [`Simulation::schedule_event_at()`](simulation::Simulation::schedule_event_at): -//! if the scheduler contains several events to be delivered at the same time to -//! the same model, these events will always be processed in the order in which -//! they were scheduled. +//! from a simulation with a +//! [`Simulation::schedule_*()`](simulation::Simulation::schedule_event_at) +//! method: if the scheduler contains several events to be delivered at the same +//! time to the same model, these events will always be processed in the order +//! in which they were scheduled. //! //! [actor_model]: https://en.wikipedia.org/wiki/Actor_model //! [pony]: https://www.ponylang.io/ diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 8c5b957..2d7e22d 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -136,8 +136,10 @@ use recycle_box::{coerce_box, RecycleBox}; use crate::executor::Executor; use crate::model::{InputFn, Model, ReplierFn}; -use crate::time::{self, MonotonicTime, TearableAtomicTime}; -use crate::time::{EventKey, ScheduledTimeError, SchedulerQueue}; +use crate::time::{ + self, EventKey, MonotonicTime, ScheduledEvent, SchedulerQueue, SchedulingError, + TearableAtomicTime, +}; use crate::util::futures::SeqFuture; use crate::util::slot; use crate::util::sync_cell::SyncCell; @@ -152,10 +154,10 @@ use crate::util::sync_cell::SyncCell; /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation /// itself, but also from models via the optional -/// [`&Scheduler`][crate::time::Scheduler] argument of input and replier port -/// methods. Likewise, simulation time can be accessed with the -/// [`Simulation::time()`] method, or from models with the -/// [`Scheduler::time()`](crate::time::Scheduler::time) method. +/// [`&Scheduler`][time::Scheduler] argument of input and replier port methods. +/// Likewise, simulation time can be accessed with the [`Simulation::time()`] +/// method, or from models with the +/// [`Scheduler::time()`](time::Scheduler::time) method. /// /// Events and queries can be scheduled immediately, *i.e.* for the current /// simulation time, using [`send_event()`](Simulation::send_event) and @@ -163,9 +165,8 @@ use crate::util::sync_cell::SyncCell; /// until all computations triggered by such event or query have completed. In /// the case of queries, the response is returned. /// -/// Events can also be scheduled at a future simulation time using -/// [`schedule_event_in()`](Simulation::schedule_event_in) or -/// [`schedule_event_at()`](Simulation::schedule_event_at). These methods queue +/// Events can also be scheduled at a future simulation time using one of the +/// [`schedule_*()`](Simulation::schedule_event_at) method. These methods queue /// an event without blocking. /// /// Finally, the [`Simulation`] instance manages simulation time. Calling @@ -229,66 +230,138 @@ impl Simulation { /// This method may block. Once it returns, it is guaranteed that (i) all /// events scheduled up to the specified target time have completed and (ii) /// the final simulation time matches the target time. - pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ScheduledTimeError<()>> { + pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), SchedulingError> { if self.time.read() >= target_time { - return Err(ScheduledTimeError(())); + return Err(SchedulingError::InvalidScheduledTime); } self.step_until_unchecked(target_time); Ok(()) } - /// Schedules an event at the lapse of the specified duration. + /// Schedules an event at a future time. /// - /// An error is returned if the specified duration is null. + /// An error is returned if the specified time is not in the future of the + /// current simulation time. /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_event_in( + /// + /// See also: [`time::Scheduler::schedule_event_at`]. + pub fn schedule_event_at( &mut self, - duration: Duration, + time: MonotonicTime, func: F, arg: T, address: impl Into>, - ) -> Result<(), ScheduledTimeError> + ) -> Result<(), SchedulingError> where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, + S: Send + 'static, { - if duration.is_zero() { - return Err(ScheduledTimeError(arg)); + if self.time.read() >= time { + return Err(SchedulingError::InvalidScheduledTime); } - let time = self.time.read() + duration; + time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules an event at the lapse of the specified duration. + /// + /// An error is returned if the specified delay is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`time::Scheduler::schedule_event_in`]. + pub fn schedule_event_in( + &mut self, + delay: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + let time = self.time.read() + delay; time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); Ok(()) } - /// Schedules an event at the lapse of the specified duration and returns an - /// event key. + /// Schedules a cancellable event at a future time and returns an event key. /// - /// An error is returned if the specified duration is null. + /// An error is returned if the specified time is not in the future of the + /// current simulation time. /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_keyed_event_in( + /// + /// See also: [`time::Scheduler::schedule_keyed_event_at`]. + pub fn schedule_keyed_event_at( &mut self, - duration: Duration, + time: MonotonicTime, func: F, arg: T, address: impl Into>, - ) -> Result> + ) -> Result where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, + S: Send + 'static, { - if duration.is_zero() { - return Err(ScheduledTimeError(arg)); + if self.time.read() >= time { + return Err(SchedulingError::InvalidScheduledTime); } - let time = self.time.read() + duration; + let event_key = time::schedule_keyed_event_at_unchecked( + time, + func, + arg, + address.into().0, + &self.scheduler_queue, + ); + + Ok(event_key) + } + + /// Schedules a cancellable event at the lapse of the specified duration and + /// returns an event key. + /// + /// An error is returned if the specified delay is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`time::Scheduler::schedule_keyed_event_in`]. + pub fn schedule_keyed_event_in( + &mut self, + delay: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + let time = self.time.read() + delay; let event_key = time::schedule_keyed_event_at_unchecked( time, @@ -301,57 +374,168 @@ impl Simulation { Ok(event_key) } - /// Schedules an event at a future time. + /// Schedules a periodically recurring event at a future time. /// /// An error is returned if the specified time is not in the future of the - /// current simulation time. + /// current simulation time or if the specified period is null. /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_event_at( + /// + /// See also: [`time::Scheduler::schedule_periodic_event_at`]. + pub fn schedule_periodic_event_at( &mut self, time: MonotonicTime, + period: Duration, func: F, arg: T, address: impl Into>, - ) -> Result<(), ScheduledTimeError> + ) -> Result<(), SchedulingError> where M: Model, - F: for<'a> InputFn<'a, M, T, S>, + F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + Clone + 'static, + S: Send + 'static, { if self.time.read() >= time { - return Err(ScheduledTimeError(arg)); + return Err(SchedulingError::InvalidScheduledTime); } - time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + time::schedule_periodic_event_at_unchecked( + time, + period, + func, + arg, + address.into().0, + &self.scheduler_queue, + ); Ok(()) } - /// Schedules an event at a future time and returns an event key. + /// Schedules a periodically recurring event at the lapse of the specified + /// duration. /// - /// An error is returned if the specified time is not in the future of the - /// current simulation time. + /// An error is returned if the specified delay or the specified period are + /// null. /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - pub fn schedule_keyed_event_at( + /// + /// See also: [`time::Scheduler::schedule_periodic_event_in`]. + pub fn schedule_periodic_event_in( &mut self, - time: MonotonicTime, + delay: Duration, + period: Duration, func: F, arg: T, address: impl Into>, - ) -> Result> + ) -> Result<(), SchedulingError> where M: Model, - F: for<'a> InputFn<'a, M, T, S>, + F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let time = self.time.read() + delay; + + time::schedule_periodic_event_at_unchecked( + time, + period, + func, + arg, + address.into().0, + &self.scheduler_queue, + ); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event at a future time + /// and returns an event key. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time or if the specified period is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`time::Scheduler::schedule_periodic_keyed_event_at`]. + pub fn schedule_periodic_keyed_event_at( + &mut self, + time: MonotonicTime, + period: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, { if self.time.read() >= time { - return Err(ScheduledTimeError(arg)); + return Err(SchedulingError::InvalidScheduledTime); } - let event_key = time::schedule_keyed_event_at_unchecked( + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let event_key = time::schedule_periodic_keyed_event_at_unchecked( time, + period, + func, + arg, + address.into().0, + &self.scheduler_queue, + ); + + Ok(event_key) + } + + /// Schedules a cancellable, periodically recurring event at the lapse of + /// the specified duration and returns an event key. + /// + /// An error is returned if the specified delay or the specified period are + /// null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`time::Scheduler::schedule_periodic_keyed_event_in`]. + pub fn schedule_periodic_keyed_event_in( + &mut self, + delay: Duration, + period: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let time = self.time.read() + delay; + + let event_key = time::schedule_periodic_keyed_event_at_unchecked( + time, + period, func, arg, address.into().0, @@ -441,9 +625,22 @@ impl Simulation { /// If at least one event was found that satisfied the time bound, the /// corresponding new simulation time is returned. fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option { + // Function pulling the next event. If the event is periodic, it is + // immediately cloned and re-scheduled. + fn pull_next_event( + scheduler_queue: &mut MutexGuard, + ) -> Box { + let ((time, channel_id), event) = scheduler_queue.pull().unwrap(); + if let Some((event_clone, period)) = event.next() { + scheduler_queue.insert((time + period, channel_id), event_clone); + } + + event + } + // Closure returning the next key which time stamp is no older than the - // upper bound, if any. Cancelled events are discarded. - let get_next_key = |scheduler_queue: &mut MutexGuard| { + // upper bound, if any. Cancelled events are pulled and discarded. + let peek_next_key = |scheduler_queue: &mut MutexGuard| { loop { match scheduler_queue.peek() { Some((&k, t)) if k.0 <= upper_time_bound => { @@ -460,34 +657,32 @@ impl Simulation { // Move to the next scheduled time. let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let mut current_key = get_next_key(&mut scheduler_queue)?; + let mut current_key = peek_next_key(&mut scheduler_queue)?; self.time.write(current_key.0); loop { - let event = scheduler_queue.pull().unwrap().1; - - let mut next_key = get_next_key(&mut scheduler_queue); + let event = pull_next_event(&mut scheduler_queue); + let mut next_key = peek_next_key(&mut scheduler_queue); if next_key != Some(current_key) { // Since there are no other events targeting the same mailbox // and the same time, the event is spawned immediately. - self.executor.spawn_and_forget(Box::into_pin(event)); + event.spawn_and_forget(&self.executor); } else { // To ensure that their relative order of execution is // preserved, all event targeting the same mailbox are executed // sequentially within a single compound future. let mut event_sequence = SeqFuture::new(); - - event_sequence.push(Box::into_pin(event)); + event_sequence.push(event.into_future()); loop { - let event = scheduler_queue.pull().unwrap().1; - event_sequence.push(Box::into_pin(event)); - next_key = get_next_key(&mut scheduler_queue); + let event = pull_next_event(&mut scheduler_queue); + event_sequence.push(event.into_future()); + next_key = peek_next_key(&mut scheduler_queue); if next_key != Some(current_key) { break; } } - // Spawn a parent event that sequentially polls all events + // Spawn a compound future that sequentially polls all events // targeting the same mailbox. self.executor.spawn_and_forget(event_sequence); } diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index 56dbb58..5ff4dcd 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -51,6 +51,8 @@ mod scheduler; pub(crate) use monotonic_time::TearableAtomicTime; pub use monotonic_time::{MonotonicTime, SystemTimeError}; pub(crate) use scheduler::{ - schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, SchedulerQueue, + schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, + schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, + ScheduledEvent, SchedulerQueue, }; -pub use scheduler::{EventKey, ScheduledTimeError, Scheduler}; +pub use scheduler::{EventKey, Scheduler, SchedulingError}; diff --git a/asynchronix/src/time/scheduler.rs b/asynchronix/src/time/scheduler.rs index 99ee365..af5dac8 100644 --- a/asynchronix/src/time/scheduler.rs +++ b/asynchronix/src/time/scheduler.rs @@ -3,7 +3,9 @@ use std::error::Error; use std::fmt; use std::future::Future; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::marker::PhantomData; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::Duration; @@ -12,20 +14,20 @@ use pin_project_lite::pin_project; use recycle_box::{coerce_box, RecycleBox}; use crate::channel::{ChannelId, Sender}; +use crate::executor::Executor; use crate::model::{InputFn, Model}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCellReader; /// Shorthand for the scheduler queue type. -pub(crate) type SchedulerQueue = - PriorityQueue<(MonotonicTime, ChannelId), Box + Send>>; +pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, ChannelId), Box>; /// A local scheduler for models. /// /// A `Scheduler` is a handle to the global scheduler associated to a model -/// instance. It can be used by the model to retrieve the simulation time, to -/// schedule delayed actions on itself or to cancel such actions. +/// instance. It can be used by the model to retrieve the simulation time or +/// schedule delayed actions on itself. /// /// ### Caveat: self-scheduling `async` methods /// @@ -59,20 +61,19 @@ pub(crate) type SchedulerQueue = /// /// #[derive(Default)] /// pub struct DelayedGreeter { -/// msg_out: Output +/// msg_out: Output, /// } +/// /// impl DelayedGreeter { /// // Triggers a greeting on the output port after some delay [input port]. /// pub async fn greet_with_delay(&mut self, delay: Duration, scheduler: &Scheduler) { /// let time = scheduler.time(); -/// let greeting = format!("Hello, this message was scheduled at: -/// {:?}.", time); +/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time); /// -/// if let Err(err) = scheduler.schedule_event_in(delay, Self::send_msg, greeting) { -/// // ^^^^^^^^ scheduled method -/// // The duration was zero, so greet right away. -/// let greeting = err.0; +/// if delay.is_zero() { /// self.msg_out.send(greeting).await; +/// } else { +/// scheduler.schedule_event_in(delay, Self::send_msg, greeting).unwrap(); /// } /// } /// @@ -124,117 +125,6 @@ impl Scheduler { self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") } - /// Schedules an event at the lapse of the specified duration. - /// - /// An error is returned if the specified duration is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// use std::future::Future; - /// use asynchronix::model::Model; - /// use asynchronix::time::Scheduler; - /// - /// // A model that logs the value of a counter every second after being - /// // triggered the first time. - /// pub struct PeriodicLogger {} - /// - /// impl PeriodicLogger { - /// // Triggers the logging of a timestamp every second [input port]. - /// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler) { - /// println!("counter: {}", counter); - /// - /// // Schedule this method again in 1s with an incremented counter. - /// scheduler - /// .schedule_event_in(Duration::from_secs(1), Self::trigger, counter + 1) - /// .unwrap(); - /// } - /// } - /// - /// impl Model for PeriodicLogger {} - /// ``` - pub fn schedule_event_in( - &self, - duration: Duration, - func: F, - arg: T, - ) -> Result<(), ScheduledTimeError> - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - { - if duration.is_zero() { - return Err(ScheduledTimeError(arg)); - } - let time = self.time() + duration; - let sender = self.sender.clone(); - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - - Ok(()) - } - - /// Schedules an event at the lapse of the specified duration and returns an - /// event key. - /// - /// An error is returned if the specified duration is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// use std::future::Future; - /// use asynchronix::model::Model; - /// use asynchronix::time::{EventKey, Scheduler}; - /// - /// // A model that logs the value of a counter every second after being - /// // triggered the first time until logging is stopped. - /// pub struct PeriodicLogger { - /// event_key: Option - /// } - /// - /// impl PeriodicLogger { - /// // Triggers the logging of a timestamp every second [input port]. - /// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler) { - /// self.stop(); - /// println!("counter: {}", counter); - /// - /// // Schedule this method again in 1s with an incremented counter. - /// let event_key = scheduler - /// .schedule_keyed_event_in(Duration::from_secs(1), Self::trigger, counter + 1) - /// .unwrap(); - /// self.event_key = Some(event_key); - /// } - /// - /// // Cancels the logging of timestamps. - /// pub fn stop(&mut self) { - /// self.event_key.take().map(|k| k.cancel_event()); - /// } - /// } - /// - /// impl Model for PeriodicLogger {} - /// ``` - pub fn schedule_keyed_event_in( - &self, - duration: Duration, - func: F, - arg: T, - ) -> Result> - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - { - if duration.is_zero() { - return Err(ScheduledTimeError(arg)); - } - let time = self.time() + duration; - let sender = self.sender.clone(); - let event_key = - schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - - Ok(event_key) - } - /// Schedules an event at a future time. /// /// An error is returned if the specified time is not in the future of the @@ -247,16 +137,9 @@ impl Scheduler { /// use asynchronix::time::{MonotonicTime, Scheduler}; /// /// // An alarm clock. - /// pub struct AlarmClock { - /// msg: String - /// } + /// pub struct AlarmClock {} /// /// impl AlarmClock { - /// // Creates a new alarm clock. - /// pub fn new(msg: String) -> Self { - /// Self { msg } - /// } - /// /// // Sets an alarm [input port]. /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { /// if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() { @@ -266,7 +149,7 @@ impl Scheduler { /// /// // Rings the alarm [private input port]. /// fn ring(&mut self) { - /// println!("{}", self.msg); + /// println!("Brringggg"); /// } /// } /// @@ -277,13 +160,14 @@ impl Scheduler { time: MonotonicTime, func: F, arg: T, - ) -> Result<(), ScheduledTimeError> + ) -> Result<(), SchedulingError> where F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, + S: Send + 'static, { if self.time() >= time { - return Err(ScheduledTimeError(arg)); + return Err(SchedulingError::InvalidScheduledTime); } let sender = self.sender.clone(); schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); @@ -291,7 +175,58 @@ impl Scheduler { Ok(()) } - /// Schedules an event at a future time and returns an event key. + /// Schedules an event at the lapse of the specified duration. + /// + /// An error is returned if the specified delay is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use std::future::Future; + /// use asynchronix::model::Model; + /// use asynchronix::time::Scheduler; + /// + /// // A model that logs the value of a counter every second after being + /// // triggered the first time. + /// pub struct CounterLogger {} + /// + /// impl CounterLogger { + /// // Triggers the logging of a timestamp every second [input port]. + /// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler) { + /// println!("counter: {}", counter); + /// + /// // Schedule this method again in 1s with an incremented counter. + /// scheduler + /// .schedule_event_in(Duration::from_secs(1), Self::trigger, counter + 1) + /// .unwrap(); + /// } + /// } + /// + /// impl Model for CounterLogger {} + /// ``` + pub fn schedule_event_in( + &self, + delay: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + let time = self.time() + delay; + let sender = self.sender.clone(); + schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules a cancellable event at a future time and returns an event key. /// /// An error is returned if the specified time is not in the future of the /// current simulation time. @@ -303,20 +238,12 @@ impl Scheduler { /// use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; /// /// // An alarm clock that can be cancelled. - /// pub struct AlarmClock { - /// msg: String, + /// #[derive(Default)] + /// pub struct CancellableAlarmClock { /// event_key: Option, /// } /// - /// impl AlarmClock { - /// // Creates a new alarm clock. - /// pub fn new(msg: String) -> Self { - /// Self { - /// msg, - /// event_key: None - /// } - /// } - /// + /// impl CancellableAlarmClock { /// // Sets an alarm [input port]. /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { /// self.cancel(); @@ -326,31 +253,32 @@ impl Scheduler { /// }; /// } /// - /// // Cancels the current alarm, if any. + /// // Cancels the current alarm, if any [input port]. /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel_event()); + /// self.event_key.take().map(|k| k.cancel()); /// } /// /// // Rings the alarm [private input port]. /// fn ring(&mut self) { - /// println!("{}", self.msg); + /// println!("Brringggg!"); /// } /// } /// - /// impl Model for AlarmClock {} + /// impl Model for CancellableAlarmClock {} /// ``` pub fn schedule_keyed_event_at( &self, time: MonotonicTime, func: F, arg: T, - ) -> Result> + ) -> Result where F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, + S: Send + 'static, { if self.time() >= time { - return Err(ScheduledTimeError(arg)); + return Err(SchedulingError::InvalidScheduledTime); } let sender = self.sender.clone(); let event_key = @@ -358,6 +286,266 @@ impl Scheduler { Ok(event_key) } + + /// Schedules a cancellable event at the lapse of the specified duration and + /// returns an event key. + /// + /// An error is returned if the specified delay is null. + /// + /// See also: + /// [`schedule_keyed_event_at`][Scheduler::schedule_keyed_event_at], + /// [`schedule_event_in`][Scheduler::schedule_event_in]. + pub fn schedule_keyed_event_in( + &self, + delay: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + let time = self.time() + delay; + let sender = self.sender.clone(); + let event_key = + schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(event_key) + } + + /// Schedules a periodically recurring event at a future time. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::Model; + /// use asynchronix::time::{MonotonicTime, Scheduler}; + /// + /// // An alarm clock beeping at 1Hz. + /// pub struct BeepingAlarmClock {} + /// + /// impl BeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { + /// if scheduler.schedule_periodic_event_at( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for BeepingAlarmClock {} + /// ``` + pub fn schedule_periodic_event_at( + &self, + time: MonotonicTime, + period: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if self.time() >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = self.sender.clone(); + schedule_periodic_event_at_unchecked( + time, + period, + func, + arg, + sender, + &self.scheduler_queue, + ); + + Ok(()) + } + + /// Schedules a periodically recurring event at the lapse of the specified + /// duration. + /// + /// An error is returned if the specified delay or the specified period are + /// null. + /// + /// See also: + /// [`schedule_periodic_event_at`][Scheduler::schedule_periodic_event_at], + /// [`schedule_event_in`][Scheduler::schedule_event_in]. + pub fn schedule_periodic_event_in( + &self, + delay: Duration, + period: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let time = self.time() + delay; + let sender = self.sender.clone(); + schedule_periodic_event_at_unchecked( + time, + period, + func, + arg, + sender, + &self.scheduler_queue, + ); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event at a future time + /// and returns an event key. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::Model; + /// use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; + /// + /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or + /// // stopped after it sets off. + /// #[derive(Default)] + /// pub struct CancellableBeepingAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableBeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { + /// self.cancel(); + /// match scheduler.schedule_periodic_keyed_event_at( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels or stops the alarm [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for CancellableBeepingAlarmClock {} + /// ``` + pub fn schedule_periodic_keyed_event_at( + &self, + time: MonotonicTime, + period: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if self.time() >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = self.sender.clone(); + let event_key = schedule_periodic_keyed_event_at_unchecked( + time, + period, + func, + arg, + sender, + &self.scheduler_queue, + ); + + Ok(event_key) + } + + /// Schedules a cancellable, periodically recurring event at the lapse of + /// the specified duration and returns an event key. + /// + /// An error is returned if the specified delay or the specified period are + /// null. + /// + /// See also: + /// [`schedule_periodic_keyed_event_at`][Scheduler::schedule_periodic_keyed_event_at], + /// [`schedule_event_in`][Scheduler::schedule_event_in]. + pub fn schedule_periodic_keyed_event_in( + &self, + delay: Duration, + period: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if delay.is_zero() { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let time = self.time() + delay; + let sender = self.sender.clone(); + let event_key = schedule_periodic_keyed_event_at_unchecked( + time, + period, + func, + arg, + sender, + &self.scheduler_queue, + ); + + Ok(event_key) + } } impl fmt::Debug for Scheduler { @@ -371,74 +559,51 @@ impl fmt::Debug for Scheduler { /// An `EventKey` can be used to cancel a future event. #[derive(Clone, Debug)] pub struct EventKey { - state: Arc, + is_cancelled: Arc, } impl EventKey { - const IS_PENDING: usize = 0; - const IS_CANCELLED: usize = 1; - const IS_PROCESSED: usize = 2; - /// Creates a key for a pending event. pub(crate) fn new() -> Self { Self { - state: Arc::new(AtomicUsize::new(Self::IS_PENDING)), + is_cancelled: Arc::new(AtomicBool::new(false)), } } /// Checks whether the event was cancelled. - pub(crate) fn event_is_cancelled(&self) -> bool { - self.state.load(Ordering::Relaxed) == Self::IS_CANCELLED + pub(crate) fn is_cancelled(&self) -> bool { + self.is_cancelled.load(Ordering::Relaxed) } - /// Marks the event as processed. - /// - /// If the event cannot be processed because it was cancelled, `false` is - /// returned. - pub(crate) fn process_event(self) -> bool { - match self.state.compare_exchange( - Self::IS_PENDING, - Self::IS_PROCESSED, - Ordering::Relaxed, - Ordering::Relaxed, - ) { - Ok(_) => true, - Err(s) => s == Self::IS_PROCESSED, - } - } - - /// Cancels the associated event if possible. - /// - /// If the event cannot be cancelled because it was already processed, - /// `false` is returned. - pub fn cancel_event(self) -> bool { - match self.state.compare_exchange( - Self::IS_PENDING, - Self::IS_CANCELLED, - Ordering::Relaxed, - Ordering::Relaxed, - ) { - Ok(_) => true, - Err(s) => s == Self::IS_CANCELLED, - } + /// Cancels the associated event. + pub fn cancel(self) { + self.is_cancelled.store(true, Ordering::Relaxed); } } -/// Error returned when the scheduled time does not lie in the future of the -/// current simulation time. +/// Error returned when the scheduled time or the repetition period are invalid. #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct ScheduledTimeError(pub T); +pub enum SchedulingError { + /// The scheduled time does not lie in the future of the current simulation + /// time. + InvalidScheduledTime, + /// The repetition period is zero. + NullRepetitionPeriod, +} -impl fmt::Display for ScheduledTimeError { +impl fmt::Display for SchedulingError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "the scheduled time should be in the future of the current simulation time" - ) + match self { + Self::InvalidScheduledTime => write!( + fmt, + "the scheduled time should be in the future of the current simulation time" + ), + Self::NullRepetitionPeriod => write!(fmt, "the repetition period cannot be zero"), + } } } -impl Error for ScheduledTimeError {} +impl Error for SchedulingError {} /// Schedules an event at a future time. /// @@ -454,27 +619,14 @@ pub(crate) fn schedule_event_at_unchecked( M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, + S: Send + 'static, { let channel_id = sender.channel_id(); - let fut = async move { - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = func.call(model, arg, scheduler); - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; - }; - let fut = Box::new(UnkeyedEventFuture::new(fut)); + let event_dispatcher = Box::new(new_event_dispatcher(func, arg, sender)); let mut scheduler_queue = scheduler_queue.lock().unwrap(); - scheduler_queue.insert((time, channel_id), fut); + scheduler_queue.insert((time, channel_id), event_dispatcher); } /// Schedules an event at a future time, returning an event key. @@ -492,69 +644,137 @@ where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, + S: Send + 'static, { - let channel_id = sender.channel_id(); - let event_key = EventKey::new(); - let local_event_key = event_key.clone(); - - let fut = async move { - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = async move { - if local_event_key.process_event() { - func.call(model, arg, scheduler).await; - } - }; - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; - }; - - // Implementation note: we end up with two atomic references to the event - // key stored inside the event future: one was moved above to the future - // itself and the other one is created below via cloning and stored - // separately in the `KeyedEventFuture`. This is not ideal as we could - // theoretically spare on atomic reference counting by storing a single - // reference, but this would likely require some tricky `unsafe`, not least - // because the inner future sent to the mailbox outlives the - // `KeyedEventFuture`. - let fut = Box::new(KeyedEventFuture::new(fut, event_key.clone())); + let channel_id = sender.channel_id(); + let event_dispatcher = Box::new(KeyedEventDispatcher::new( + event_key.clone(), + func, + arg, + sender, + )); let mut scheduler_queue = scheduler_queue.lock().unwrap(); - scheduler_queue.insert((time, channel_id), fut); + scheduler_queue.insert((time, channel_id), event_dispatcher); event_key } -/// The future of an event which scheduling may be cancelled by the user. -pub(crate) trait EventFuture: Future { - /// Whether the scheduling of this event was cancelled. +/// Schedules a periodic event at a future time. +/// +/// This method does not check whether the specified time lies in the future +/// of the current simulation time. +pub(crate) fn schedule_periodic_event_at_unchecked( + time: MonotonicTime, + period: Duration, + func: F, + arg: T, + sender: Sender, + scheduler_queue: &Mutex, +) where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, +{ + let channel_id = sender.channel_id(); + + let event_dispatcher = Box::new(PeriodicEventDispatcher::new(func, arg, sender, period)); + + let mut scheduler_queue = scheduler_queue.lock().unwrap(); + scheduler_queue.insert((time, channel_id), event_dispatcher); +} + +/// Schedules an event at a future time, returning an event key. +/// +/// This method does not check whether the specified time lies in the future +/// of the current simulation time. +pub(crate) fn schedule_periodic_keyed_event_at_unchecked( + time: MonotonicTime, + period: Duration, + func: F, + arg: T, + sender: Sender, + scheduler_queue: &Mutex, +) -> EventKey +where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, +{ + let event_key = EventKey::new(); + let channel_id = sender.channel_id(); + let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new( + event_key.clone(), + func, + arg, + sender, + period, + )); + + let mut scheduler_queue = scheduler_queue.lock().unwrap(); + scheduler_queue.insert((time, channel_id), event_dispatcher); + + event_key +} + +/// Trait for objects that can be converted to a future dispatching a scheduled +/// event. +pub(crate) trait ScheduledEvent: Send { + /// Reports whether the associated event was cancelled. fn is_cancelled(&self) -> bool; + + /// Returns a boxed clone of this event and the repetition period if this is + /// a periodic even, otherwise returns `None`. + fn next(&self) -> Option<(Box, Duration)>; + + /// Returns a boxed future dispatching the associated event. + fn into_future(self: Box) -> Pin + Send>>; + + /// Spawns the future that dispatches the associated event onto the provided + /// executor. + /// + /// This method is typically more efficient that spawning the boxed future + /// from `into_future` since it can directly spawn the unboxed future. + fn spawn_and_forget(self: Box, executor: &Executor); } pin_project! { - /// Future associated to a regular event that cannot be cancelled. - pub(crate) struct UnkeyedEventFuture { + /// Object that can be converted to a future dispatching a non-cancellable + /// event. + /// + /// Note that this particular event dispatcher is in fact already a future: + /// since the future cannot be cancelled and the dispatcher does not need to + /// be cloned, there is no need to defer the construction of the future. + /// This makes `into_future` a trivial cast, which saves a boxing operation. + pub(crate) struct EventDispatcher { #[pin] fut: F, } } -impl UnkeyedEventFuture { - /// Creates a new `EventFuture`. - pub(crate) fn new(fut: F) -> Self { - Self { fut } - } +/// Constructs a new `EventDispatcher`. +/// +/// Due to some limitations of type inference or of my understanding of it, the +/// constructor for this event dispatchers is a freestanding function. +fn new_event_dispatcher( + func: F, + arg: T, + sender: Sender, +) -> EventDispatcher> +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let fut = dispatch_event(func, arg, sender); + + EventDispatcher { fut } } -impl Future for UnkeyedEventFuture +impl Future for EventDispatcher where F: Future, { @@ -566,48 +786,286 @@ where } } -impl EventFuture for UnkeyedEventFuture +impl ScheduledEvent for EventDispatcher where - F: Future, + F: Future + Send + 'static, { fn is_cancelled(&self) -> bool { false } -} - -pin_project! { - /// Future associated to a keyed event that can be cancelled. - pub(crate) struct KeyedEventFuture { - event_key: EventKey, - #[pin] - fut: F, + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + // No need for boxing, type coercion is enough here. + Box::into_pin(self) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget(*self); } } -impl KeyedEventFuture { - /// Creates a new `EventFuture`. - pub(crate) fn new(fut: F, event_key: EventKey) -> Self { - Self { event_key, fut } - } -} - -impl Future for KeyedEventFuture +/// Object that can be converted to a future dispatching a non-cancellable periodic +/// event. +pub(crate) struct PeriodicEventDispatcher where - F: Future, + M: Model, { - type Output = F::Output; + func: F, + arg: T, + sender: Sender, + period: Duration, + _input_kind: PhantomData, +} - #[inline(always)] - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) +impl PeriodicEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + /// Constructs a new `PeriodicEventDispatcher`. + fn new(func: F, arg: T, sender: Sender, period: Duration) -> Self { + Self { + func, + arg, + sender, + period, + _input_kind: PhantomData, + } } } -impl EventFuture for KeyedEventFuture +impl ScheduledEvent for PeriodicEventDispatcher where - F: Future, + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, { fn is_cancelled(&self) -> bool { - self.event_key.event_is_cancelled() + false + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new( + self.func.clone(), + self.arg.clone(), + self.sender.clone(), + self.period, + )); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + let Self { + func, arg, sender, .. + } = *self; + + Box::pin(dispatch_event(func, arg, sender)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + let Self { + func, arg, sender, .. + } = *self; + + let fut = dispatch_event(func, arg, sender); + executor.spawn_and_forget(fut); } } + +/// Object that can be converted to a future dispatching a cancellable event. +pub(crate) struct KeyedEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + event_key: EventKey, + func: F, + arg: T, + sender: Sender, + _input_kind: PhantomData, +} + +impl KeyedEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + /// Constructs a new `KeyedEventDispatcher`. + fn new(event_key: EventKey, func: F, arg: T, sender: Sender) -> Self { + Self { + event_key, + func, + arg, + sender, + _input_kind: PhantomData, + } + } +} + +impl ScheduledEvent for KeyedEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + Box::pin(dispatch_keyed_event(event_key, func, arg, sender)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + let fut = dispatch_keyed_event(event_key, func, arg, sender); + executor.spawn_and_forget(fut); + } +} + +/// Object that can be converted to a future dispatching a cancellable event. +pub(crate) struct PeriodicKeyedEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + event_key: EventKey, + func: F, + arg: T, + sender: Sender, + period: Duration, + _input_kind: PhantomData, +} + +impl PeriodicKeyedEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + /// Constructs a new `KeyedEventDispatcher`. + fn new(event_key: EventKey, func: F, arg: T, sender: Sender, period: Duration) -> Self { + Self { + event_key, + func, + arg, + sender, + period, + _input_kind: PhantomData, + } + } +} + +impl ScheduledEvent for PeriodicKeyedEventDispatcher +where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new( + self.event_key.clone(), + self.func.clone(), + self.arg.clone(), + self.sender.clone(), + self.period, + )); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + Box::pin(dispatch_keyed_event(event_key, func, arg, sender)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + let fut = dispatch_keyed_event(event_key, func, arg, sender); + executor.spawn_and_forget(fut); + } +} + +/// Asynchronously dispatch a regular, non-cancellable event. +async fn dispatch_event(func: F, arg: T, sender: Sender) +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} + +/// Asynchronously dispatch a cancellable event. +async fn dispatch_keyed_event(event_key: EventKey, func: F, arg: T, sender: Sender) +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = async move { + // Only perform the call if the event wasn't cancelled. + if !event_key.is_cancelled() { + func.call(model, arg, scheduler).await; + } + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +}