forked from ROMEO/nexosim
Make it possible to cancel current-time events
This is a pretty large patch that impacts the API. Until now, it was not possible to cancel events that were scheduled for the current simulation time slice, making it necessary for the user to use complex workarounds (see former version of the espresso machine example). The new implementation makes this possible but the generation of a key associated to an event has now a non-negligible cost (basicaly it creates three references to an Arc). For this reason, the API now defaults to NOT creating a key, and new methods were added for situations when the event may need to be cancelled and a key is necessary. See the much simplified implementation of the espresso machine example for a motivating case.
This commit is contained in:
@ -3,20 +3,23 @@
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use recycle_box::{coerce_box, RecycleBox};
|
||||
|
||||
use crate::channel::{ChannelId, Sender};
|
||||
use crate::model::{InputFn, Model};
|
||||
use crate::time::{MonotonicTime, TearableAtomicTime};
|
||||
use crate::util::priority_queue::{self, PriorityQueue};
|
||||
use crate::util::priority_queue::PriorityQueue;
|
||||
use crate::util::sync_cell::SyncCellReader;
|
||||
|
||||
/// Shorthand for the scheduler queue type.
|
||||
pub(crate) type SchedulerQueue =
|
||||
PriorityQueue<(MonotonicTime, ChannelId), Box<dyn Future<Output = ()> + Send>>;
|
||||
PriorityQueue<(MonotonicTime, ChannelId), Box<dyn EventFuture<Output = ()> + Send>>;
|
||||
|
||||
/// A local scheduler for models.
|
||||
///
|
||||
@ -65,8 +68,8 @@ pub(crate) type SchedulerQueue =
|
||||
/// let greeting = format!("Hello, this message was scheduled at:
|
||||
/// {:?}.", time);
|
||||
///
|
||||
/// if let Err(err) = scheduler.schedule_in(delay, Self::send_msg, greeting) {
|
||||
/// // ^^^^^^^^ scheduled method
|
||||
/// if let Err(err) = scheduler.schedule_event_in(delay, Self::send_msg, greeting) {
|
||||
/// // ^^^^^^^^ scheduled method
|
||||
/// // The duration was zero, so greet right away.
|
||||
/// let greeting = err.0;
|
||||
/// self.msg_out.send(greeting).await;
|
||||
@ -144,19 +147,19 @@ impl<M: Model> Scheduler<M> {
|
||||
///
|
||||
/// // Schedule this method again in 1s with an incremented counter.
|
||||
/// scheduler
|
||||
/// .schedule_in(Duration::from_secs(1), Self::trigger, counter + 1)
|
||||
/// .schedule_event_in(Duration::from_secs(1), Self::trigger, counter + 1)
|
||||
/// .unwrap();
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl Model for PeriodicLogger {}
|
||||
/// ```
|
||||
pub fn schedule_in<F, T, S>(
|
||||
pub fn schedule_event_in<F, T, S>(
|
||||
&self,
|
||||
duration: Duration,
|
||||
func: F,
|
||||
arg: T,
|
||||
) -> Result<SchedulerKey, ScheduledTimeError<T>>
|
||||
) -> Result<(), ScheduledTimeError<T>>
|
||||
where
|
||||
F: for<'a> InputFn<'a, M, T, S>,
|
||||
T: Send + Clone + 'static,
|
||||
@ -166,10 +169,70 @@ impl<M: Model> Scheduler<M> {
|
||||
}
|
||||
let time = self.time() + duration;
|
||||
let sender = self.sender.clone();
|
||||
let schedule_key =
|
||||
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
||||
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
||||
|
||||
Ok(schedule_key)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Schedules an event at the lapse of the specified duration and returns an
|
||||
/// event key.
|
||||
///
|
||||
/// An error is returned if the specified duration is null.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use std::time::Duration;
|
||||
/// use std::future::Future;
|
||||
/// use asynchronix::model::Model;
|
||||
/// use asynchronix::time::{EventKey, Scheduler};
|
||||
///
|
||||
/// // A model that logs the value of a counter every second after being
|
||||
/// // triggered the first time until logging is stopped.
|
||||
/// pub struct PeriodicLogger {
|
||||
/// event_key: Option<EventKey>
|
||||
/// }
|
||||
///
|
||||
/// impl PeriodicLogger {
|
||||
/// // Triggers the logging of a timestamp every second [input port].
|
||||
/// pub fn trigger(&mut self, counter: u64, scheduler: &Scheduler<Self>) {
|
||||
/// self.stop();
|
||||
/// println!("counter: {}", counter);
|
||||
///
|
||||
/// // Schedule this method again in 1s with an incremented counter.
|
||||
/// let event_key = scheduler
|
||||
/// .schedule_keyed_event_in(Duration::from_secs(1), Self::trigger, counter + 1)
|
||||
/// .unwrap();
|
||||
/// self.event_key = Some(event_key);
|
||||
/// }
|
||||
///
|
||||
/// // Cancels the logging of timestamps.
|
||||
/// pub fn stop(&mut self) {
|
||||
/// self.event_key.take().map(|k| k.cancel_event());
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl Model for PeriodicLogger {}
|
||||
/// ```
|
||||
pub fn schedule_keyed_event_in<F, T, S>(
|
||||
&self,
|
||||
duration: Duration,
|
||||
func: F,
|
||||
arg: T,
|
||||
) -> Result<EventKey, ScheduledTimeError<T>>
|
||||
where
|
||||
F: for<'a> InputFn<'a, M, T, S>,
|
||||
T: Send + Clone + 'static,
|
||||
{
|
||||
if duration.is_zero() {
|
||||
return Err(ScheduledTimeError(arg));
|
||||
}
|
||||
let time = self.time() + duration;
|
||||
let sender = self.sender.clone();
|
||||
let event_key =
|
||||
schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
||||
|
||||
Ok(event_key)
|
||||
}
|
||||
|
||||
/// Schedules an event at a future time.
|
||||
@ -183,7 +246,7 @@ impl<M: Model> Scheduler<M> {
|
||||
/// use asynchronix::model::Model;
|
||||
/// use asynchronix::time::{MonotonicTime, Scheduler};
|
||||
///
|
||||
/// // An alarm clock model.
|
||||
/// // An alarm clock.
|
||||
/// pub struct AlarmClock {
|
||||
/// msg: String
|
||||
/// }
|
||||
@ -196,7 +259,7 @@ impl<M: Model> Scheduler<M> {
|
||||
///
|
||||
/// // Sets an alarm [input port].
|
||||
/// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
|
||||
/// if scheduler.schedule_at(setting, Self::ring, ()).is_err() {
|
||||
/// if scheduler.schedule_event_at(setting, Self::ring, ()).is_err() {
|
||||
/// println!("The alarm clock can only be set for a future time");
|
||||
/// }
|
||||
/// }
|
||||
@ -209,12 +272,12 @@ impl<M: Model> Scheduler<M> {
|
||||
///
|
||||
/// impl Model for AlarmClock {}
|
||||
/// ```
|
||||
pub fn schedule_at<F, T, S>(
|
||||
pub fn schedule_event_at<F, T, S>(
|
||||
&self,
|
||||
time: MonotonicTime,
|
||||
func: F,
|
||||
arg: T,
|
||||
) -> Result<SchedulerKey, ScheduledTimeError<T>>
|
||||
) -> Result<(), ScheduledTimeError<T>>
|
||||
where
|
||||
F: for<'a> InputFn<'a, M, T, S>,
|
||||
T: Send + Clone + 'static,
|
||||
@ -223,20 +286,77 @@ impl<M: Model> Scheduler<M> {
|
||||
return Err(ScheduledTimeError(arg));
|
||||
}
|
||||
let sender = self.sender.clone();
|
||||
let schedule_key =
|
||||
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
||||
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
||||
|
||||
Ok(schedule_key)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Cancels an event with a scheduled time in the future of the current
|
||||
/// simulation time.
|
||||
/// Schedules an event at a future time and returns an event key.
|
||||
///
|
||||
/// If the corresponding event was already executed, or if it is scheduled
|
||||
/// for the current simulation time but was not yet executed, an error is
|
||||
/// returned.
|
||||
pub fn cancel(&self, scheduler_key: SchedulerKey) -> Result<(), CancellationError> {
|
||||
cancel_scheduled(scheduler_key, &self.scheduler_queue)
|
||||
/// An error is returned if the specified time is not in the future of the
|
||||
/// current simulation time.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use asynchronix::model::Model;
|
||||
/// use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
|
||||
///
|
||||
/// // An alarm clock that can be cancelled.
|
||||
/// pub struct AlarmClock {
|
||||
/// msg: String,
|
||||
/// event_key: Option<EventKey>,
|
||||
/// }
|
||||
///
|
||||
/// impl AlarmClock {
|
||||
/// // Creates a new alarm clock.
|
||||
/// pub fn new(msg: String) -> Self {
|
||||
/// Self {
|
||||
/// msg,
|
||||
/// event_key: None
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// // Sets an alarm [input port].
|
||||
/// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
|
||||
/// self.cancel();
|
||||
/// match scheduler.schedule_keyed_event_at(setting, Self::ring, ()) {
|
||||
/// Ok(event_key) => self.event_key = Some(event_key),
|
||||
/// Err(_) => println!("The alarm clock can only be set for a future time"),
|
||||
/// };
|
||||
/// }
|
||||
///
|
||||
/// // Cancels the current alarm, if any.
|
||||
/// pub fn cancel(&mut self) {
|
||||
/// self.event_key.take().map(|k| k.cancel_event());
|
||||
/// }
|
||||
///
|
||||
/// // Rings the alarm [private input port].
|
||||
/// fn ring(&mut self) {
|
||||
/// println!("{}", self.msg);
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl Model for AlarmClock {}
|
||||
/// ```
|
||||
pub fn schedule_keyed_event_at<F, T, S>(
|
||||
&self,
|
||||
time: MonotonicTime,
|
||||
func: F,
|
||||
arg: T,
|
||||
) -> Result<EventKey, ScheduledTimeError<T>>
|
||||
where
|
||||
F: for<'a> InputFn<'a, M, T, S>,
|
||||
T: Send + Clone + 'static,
|
||||
{
|
||||
if self.time() >= time {
|
||||
return Err(ScheduledTimeError(arg));
|
||||
}
|
||||
let sender = self.sender.clone();
|
||||
let event_key =
|
||||
schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
||||
|
||||
Ok(event_key)
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,15 +366,61 @@ impl<M: Model> fmt::Debug for Scheduler<M> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Unique identifier for a scheduled event.
|
||||
/// Handle to a scheduled event.
|
||||
///
|
||||
/// A `SchedulerKey` can be used to cancel a future event.
|
||||
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
|
||||
pub struct SchedulerKey(priority_queue::InsertKey);
|
||||
/// An `EventKey` can be used to cancel a future event.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EventKey {
|
||||
state: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl SchedulerKey {
|
||||
pub(crate) fn new(key: priority_queue::InsertKey) -> Self {
|
||||
Self(key)
|
||||
impl EventKey {
|
||||
const IS_PENDING: usize = 0;
|
||||
const IS_CANCELLED: usize = 1;
|
||||
const IS_PROCESSED: usize = 2;
|
||||
|
||||
/// Creates a key for a pending event.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
state: Arc::new(AtomicUsize::new(Self::IS_PENDING)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks whether the event was cancelled.
|
||||
pub(crate) fn event_is_cancelled(&self) -> bool {
|
||||
self.state.load(Ordering::Relaxed) == Self::IS_CANCELLED
|
||||
}
|
||||
|
||||
/// Marks the event as processed.
|
||||
///
|
||||
/// If the event cannot be processed because it was cancelled, `false` is
|
||||
/// returned.
|
||||
pub(crate) fn process_event(self) -> bool {
|
||||
match self.state.compare_exchange(
|
||||
Self::IS_PENDING,
|
||||
Self::IS_PROCESSED,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => true,
|
||||
Err(s) => s == Self::IS_PROCESSED,
|
||||
}
|
||||
}
|
||||
|
||||
/// Cancels the associated event if possible.
|
||||
///
|
||||
/// If the event cannot be cancelled because it was already processed,
|
||||
/// `false` is returned.
|
||||
pub fn cancel_event(self) -> bool {
|
||||
match self.state.compare_exchange(
|
||||
Self::IS_PENDING,
|
||||
Self::IS_CANCELLED,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => true,
|
||||
Err(s) => s == Self::IS_CANCELLED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -274,21 +440,6 @@ impl<T> fmt::Display for ScheduledTimeError<T> {
|
||||
|
||||
impl<T: fmt::Debug> Error for ScheduledTimeError<T> {}
|
||||
|
||||
/// Error returned when the cancellation of a scheduler event is unsuccessful.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct CancellationError {}
|
||||
|
||||
impl fmt::Display for CancellationError {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
fmt,
|
||||
"the scheduler key should belong to an event or command scheduled in the future of the current simulation time"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for CancellationError {}
|
||||
|
||||
/// Schedules an event at a future time.
|
||||
///
|
||||
/// This method does not check whether the specified time lies in the future
|
||||
@ -299,8 +450,7 @@ pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
|
||||
arg: T,
|
||||
sender: Sender<M>,
|
||||
scheduler_queue: &Mutex<SchedulerQueue>,
|
||||
) -> SchedulerKey
|
||||
where
|
||||
) where
|
||||
M: Model,
|
||||
F: for<'a> InputFn<'a, M, T, S>,
|
||||
T: Send + Clone + 'static,
|
||||
@ -321,26 +471,143 @@ where
|
||||
)
|
||||
.await;
|
||||
};
|
||||
let fut = Box::new(UnkeyedEventFuture::new(fut));
|
||||
|
||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
||||
let insert_key = scheduler_queue.insert((time, channel_id), Box::new(fut));
|
||||
|
||||
SchedulerKey::new(insert_key)
|
||||
scheduler_queue.insert((time, channel_id), fut);
|
||||
}
|
||||
|
||||
/// Cancels an event or command with a scheduled time in the future of the
|
||||
/// current simulation time.
|
||||
/// Schedules an event at a future time, returning an event key.
|
||||
///
|
||||
/// If the corresponding event or command was already executed, or if it is
|
||||
/// scheduled for the current simulation time, an error is returned.
|
||||
pub(crate) fn cancel_scheduled(
|
||||
scheduler_key: SchedulerKey,
|
||||
/// This method does not check whether the specified time lies in the future
|
||||
/// of the current simulation time.
|
||||
pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
|
||||
time: MonotonicTime,
|
||||
func: F,
|
||||
arg: T,
|
||||
sender: Sender<M>,
|
||||
scheduler_queue: &Mutex<SchedulerQueue>,
|
||||
) -> Result<(), CancellationError> {
|
||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
||||
if scheduler_queue.delete(scheduler_key.0) {
|
||||
return Ok(());
|
||||
}
|
||||
) -> EventKey
|
||||
where
|
||||
M: Model,
|
||||
F: for<'a> InputFn<'a, M, T, S>,
|
||||
T: Send + Clone + 'static,
|
||||
{
|
||||
let channel_id = sender.channel_id();
|
||||
|
||||
Err(CancellationError {})
|
||||
let event_key = EventKey::new();
|
||||
let local_event_key = event_key.clone();
|
||||
|
||||
let fut = async move {
|
||||
let _ = sender
|
||||
.send(
|
||||
move |model: &mut M,
|
||||
scheduler,
|
||||
recycle_box: RecycleBox<()>|
|
||||
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
|
||||
let fut = async move {
|
||||
if local_event_key.process_event() {
|
||||
func.call(model, arg, scheduler).await;
|
||||
}
|
||||
};
|
||||
|
||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||
},
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
// Implementation note: we end up with two atomic references to the event
|
||||
// key stored inside the event future: one was moved above to the future
|
||||
// itself and the other one is created below via cloning and stored
|
||||
// separately in the `KeyedEventFuture`. This is not ideal as we could
|
||||
// theoretically spare on atomic reference counting by storing a single
|
||||
// reference, but this would likely require some tricky `unsafe`, not least
|
||||
// because the inner future sent to the mailbox outlives the
|
||||
// `KeyedEventFuture`.
|
||||
let fut = Box::new(KeyedEventFuture::new(fut, event_key.clone()));
|
||||
|
||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
||||
scheduler_queue.insert((time, channel_id), fut);
|
||||
|
||||
event_key
|
||||
}
|
||||
|
||||
/// The future of an event which scheduling may be cancelled by the user.
|
||||
pub(crate) trait EventFuture: Future {
|
||||
/// Whether the scheduling of this event was cancelled.
|
||||
fn is_cancelled(&self) -> bool;
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// Future associated to a regular event that cannot be cancelled.
|
||||
pub(crate) struct UnkeyedEventFuture<F> {
|
||||
#[pin]
|
||||
fut: F,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> UnkeyedEventFuture<F> {
|
||||
/// Creates a new `EventFuture`.
|
||||
pub(crate) fn new(fut: F) -> Self {
|
||||
Self { fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Future for UnkeyedEventFuture<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
type Output = F::Output;
|
||||
|
||||
#[inline(always)]
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.project().fut.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> EventFuture for UnkeyedEventFuture<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
fn is_cancelled(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// Future associated to a keyed event that can be cancelled.
|
||||
pub(crate) struct KeyedEventFuture<F> {
|
||||
event_key: EventKey,
|
||||
#[pin]
|
||||
fut: F,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> KeyedEventFuture<F> {
|
||||
/// Creates a new `EventFuture`.
|
||||
pub(crate) fn new(fut: F, event_key: EventKey) -> Self {
|
||||
Self { event_key, fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Future for KeyedEventFuture<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
type Output = F::Output;
|
||||
|
||||
#[inline(always)]
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.project().fut.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> EventFuture for KeyedEventFuture<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
fn is_cancelled(&self) -> bool {
|
||||
self.event_key.event_is_cancelled()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user