1
0
forked from ROMEO/nexosim

Implement periodic events

This commit is contained in:
Serge Barral 2023-07-28 16:14:36 +02:00
parent f6c714937a
commit b0f7e69039
5 changed files with 1008 additions and 355 deletions

View File

@ -104,7 +104,7 @@ impl Controller {
if state == WaterSenseState::Empty && self.water_sense == WaterSenseState::NotEmpty { if state == WaterSenseState::Empty && self.water_sense == WaterSenseState::NotEmpty {
// If a brew was ongoing, we must cancel it. // If a brew was ongoing, we must cancel it.
if let Some(key) = self.stop_brew_key.take() { if let Some(key) = self.stop_brew_key.take() {
key.cancel_event(); key.cancel();
self.pump_cmd.send(PumpCommand::Off).await; self.pump_cmd.send(PumpCommand::Off).await;
} }
} }
@ -128,7 +128,7 @@ impl Controller {
self.pump_cmd.send(PumpCommand::Off).await; self.pump_cmd.send(PumpCommand::Off).await;
// Abort the scheduled call to `stop_brew()`. // Abort the scheduled call to `stop_brew()`.
key.cancel_event(); key.cancel();
return; return;
} }
@ -204,7 +204,7 @@ impl Tank {
// schedule a new update. // schedule a new update.
if let Some(state) = self.dynamic_state.take() { if let Some(state) = self.dynamic_state.take() {
// Abort the scheduled call to `set_empty()`. // Abort the scheduled call to `set_empty()`.
state.set_empty_key.cancel_event(); state.set_empty_key.cancel();
// Update the volume, saturating at 0 in case of rounding errors. // Update the volume, saturating at 0 in case of rounding errors.
let time = scheduler.time(); let time = scheduler.time();
@ -237,7 +237,7 @@ impl Tank {
// If the flow rate was non-zero up to now, update the volume. // If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() { if let Some(state) = self.dynamic_state.take() {
// Abort the scheduled call to `set_empty()`. // Abort the scheduled call to `set_empty()`.
state.set_empty_key.cancel_event(); state.set_empty_key.cancel();
// Update the volume, saturating at 0 in case of rounding errors. // Update the volume, saturating at 0 in case of rounding errors.
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();

View File

@ -353,13 +353,11 @@
//! process `M1` before `M3`. //! process `M1` before `M3`.
//! //!
//! The first guarantee (and only the first) also extends to events scheduled //! The first guarantee (and only the first) also extends to events scheduled
//! from a simulation with //! from a simulation with a
//! [`Simulation::schedule_event_in()`](simulation::Simulation::schedule_event_in) //! [`Simulation::schedule_*()`](simulation::Simulation::schedule_event_at)
//! or //! method: if the scheduler contains several events to be delivered at the same
//! [`Simulation::schedule_event_at()`](simulation::Simulation::schedule_event_at): //! time to the same model, these events will always be processed in the order
//! if the scheduler contains several events to be delivered at the same time to //! in which they were scheduled.
//! the same model, these events will always be processed in the order in which
//! they were scheduled.
//! //!
//! [actor_model]: https://en.wikipedia.org/wiki/Actor_model //! [actor_model]: https://en.wikipedia.org/wiki/Actor_model
//! [pony]: https://www.ponylang.io/ //! [pony]: https://www.ponylang.io/

View File

@ -136,8 +136,10 @@ use recycle_box::{coerce_box, RecycleBox};
use crate::executor::Executor; use crate::executor::Executor;
use crate::model::{InputFn, Model, ReplierFn}; use crate::model::{InputFn, Model, ReplierFn};
use crate::time::{self, MonotonicTime, TearableAtomicTime}; use crate::time::{
use crate::time::{EventKey, ScheduledTimeError, SchedulerQueue}; self, EventKey, MonotonicTime, ScheduledEvent, SchedulerQueue, SchedulingError,
TearableAtomicTime,
};
use crate::util::futures::SeqFuture; use crate::util::futures::SeqFuture;
use crate::util::slot; use crate::util::slot;
use crate::util::sync_cell::SyncCell; use crate::util::sync_cell::SyncCell;
@ -152,10 +154,10 @@ use crate::util::sync_cell::SyncCell;
/// A [`Simulation`] object also manages an event scheduling queue and /// A [`Simulation`] object also manages an event scheduling queue and
/// simulation time. The scheduling queue can be accessed from the simulation /// simulation time. The scheduling queue can be accessed from the simulation
/// itself, but also from models via the optional /// itself, but also from models via the optional
/// [`&Scheduler`][crate::time::Scheduler] argument of input and replier port /// [`&Scheduler`][time::Scheduler] argument of input and replier port methods.
/// methods. Likewise, simulation time can be accessed with the /// Likewise, simulation time can be accessed with the [`Simulation::time()`]
/// [`Simulation::time()`] method, or from models with the /// method, or from models with the
/// [`Scheduler::time()`](crate::time::Scheduler::time) method. /// [`Scheduler::time()`](time::Scheduler::time) method.
/// ///
/// Events and queries can be scheduled immediately, *i.e.* for the current /// Events and queries can be scheduled immediately, *i.e.* for the current
/// simulation time, using [`send_event()`](Simulation::send_event) and /// simulation time, using [`send_event()`](Simulation::send_event) and
@ -163,9 +165,8 @@ use crate::util::sync_cell::SyncCell;
/// until all computations triggered by such event or query have completed. In /// until all computations triggered by such event or query have completed. In
/// the case of queries, the response is returned. /// the case of queries, the response is returned.
/// ///
/// Events can also be scheduled at a future simulation time using /// Events can also be scheduled at a future simulation time using one of the
/// [`schedule_event_in()`](Simulation::schedule_event_in) or /// [`schedule_*()`](Simulation::schedule_event_at) method. These methods queue
/// [`schedule_event_at()`](Simulation::schedule_event_at). These methods queue
/// an event without blocking. /// an event without blocking.
/// ///
/// Finally, the [`Simulation`] instance manages simulation time. Calling /// Finally, the [`Simulation`] instance manages simulation time. Calling
@ -229,66 +230,138 @@ impl Simulation {
/// This method may block. Once it returns, it is guaranteed that (i) all /// This method may block. Once it returns, it is guaranteed that (i) all
/// events scheduled up to the specified target time have completed and (ii) /// events scheduled up to the specified target time have completed and (ii)
/// the final simulation time matches the target time. /// the final simulation time matches the target time.
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ScheduledTimeError<()>> { pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), SchedulingError> {
if self.time.read() >= target_time { if self.time.read() >= target_time {
return Err(ScheduledTimeError(())); return Err(SchedulingError::InvalidScheduledTime);
} }
self.step_until_unchecked(target_time); self.step_until_unchecked(target_time);
Ok(()) Ok(())
} }
/// Schedules an event at the lapse of the specified duration. /// Schedules an event at a future time.
/// ///
/// An error is returned if the specified duration is null. /// An error is returned if the specified time is not in the future of the
/// current simulation time.
/// ///
/// Events scheduled for the same time and targeting the same model are /// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order. /// guaranteed to be processed according to the scheduling order.
pub fn schedule_event_in<M, F, T, S>( ///
/// See also: [`time::Scheduler::schedule_event_at`].
pub fn schedule_event_at<M, F, T, S>(
&mut self, &mut self,
duration: Duration, time: MonotonicTime,
func: F, func: F,
arg: T, arg: T,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> Result<(), ScheduledTimeError<T>> ) -> Result<(), SchedulingError>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static, T: Send + Clone + 'static,
S: Send + 'static,
{ {
if duration.is_zero() { if self.time.read() >= time {
return Err(ScheduledTimeError(arg)); return Err(SchedulingError::InvalidScheduledTime);
} }
let time = self.time.read() + duration; time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
Ok(())
}
/// Schedules an event at the lapse of the specified duration.
///
/// An error is returned if the specified delay is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`time::Scheduler::schedule_event_in`].
pub fn schedule_event_in<M, F, T, S>(
&mut self,
delay: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
if delay.is_zero() {
return Err(SchedulingError::InvalidScheduledTime);
}
let time = self.time.read() + delay;
time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
Ok(()) Ok(())
} }
/// Schedules an event at the lapse of the specified duration and returns an /// Schedules a cancellable event at a future time and returns an event key.
/// event key.
/// ///
/// An error is returned if the specified duration is null. /// An error is returned if the specified time is not in the future of the
/// current simulation time.
/// ///
/// Events scheduled for the same time and targeting the same model are /// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order. /// guaranteed to be processed according to the scheduling order.
pub fn schedule_keyed_event_in<M, F, T, S>( ///
/// See also: [`time::Scheduler::schedule_keyed_event_at`].
pub fn schedule_keyed_event_at<M, F, T, S>(
&mut self, &mut self,
duration: Duration, time: MonotonicTime,
func: F, func: F,
arg: T, arg: T,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> Result<EventKey, ScheduledTimeError<T>> ) -> Result<EventKey, SchedulingError>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static, T: Send + Clone + 'static,
S: Send + 'static,
{ {
if duration.is_zero() { if self.time.read() >= time {
return Err(ScheduledTimeError(arg)); return Err(SchedulingError::InvalidScheduledTime);
} }
let time = self.time.read() + duration; let event_key = time::schedule_keyed_event_at_unchecked(
time,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(event_key)
}
/// Schedules a cancellable event at the lapse of the specified duration and
/// returns an event key.
///
/// An error is returned if the specified delay is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`time::Scheduler::schedule_keyed_event_in`].
pub fn schedule_keyed_event_in<M, F, T, S>(
&mut self,
delay: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
if delay.is_zero() {
return Err(SchedulingError::InvalidScheduledTime);
}
let time = self.time.read() + delay;
let event_key = time::schedule_keyed_event_at_unchecked( let event_key = time::schedule_keyed_event_at_unchecked(
time, time,
@ -301,57 +374,168 @@ impl Simulation {
Ok(event_key) Ok(event_key)
} }
/// Schedules an event at a future time. /// Schedules a periodically recurring event at a future time.
/// ///
/// An error is returned if the specified time is not in the future of the /// An error is returned if the specified time is not in the future of the
/// current simulation time. /// current simulation time or if the specified period is null.
/// ///
/// Events scheduled for the same time and targeting the same model are /// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order. /// guaranteed to be processed according to the scheduling order.
pub fn schedule_event_at<M, F, T, S>( ///
/// See also: [`time::Scheduler::schedule_periodic_event_at`].
pub fn schedule_periodic_event_at<M, F, T, S>(
&mut self, &mut self,
time: MonotonicTime, time: MonotonicTime,
period: Duration,
func: F, func: F,
arg: T, arg: T,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> Result<(), ScheduledTimeError<T>> ) -> Result<(), SchedulingError>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static, T: Send + Clone + 'static,
S: Send + 'static,
{ {
if self.time.read() >= time { if self.time.read() >= time {
return Err(ScheduledTimeError(arg)); return Err(SchedulingError::InvalidScheduledTime);
} }
time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
time::schedule_periodic_event_at_unchecked(
time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(()) Ok(())
} }
/// Schedules an event at a future time and returns an event key. /// Schedules a periodically recurring event at the lapse of the specified
/// duration.
/// ///
/// An error is returned if the specified time is not in the future of the /// An error is returned if the specified delay or the specified period are
/// current simulation time. /// null.
/// ///
/// Events scheduled for the same time and targeting the same model are /// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order. /// guaranteed to be processed according to the scheduling order.
pub fn schedule_keyed_event_at<M, F, T, S>( ///
/// See also: [`time::Scheduler::schedule_periodic_event_in`].
pub fn schedule_periodic_event_in<M, F, T, S>(
&mut self, &mut self,
time: MonotonicTime, delay: Duration,
period: Duration,
func: F, func: F,
arg: T, arg: T,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> Result<EventKey, ScheduledTimeError<T>> ) -> Result<(), SchedulingError>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static, T: Send + Clone + 'static,
S: Send + 'static,
{
if delay.is_zero() {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let time = self.time.read() + delay;
time::schedule_periodic_event_at_unchecked(
time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(())
}
/// Schedules a cancellable, periodically recurring event at a future time
/// and returns an event key.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time or if the specified period is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`time::Scheduler::schedule_periodic_keyed_event_at`].
pub fn schedule_periodic_keyed_event_at<M, F, T, S>(
&mut self,
time: MonotonicTime,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{ {
if self.time.read() >= time { if self.time.read() >= time {
return Err(ScheduledTimeError(arg)); return Err(SchedulingError::InvalidScheduledTime);
} }
let event_key = time::schedule_keyed_event_at_unchecked( if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let event_key = time::schedule_periodic_keyed_event_at_unchecked(
time, time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(event_key)
}
/// Schedules a cancellable, periodically recurring event at the lapse of
/// the specified duration and returns an event key.
///
/// An error is returned if the specified delay or the specified period are
/// null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`time::Scheduler::schedule_periodic_keyed_event_in`].
pub fn schedule_periodic_keyed_event_in<M, F, T, S>(
&mut self,
delay: Duration,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
if delay.is_zero() {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let time = self.time.read() + delay;
let event_key = time::schedule_periodic_keyed_event_at_unchecked(
time,
period,
func, func,
arg, arg,
address.into().0, address.into().0,
@ -441,9 +625,22 @@ impl Simulation {
/// If at least one event was found that satisfied the time bound, the /// If at least one event was found that satisfied the time bound, the
/// corresponding new simulation time is returned. /// corresponding new simulation time is returned.
fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option<MonotonicTime> { fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option<MonotonicTime> {
// Function pulling the next event. If the event is periodic, it is
// immediately cloned and re-scheduled.
fn pull_next_event(
scheduler_queue: &mut MutexGuard<SchedulerQueue>,
) -> Box<dyn ScheduledEvent> {
let ((time, channel_id), event) = scheduler_queue.pull().unwrap();
if let Some((event_clone, period)) = event.next() {
scheduler_queue.insert((time + period, channel_id), event_clone);
}
event
}
// Closure returning the next key which time stamp is no older than the // Closure returning the next key which time stamp is no older than the
// upper bound, if any. Cancelled events are discarded. // upper bound, if any. Cancelled events are pulled and discarded.
let get_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| { let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
loop { loop {
match scheduler_queue.peek() { match scheduler_queue.peek() {
Some((&k, t)) if k.0 <= upper_time_bound => { Some((&k, t)) if k.0 <= upper_time_bound => {
@ -460,34 +657,32 @@ impl Simulation {
// Move to the next scheduled time. // Move to the next scheduled time.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = get_next_key(&mut scheduler_queue)?; let mut current_key = peek_next_key(&mut scheduler_queue)?;
self.time.write(current_key.0); self.time.write(current_key.0);
loop { loop {
let event = scheduler_queue.pull().unwrap().1; let event = pull_next_event(&mut scheduler_queue);
let mut next_key = peek_next_key(&mut scheduler_queue);
let mut next_key = get_next_key(&mut scheduler_queue);
if next_key != Some(current_key) { if next_key != Some(current_key) {
// Since there are no other events targeting the same mailbox // Since there are no other events targeting the same mailbox
// and the same time, the event is spawned immediately. // and the same time, the event is spawned immediately.
self.executor.spawn_and_forget(Box::into_pin(event)); event.spawn_and_forget(&self.executor);
} else { } else {
// To ensure that their relative order of execution is // To ensure that their relative order of execution is
// preserved, all event targeting the same mailbox are executed // preserved, all event targeting the same mailbox are executed
// sequentially within a single compound future. // sequentially within a single compound future.
let mut event_sequence = SeqFuture::new(); let mut event_sequence = SeqFuture::new();
event_sequence.push(event.into_future());
event_sequence.push(Box::into_pin(event));
loop { loop {
let event = scheduler_queue.pull().unwrap().1; let event = pull_next_event(&mut scheduler_queue);
event_sequence.push(Box::into_pin(event)); event_sequence.push(event.into_future());
next_key = get_next_key(&mut scheduler_queue); next_key = peek_next_key(&mut scheduler_queue);
if next_key != Some(current_key) { if next_key != Some(current_key) {
break; break;
} }
} }
// Spawn a parent event that sequentially polls all events // Spawn a compound future that sequentially polls all events
// targeting the same mailbox. // targeting the same mailbox.
self.executor.spawn_and_forget(event_sequence); self.executor.spawn_and_forget(event_sequence);
} }

View File

@ -51,6 +51,8 @@ mod scheduler;
pub(crate) use monotonic_time::TearableAtomicTime; pub(crate) use monotonic_time::TearableAtomicTime;
pub use monotonic_time::{MonotonicTime, SystemTimeError}; pub use monotonic_time::{MonotonicTime, SystemTimeError};
pub(crate) use scheduler::{ pub(crate) use scheduler::{
schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, SchedulerQueue, schedule_event_at_unchecked, schedule_keyed_event_at_unchecked,
schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked,
ScheduledEvent, SchedulerQueue,
}; };
pub use scheduler::{EventKey, ScheduledTimeError, Scheduler}; pub use scheduler::{EventKey, Scheduler, SchedulingError};

File diff suppressed because it is too large Load Diff