1
0
forked from ROMEO/nexosim

Finalize the Context and BuildContext API

The API style is now more uniform: both are passed by mutable ref, and
only expose accessors. Additionally, the methods that were initially
accessed through the scheduler field are now directly implemented on
`Context`.
This commit is contained in:
Serge Barral
2024-11-15 16:00:18 +01:00
parent b1896dbde9
commit f4686af49a
21 changed files with 939 additions and 1054 deletions

View File

@ -53,7 +53,7 @@ impl<M: 'static> Inner<M> {
}
/// A receiver which can asynchronously execute `async` message that take an
/// argument of type `&mut M` and an optional `&Context<M>` argument.
/// argument of type `&mut M` and an optional `&mut Context<M>` argument.
pub(crate) struct Receiver<M> {
/// Shared data.
inner: Arc<Inner<M>>,
@ -105,7 +105,7 @@ impl<M: Model> Receiver<M> {
pub(crate) async fn recv(
&mut self,
model: &mut M,
context: &Context<M>,
cx: &mut Context<M>,
) -> Result<(), RecvError> {
let msg = unsafe {
self.inner
@ -124,7 +124,7 @@ impl<M: Model> Receiver<M> {
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));
// Take the message to obtain a boxed future.
let fut = msg.call_once(model, context, self.future_box.take().unwrap());
let fut = msg.call_once(model, cx, self.future_box.take().unwrap());
// Now that the message was taken, drop `msg` to free its slot
// in the queue and signal to one awaiting sender that a slot is
@ -207,7 +207,7 @@ impl<M: Model> Sender<M> {
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut Context<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send
@ -364,7 +364,7 @@ impl<M> fmt::Debug for Sender<M> {
}
/// A closure that can be called once to create a future boxed in a `RecycleBox`
/// from an `&mut M`, a `&Context<M>` and an empty `RecycleBox`.
/// from an `&mut M`, a `&mut Context<M>` and an empty `RecycleBox`.
///
/// This is basically a workaround to emulate an `FnOnce` with the equivalent of
/// an `FnMut` so that it is possible to call it as a `dyn` trait stored in a
@ -380,7 +380,7 @@ trait MessageFn<M: Model>: Send {
fn call_once<'a>(
&mut self,
model: &'a mut M,
context: &'a Context<M>,
cx: &'a mut Context<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>;
}
@ -402,7 +402,7 @@ impl<F, M: Model> MessageFn<M> for MessageFnOnce<F, M>
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut Context<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send,
@ -410,12 +410,12 @@ where
fn call_once<'a>(
&mut self,
model: &'a mut M,
context: &'a Context<M>,
cx: &'a mut Context<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a> {
let closure = self.msg_fn.take().unwrap();
(closure)(model, context, recycle_box)
(closure)(model, cx, recycle_box)
}
}

View File

@ -16,7 +16,7 @@ impl Executor {
///
/// The maximum number of threads is set with the `pool_size` parameter.
pub fn new(pool_size: usize) -> Self {
let dummy_context = crate::executor::SimulationContext {
let dummy_cx = crate::executor::SimulationContext {
#[cfg(feature = "tracing")]
time_reader: crate::util::sync_cell::SyncCell::new(
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
@ -25,7 +25,7 @@ impl Executor {
};
Self(executor::Executor::new_multi_threaded(
pool_size,
dummy_context,
dummy_cx,
executor::Signal::new(),
))
}

View File

@ -45,7 +45,7 @@
//! * _input ports_, which are synchronous or asynchronous methods that
//! implement the [`InputFn`](ports::InputFn) trait and take an `&mut self`
//! argument, a message argument, and an optional
//! [`&Context`](model::Context) argument,
//! [`&mut Context`](model::Context) argument,
//! * _replier ports_, which are similar to input ports but implement the
//! [`ReplierFn`](ports::ReplierFn) trait and return a reply.
//!
@ -118,8 +118,8 @@
//! pub output: Output<f64>,
//! }
//! impl Delay {
//! pub fn input(&mut self, value: f64, context: &Context<Self>) {
//! context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! pub fn input(&mut self, value: f64, cx: &mut Context<Self>) {
//! cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! }
//!
//! async fn send(&mut self, value: f64) {
@ -189,8 +189,8 @@
//! # pub output: Output<f64>,
//! # }
//! # impl Delay {
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # pub fn input(&mut self, value: f64, cx: &mut Context<Self>) {
//! # cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # }
//! # async fn send(&mut self, value: f64) { // this method can be private
//! # self.output.send(value).await;
@ -290,8 +290,8 @@
//! # pub output: Output<f64>,
//! # }
//! # impl Delay {
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # pub fn input(&mut self, value: f64, cx: &mut Context<Self>) {
//! # cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # }
//! # async fn send(&mut self, value: f64) { // this method can be private
//! # self.output.send(value).await;

View File

@ -56,7 +56,7 @@
//! impl Model for MyModel {
//! async fn init(
//! mut self,
//! ctx: &Context<Self>
//! ctx: &mut Context<Self>
//! ) -> InitializedModel<Self> {
//! println!("...initialization...");
//!
@ -173,10 +173,10 @@
//! ```ignore
//! fn(&mut self) // argument elided, implies `T=()`
//! fn(&mut self, T)
//! fn(&mut self, T, &Context<Self>)
//! fn(&mut self, T, &mut Context<Self>)
//! async fn(&mut self) // argument elided, implies `T=()`
//! async fn(&mut self, T)
//! async fn(&mut self, T, &Context<Self>)
//! async fn(&mut self, T, &mut Context<Self>)
//! where
//! Self: Model,
//! T: Clone + Send + 'static,
@ -193,7 +193,7 @@
//! ```ignore
//! async fn(&mut self) -> R // argument elided, implies `T=()`
//! async fn(&mut self, T) -> R
//! async fn(&mut self, T, &Context<Self>) -> R
//! async fn(&mut self, T, &mut Context<Self>) -> R
//! where
//! Self: Model,
//! T: Clone + Send + 'static,
@ -219,7 +219,7 @@
//! // ...
//! }
//! impl MyModel {
//! pub fn my_input(&mut self, input: String, context: &Context<Self>) {
//! pub fn my_input(&mut self, input: String, cx: &mut Context<Self>) {
//! // ...
//! }
//! pub async fn my_replier(&mut self, request: u32) -> bool { // context argument elided
@ -273,7 +273,7 @@ pub trait Model: Sized + Send + 'static {
/// impl Model for MyModel {
/// async fn init(
/// self,
/// context: &Context<Self>
/// cx: &mut Context<Self>
/// ) -> InitializedModel<Self> {
/// println!("...initialization...");
///
@ -281,7 +281,7 @@ pub trait Model: Sized + Send + 'static {
/// }
/// }
/// ```
fn init(self, _: &Context<Self>) -> impl Future<Output = InitializedModel<Self>> + Send {
fn init(self, _: &mut Context<Self>) -> impl Future<Output = InitializedModel<Self>> + Send {
async { self.into() }
}
}
@ -322,7 +322,7 @@ pub trait ProtoModel: Sized {
/// This method is invoked when the
/// [`SimInit::add_model()`](crate::simulation::SimInit::add_model) or
/// [`BuildContext::add_submodel`] method is called.
fn build(self, ctx: &mut BuildContext<Self>) -> Self::Model;
fn build(self, cx: &mut BuildContext<Self>) -> Self::Model;
}
// Every model can be used as a prototype for itself.

View File

@ -1,7 +1,10 @@
use std::fmt;
use std::time::Duration;
use crate::executor::{Executor, Signal};
use crate::simulation::{self, LocalScheduler, Mailbox, SchedulerInner};
use crate::ports::InputFn;
use crate::simulation::{self, ActionKey, Address, GlobalScheduler, Mailbox, SchedulingError};
use crate::time::{Deadline, MonotonicTime};
use super::{Model, ProtoModel};
@ -22,7 +25,7 @@ use super::{Model, ProtoModel};
/// fn self_scheduling_method<'a>(
/// &'a mut self,
/// arg: MyEventType,
/// context: &'a Context<Self>
/// cx: &'a mut Context<Self>
/// ) -> impl Future<Output=()> + Send + 'a {
/// async move {
/// /* implementation */
@ -49,14 +52,14 @@ use super::{Model, ProtoModel};
///
/// impl DelayedGreeter {
/// // Triggers a greeting on the output port after some delay [input port].
/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context<Self>) {
/// let time = context.scheduler.time();
/// pub async fn greet_with_delay(&mut self, delay: Duration, cx: &mut Context<Self>) {
/// let time = cx.time();
/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time);
///
/// if delay.is_zero() {
/// self.msg_out.send(greeting).await;
/// } else {
/// context.scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap();
/// cx.schedule_event(delay, Self::send_msg, greeting).unwrap();
/// }
/// }
///
@ -72,26 +75,293 @@ use super::{Model, ProtoModel};
// https://github.com/rust-lang/rust/issues/78649
pub struct Context<M: Model> {
name: String,
/// Local scheduler.
pub scheduler: LocalScheduler<M>,
scheduler: GlobalScheduler,
address: Address<M>,
origin_id: usize,
}
impl<M: Model> Context<M> {
/// Creates a new local context.
pub(crate) fn new(name: String, scheduler: LocalScheduler<M>) -> Self {
Self { name, scheduler }
pub(crate) fn new(name: String, scheduler: GlobalScheduler, address: Address<M>) -> Self {
// The only requirement for the origin ID is that it must be (i)
// specific to each model and (ii) different from 0 (which is reserved
// for the global scheduler). The channel ID of the model mailbox
// fulfills this requirement.
let origin_id = address.0.channel_id();
Self {
name,
scheduler,
address,
origin_id,
}
}
/// Returns the model instance name.
/// Returns the fully qualified model instance name.
///
/// The fully qualified name is made of the unqualified model name, if
/// relevant prepended by the dot-separated names of all parent models.
pub fn name(&self) -> &str {
&self.name
}
/// Returns the current simulation time.
pub fn time(&self) -> MonotonicTime {
self.scheduler.time()
}
/// Schedules an event at a future time on this model.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
///
/// // A timer.
/// pub struct Timer {}
///
/// impl Timer {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: Duration, cx: &mut Context<Self>) {
/// if cx.schedule_event(setting, Self::ring, ()).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Rings [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg");
/// }
/// }
///
/// impl Model for Timer {}
/// ```
pub fn schedule_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
self.scheduler
.schedule_event_from(deadline, func, arg, &self.address, self.origin_id)
}
/// Schedules a cancellable event at a future time on this model and returns
/// an action key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock that can be cancelled.
/// #[derive(Default)]
/// pub struct CancellableAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context<Self>) {
/// self.cancel();
/// match cx.schedule_keyed_event(setting, Self::ring, ()) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels the current alarm, if any [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Rings the alarm [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg!");
/// }
/// }
///
/// impl Model for CancellableAlarmClock {}
/// ```
pub fn schedule_keyed_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = self.scheduler.schedule_keyed_event_from(
deadline,
func,
arg,
&self.address,
self.origin_id,
)?;
Ok(event_key)
}
/// Schedules a periodically recurring event on this model at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz.
/// pub struct BeepingAlarmClock {}
///
/// impl BeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context<Self>) {
/// if cx.schedule_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for BeepingAlarmClock {}
/// ```
pub fn schedule_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
self.scheduler.schedule_periodic_event_from(
deadline,
period,
func,
arg,
&self.address,
self.origin_id,
)
}
/// Schedules a cancellable, periodically recurring event on this model at a
/// future time and returns an action key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
/// // stopped after it sets off.
/// #[derive(Default)]
/// pub struct CancellableBeepingAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableBeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context<Self>) {
/// self.cancel();
/// match cx.schedule_keyed_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels or stops the alarm [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for CancellableBeepingAlarmClock {}
/// ```
pub fn schedule_keyed_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = self.scheduler.schedule_keyed_periodic_event_from(
deadline,
period,
func,
arg,
&self.address,
self.origin_id,
)?;
Ok(event_key)
}
}
impl<M: Model> fmt::Debug for Context<M> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Context").finish_non_exhaustive()
f.debug_struct("Context")
.field("name", &self.name())
.field("time", &self.time())
.field("address", &self.address)
.field("origin_id", &self.origin_id)
.finish_non_exhaustive()
}
}
@ -153,7 +423,7 @@ impl<M: Model> fmt::Debug for Context<M> {
///
/// fn build(
/// self,
/// ctx: &mut BuildContext<Self>)
/// cx: &mut BuildContext<Self>)
/// -> MultiplyBy4 {
/// let mut mult = MultiplyBy4 { forward: Output::default() };
/// let mut submult1 = MultiplyBy2::default();
@ -170,8 +440,8 @@ impl<M: Model> fmt::Debug for Context<M> {
/// submult1.output.connect(MultiplyBy2::input, &submult2_mbox);
///
/// // Add the submodels to the simulation.
/// ctx.add_submodel(submult1, submult1_mbox, "submultiplier 1");
/// ctx.add_submodel(submult2, submult2_mbox, "submultiplier 2");
/// cx.add_submodel(submult1, submult1_mbox, "submultiplier 1");
/// cx.add_submodel(submult2, submult2_mbox, "submultiplier 2");
///
/// mult
/// }
@ -180,10 +450,9 @@ impl<M: Model> fmt::Debug for Context<M> {
/// ```
#[derive(Debug)]
pub struct BuildContext<'a, P: ProtoModel> {
/// Mailbox of the model.
pub mailbox: &'a Mailbox<P::Model>,
mailbox: &'a Mailbox<P::Model>,
name: &'a String,
scheduler: &'a SchedulerInner,
scheduler: &'a GlobalScheduler,
executor: &'a Executor,
abort_signal: &'a Signal,
model_names: &'a mut Vec<String>,
@ -194,7 +463,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
pub(crate) fn new(
mailbox: &'a Mailbox<P::Model>,
name: &'a String,
scheduler: &'a SchedulerInner,
scheduler: &'a GlobalScheduler,
executor: &'a Executor,
abort_signal: &'a Signal,
model_names: &'a mut Vec<String>,
@ -217,6 +486,11 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
self.name
}
/// Returns a handle to the model's mailbox.
pub fn address(&self) -> Address<P::Model> {
self.mailbox.address()
}
/// Adds a sub-model to the simulation bench.
///
/// The `name` argument needs not be unique. It is appended to that of the

View File

@ -69,10 +69,10 @@
//! impl ProtoModel for ProtoParentModel {
//! type Model = ParentModel;
//!
//! fn build(self, ctx: &mut BuildContext<Self>) -> ParentModel {
//! fn build(self, cx: &mut BuildContext<Self>) -> ParentModel {
//! let mut child = ChildModel::new(self.output.clone());
//!
//! ctx.add_submodel(child, Mailbox::new(), "child");
//! cx.add_submodel(child, Mailbox::new(), "child");
//!
//! ParentModel { output: self.output }
//! }

View File

@ -14,9 +14,9 @@ use super::markers;
///
/// ```ignore
/// FnOnce(&mut M, T)
/// FnOnce(&mut M, T, &Context<M>)
/// FnOnce(&mut M, T, &mut Context<M>)
/// async fn(&mut M, T)
/// async fn(&mut M, T, &Context<M>)
/// async fn(&mut M, T, &mut Context<M>)
/// where
/// M: Model
/// ```
@ -34,7 +34,7 @@ pub trait InputFn<'a, M: Model, T, S>: Send + 'static {
type Future: Future<Output = ()> + Send + 'a;
/// Calls the method.
fn call(self, model: &'a mut M, arg: T, context: &'a Context<M>) -> Self::Future;
fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context<M>) -> Self::Future;
}
impl<'a, M, F> InputFn<'a, M, (), markers::WithoutArguments> for F
@ -44,7 +44,7 @@ where
{
type Future = Ready<()>;
fn call(self, model: &'a mut M, _arg: (), _context: &'a Context<M>) -> Self::Future {
fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context<M>) -> Self::Future {
self(model);
ready(())
@ -58,7 +58,7 @@ where
{
type Future = Ready<()>;
fn call(self, model: &'a mut M, arg: T, _context: &'a Context<M>) -> Self::Future {
fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context<M>) -> Self::Future {
self(model, arg);
ready(())
@ -68,12 +68,12 @@ where
impl<'a, M, T, F> InputFn<'a, M, T, markers::WithContext> for F
where
M: Model,
F: FnOnce(&'a mut M, T, &'a Context<M>) + Send + 'static,
F: FnOnce(&'a mut M, T, &'a mut Context<M>) + Send + 'static,
{
type Future = Ready<()>;
fn call(self, model: &'a mut M, arg: T, context: &'a Context<M>) -> Self::Future {
self(model, arg, context);
fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context<M>) -> Self::Future {
self(model, arg, cx);
ready(())
}
@ -87,7 +87,7 @@ where
{
type Future = Fut;
fn call(self, model: &'a mut M, _arg: (), _context: &'a Context<M>) -> Self::Future {
fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context<M>) -> Self::Future {
self(model)
}
}
@ -100,7 +100,7 @@ where
{
type Future = Fut;
fn call(self, model: &'a mut M, arg: T, _context: &'a Context<M>) -> Self::Future {
fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context<M>) -> Self::Future {
self(model, arg)
}
}
@ -109,12 +109,12 @@ impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithContext> for F
where
M: Model,
Fut: Future<Output = ()> + Send + 'a,
F: FnOnce(&'a mut M, T, &'a Context<M>) -> Fut + Send + 'static,
F: FnOnce(&'a mut M, T, &'a mut Context<M>) -> Fut + Send + 'static,
{
type Future = Fut;
fn call(self, model: &'a mut M, arg: T, context: &'a Context<M>) -> Self::Future {
self(model, arg, context)
fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context<M>) -> Self::Future {
self(model, arg, cx)
}
}
@ -126,7 +126,7 @@ where
///
/// ```ignore
/// async fn(&mut M, T) -> R
/// async fn(&mut M, T, &Context<M>) -> R
/// async fn(&mut M, T, &mut Context<M>) -> R
/// where
/// M: Model
/// ```
@ -143,7 +143,7 @@ pub trait ReplierFn<'a, M: Model, T, R, S>: Send + 'static {
type Future: Future<Output = R> + Send + 'a;
/// Calls the method.
fn call(self, model: &'a mut M, arg: T, context: &'a Context<M>) -> Self::Future;
fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context<M>) -> Self::Future;
}
impl<'a, M, R, Fut, F> ReplierFn<'a, M, (), R, markers::AsyncWithoutArguments> for F
@ -154,7 +154,7 @@ where
{
type Future = Fut;
fn call(self, model: &'a mut M, _arg: (), _context: &'a Context<M>) -> Self::Future {
fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context<M>) -> Self::Future {
self(model)
}
}
@ -167,7 +167,7 @@ where
{
type Future = Fut;
fn call(self, model: &'a mut M, arg: T, _context: &'a Context<M>) -> Self::Future {
fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context<M>) -> Self::Future {
self(model, arg)
}
}
@ -176,11 +176,11 @@ impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithContext> for
where
M: Model,
Fut: Future<Output = R> + Send + 'a,
F: FnOnce(&'a mut M, T, &'a Context<M>) -> Fut + Send + 'static,
F: FnOnce(&'a mut M, T, &'a mut Context<M>) -> Fut + Send + 'static,
{
type Future = Fut;
fn call(self, model: &'a mut M, arg: T, context: &'a Context<M>) -> Self::Future {
self(model, arg, context)
fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context<M>) -> Self::Future {
self(model, arg, cx)
}
}

View File

@ -567,7 +567,7 @@ mod tests {
use futures_executor::block_on;
use crate::channel::Receiver;
use crate::simulation::{Address, LocalScheduler, SchedulerInner};
use crate::simulation::{Address, GlobalScheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
@ -635,14 +635,12 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap();
}
})
})
@ -707,17 +705,15 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async {
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
});
}
})
@ -769,14 +765,12 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
})
@ -856,25 +850,23 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async {
mailbox
.recv(&mut double_model, &dummy_context)
.recv(&mut double_model, &mut dummy_cx)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.recv(&mut double_model, &mut dummy_cx)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.recv(&mut double_model, &mut dummy_cx)
.await
.unwrap();
});

View File

@ -468,7 +468,7 @@ mod tests {
use futures_executor::block_on;
use crate::channel::Receiver;
use crate::simulation::{Address, LocalScheduler, SchedulerInner};
use crate::simulation::{Address, GlobalScheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
@ -536,14 +536,12 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap();
}
})
})
@ -608,17 +606,15 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async {
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
});
}
})
@ -670,14 +666,12 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
})
@ -757,25 +751,23 @@ mod tests {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
let mut dummy_cx = Context::new(
String::new(),
LocalScheduler::new(
SchedulerInner::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async {
mailbox
.recv(&mut double_model, &dummy_context)
.recv(&mut double_model, &mut dummy_cx)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.recv(&mut double_model, &mut dummy_cx)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.recv(&mut double_model, &mut dummy_cx)
.await
.unwrap();
});

View File

@ -124,11 +124,11 @@ mod sim_init;
use scheduler::SchedulerQueue;
pub(crate) use scheduler::{
KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner,
GlobalScheduler, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction,
};
pub use mailbox::{Address, Mailbox};
pub use scheduler::{Action, ActionKey, AutoActionKey, LocalScheduler, Scheduler, SchedulingError};
pub use scheduler::{Action, ActionKey, AutoActionKey, Scheduler, SchedulingError};
pub use sim_init::SimInit;
use std::any::Any;
@ -165,7 +165,7 @@ thread_local! { pub(crate) static CURRENT_MODEL_ID: Cell<ModelId> = const { Cell
/// A [`Simulation`] object also manages an event scheduling queue and
/// simulation time. The scheduling queue can be accessed from the simulation
/// itself, but also from models via the optional
/// [`&Context`](crate::model::Context) argument of input and replier port
/// [`&mut Context`](crate::model::Context) argument of input and replier port
/// methods. Likewise, simulation time can be accessed with the
/// [`Simulation::time()`] method, or from models with the
/// [`LocalScheduler::time()`](crate::simulation::LocalScheduler::time) method.
@ -720,7 +720,7 @@ pub(crate) fn add_model<P: ProtoModel>(
model: P,
mailbox: Mailbox<P::Model>,
name: String,
scheduler: SchedulerInner,
scheduler: GlobalScheduler,
executor: &Executor,
abort_signal: &Signal,
model_names: &mut Vec<String>,
@ -728,7 +728,7 @@ pub(crate) fn add_model<P: ProtoModel>(
#[cfg(feature = "tracing")]
let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name);
let mut build_context = BuildContext::new(
let mut build_cx = BuildContext::new(
&mailbox,
&name,
&scheduler,
@ -736,15 +736,15 @@ pub(crate) fn add_model<P: ProtoModel>(
abort_signal,
model_names,
);
let model = model.build(&mut build_context);
let model = model.build(&mut build_cx);
let address = mailbox.address();
let mut receiver = mailbox.0;
let abort_signal = abort_signal.clone();
let context = Context::new(name.clone(), LocalScheduler::new(scheduler, address));
let mut cx = Context::new(name.clone(), scheduler, address);
let fut = async move {
let mut model = model.init(&context).await.0;
while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {}
let mut model = model.init(&mut cx).await.0;
while !abort_signal.is_set() && receiver.recv(&mut model, &mut cx).await.is_ok() {}
};
let model_id = ModelId::new(model_names.len());

View File

@ -1,37 +1,34 @@
//! Scheduling functions and types.
mod inner;
use std::error::Error;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, ptr};
use pin_project::pin_project;
use recycle_box::{coerce_box, RecycleBox};
use crate::channel::Sender;
use crate::executor::Executor;
use crate::model::Model;
use crate::ports::InputFn;
use crate::simulation::Address;
use crate::time::{AtomicTimeReader, Deadline, MonotonicTime};
use inner::ActionInner;
pub(crate) use inner::{
KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner,
SchedulerQueue,
};
use crate::util::priority_queue::PriorityQueue;
const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0;
/// A global scheduler.
#[derive(Clone)]
pub struct Scheduler(SchedulerInner);
pub struct Scheduler(GlobalScheduler);
impl Scheduler {
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
Self(SchedulerInner::new(scheduler_queue, time))
Self(GlobalScheduler::new(scheduler_queue, time))
}
/// Returns the current simulation time.
@ -193,311 +190,6 @@ impl fmt::Debug for Scheduler {
}
}
/// Local scheduler.
pub struct LocalScheduler<M: Model> {
scheduler: SchedulerInner,
address: Address<M>,
origin_id: usize,
}
impl<M: Model> LocalScheduler<M> {
pub(crate) fn new(scheduler: SchedulerInner, address: Address<M>) -> Self {
// The only requirement for the origin ID is that it must be (i)
// specific to each model and (ii) different from 0 (which is reserved
// for the global scheduler). The channel ID of the model mailbox
// fulfills this requirement.
let origin_id = address.0.channel_id();
Self {
scheduler,
address,
origin_id,
}
}
/// Returns the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::Model;
/// use asynchronix::simulation::LocalScheduler;
/// use asynchronix::time::MonotonicTime;
///
/// fn is_third_millenium<M: Model>(scheduler: &LocalScheduler<M>) -> bool {
/// let time = scheduler.time();
/// time >= MonotonicTime::new(978307200, 0).unwrap()
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
/// }
/// ```
pub fn time(&self) -> MonotonicTime {
self.scheduler.time()
}
/// Schedules an event at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
///
/// // A timer.
/// pub struct Timer {}
///
/// impl Timer {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: Duration, context: &Context<Self>) {
/// if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Rings [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg");
/// }
/// }
///
/// impl Model for Timer {}
/// ```
pub fn schedule_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
self.scheduler
.schedule_event_from(deadline, func, arg, &self.address, self.origin_id)
}
/// Schedules a cancellable event at a future time and returns an action
/// key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock that can be cancelled.
/// #[derive(Default)]
/// pub struct CancellableAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// self.cancel();
/// match context.scheduler.schedule_keyed_event(setting, Self::ring, ()) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels the current alarm, if any [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Rings the alarm [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg!");
/// }
/// }
///
/// impl Model for CancellableAlarmClock {}
/// ```
pub fn schedule_keyed_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = self.scheduler.schedule_keyed_event_from(
deadline,
func,
arg,
&self.address,
self.origin_id,
)?;
Ok(event_key)
}
/// Schedules a periodically recurring event at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz.
/// pub struct BeepingAlarmClock {}
///
/// impl BeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// if context.scheduler.schedule_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for BeepingAlarmClock {}
/// ```
pub fn schedule_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
self.scheduler.schedule_periodic_event_from(
deadline,
period,
func,
arg,
&self.address,
self.origin_id,
)
}
/// Schedules a cancellable, periodically recurring event at a future time
/// and returns an action key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
/// // stopped after it sets off.
/// #[derive(Default)]
/// pub struct CancellableBeepingAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableBeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// self.cancel();
/// match context.scheduler.schedule_keyed_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels or stops the alarm [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for CancellableBeepingAlarmClock {}
/// ```
pub fn schedule_keyed_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = self.scheduler.schedule_keyed_periodic_event_from(
deadline,
period,
func,
arg,
&self.address,
self.origin_id,
)?;
Ok(event_key)
}
}
impl<M: Model> Clone for LocalScheduler<M> {
fn clone(&self) -> Self {
Self {
scheduler: self.scheduler.clone(),
address: self.address.clone(),
origin_id: self.origin_id,
}
}
}
impl<M: Model> fmt::Debug for LocalScheduler<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LocalScheduler")
.field("time", &self.time())
.field("address", &self.address)
.field("origin_id", &self.origin_id)
.finish_non_exhaustive()
}
}
/// Managed handle to a scheduled action.
///
/// An `AutoActionKey` is a managed handle to a scheduled action that cancels
@ -638,3 +330,492 @@ impl fmt::Debug for Action {
f.debug_struct("SchedulableEvent").finish_non_exhaustive()
}
}
/// Alias for the scheduler queue type.
///
/// Why use both time and origin ID as the key? The short answer is that this
/// allows to preserve the relative ordering of events which have the same
/// origin (where the origin is either a model instance or the global
/// scheduler). The preservation of this ordering is implemented by the event
/// loop, which aggregate events with the same origin into single sequential
/// futures, thus ensuring that they are not executed concurrently.
pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>;
/// Internal implementation of the global scheduler.
#[derive(Clone)]
pub(crate) struct GlobalScheduler {
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTimeReader,
}
impl GlobalScheduler {
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
Self {
scheduler_queue,
time,
}
}
/// Returns the current simulation time.
pub(crate) fn time(&self) -> MonotonicTime {
// We use `read` rather than `try_read` because the scheduler can be
// sent to another thread than the simulator's and could thus
// potentially see a torn read if the simulator increments time
// concurrently. The chances of this happening are very small since
// simulation time is not changed frequently.
self.time.read()
}
/// Schedules an action identified by its origin at a future time.
pub(crate) fn schedule_from(
&self,
deadline: impl Deadline,
action: Action,
origin_id: usize,
) -> Result<(), SchedulingError> {
// The scheduler queue must always be locked when reading the time,
// otherwise the following race could occur:
// 1) this method reads the time and concludes that it is not too late
// to schedule the action,
// 2) the `Simulation` object takes the lock, increments simulation time
// and runs the simulation step,
// 3) this method takes the lock and schedules the now-outdated action.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(())
}
/// Schedules an event identified by its origin at a future time.
pub(crate) fn schedule_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let sender = address.into().0;
let action = Action::new(OnceAction::new(process_event(func, arg, sender)));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(())
}
/// Schedules a cancellable event identified by its origin at a future time
/// and returns an event key.
pub(crate) fn schedule_keyed_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = ActionKey::new();
let sender = address.into().0;
let action = Action::new(KeyedOnceAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
event_key.clone(),
));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(event_key)
}
/// Schedules a periodically recurring event identified by its origin at a
/// future time.
pub(crate) fn schedule_periodic_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let sender = address.into().0;
let action = Action::new(PeriodicAction::new(
|| process_event(func, arg, sender),
period,
));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(())
}
/// Schedules a cancellable, periodically recurring event identified by its
/// origin at a future time and returns an event key.
pub(crate) fn schedule_keyed_periodic_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let event_key = ActionKey::new();
let sender = address.into().0;
let action = Action::new(KeyedPeriodicAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
period,
event_key.clone(),
));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(event_key)
}
}
impl fmt::Debug for GlobalScheduler {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SchedulerInner")
.field("time", &self.time())
.finish_non_exhaustive()
}
}
/// Trait abstracting over the inner type of an action.
pub(crate) trait ActionInner: Send + 'static {
/// Reports whether the action was cancelled.
fn is_cancelled(&self) -> bool;
/// If this is a periodic action, returns a boxed clone of this action and
/// its repetition period; otherwise returns `None`.
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)>;
/// Returns a boxed future that performs the action.
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>>;
/// Spawns the future that performs the action onto the provided executor.
///
/// This method is typically more efficient that spawning the boxed future
/// from `into_future` since it can directly spawn the unboxed future.
fn spawn_and_forget(self: Box<Self>, executor: &Executor);
}
/// An object that can be converted to a future performing a single
/// non-cancellable action.
///
/// Note that this particular action is in fact already a future: since the
/// future cannot be cancelled and the action does not need to be cloned,
/// there is no need to defer the construction of the future. This makes
/// `into_future` a trivial cast, which saves a boxing operation.
#[pin_project]
pub(crate) struct OnceAction<F> {
#[pin]
fut: F,
}
impl<F> OnceAction<F>
where
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `OnceAction`.
pub(crate) fn new(fut: F) -> Self {
OnceAction { fut }
}
}
impl<F> Future for OnceAction<F>
where
F: Future,
{
type Output = F::Output;
#[inline(always)]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
impl<F> ActionInner for OnceAction<F>
where
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
false
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
None
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// No need for boxing, type coercion is enough here.
Box::into_pin(self)
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget(*self);
}
}
/// An object that can be converted to a future performing a non-cancellable,
/// periodic action.
pub(crate) struct PeriodicAction<G, F>
where
G: (FnOnce() -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// A clonable generator for the associated future.
gen: G,
/// The action repetition period.
period: Duration,
}
impl<G, F> PeriodicAction<G, F>
where
G: (FnOnce() -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `PeriodicAction`.
pub(crate) fn new(gen: G, period: Duration) -> Self {
Self { gen, period }
}
}
impl<G, F> ActionInner for PeriodicAction<G, F>
where
G: (FnOnce() -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
false
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
let event = Box::new(Self::new(self.gen.clone(), self.period));
Some((event, self.period))
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)())
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)());
}
}
/// An object that can be converted to a future performing a single, cancellable
/// action.
pub(crate) struct KeyedOnceAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// A generator for the associated future.
gen: G,
/// The event cancellation key.
event_key: ActionKey,
}
impl<G, F> KeyedOnceAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `KeyedOnceAction`.
pub(crate) fn new(gen: G, event_key: ActionKey) -> Self {
Self { gen, event_key }
}
}
impl<G, F> ActionInner for KeyedOnceAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
self.event_key.is_cancelled()
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
None
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)(self.event_key))
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)(self.event_key));
}
}
/// An object that can be converted to a future performing a periodic,
/// cancellable action.
pub(crate) struct KeyedPeriodicAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// A clonable generator for associated future.
gen: G,
/// The repetition period.
period: Duration,
/// The event cancellation key.
event_key: ActionKey,
}
impl<G, F> KeyedPeriodicAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `KeyedPeriodicAction`.
pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self {
Self {
gen,
period,
event_key,
}
}
}
impl<G, F> ActionInner for KeyedPeriodicAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
self.event_key.is_cancelled()
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
let event = Box::new(Self::new(
self.gen.clone(),
self.period,
self.event_key.clone(),
));
Some((event, self.period))
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)(self.event_key))
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)(self.event_key));
}
}
/// Asynchronously sends a non-cancellable event to a model input.
pub(crate) async fn process_event<M, F, T, S>(func: F, arg: T, sender: Sender<M>)
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + 'static,
{
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
}
/// Asynchronously sends a cancellable event to a model input.
pub(crate) async fn send_keyed_event<M, F, T, S>(
event_key: ActionKey,
func: F,
arg: T,
sender: Sender<M>,
) where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = async move {
// Only perform the call if the event wasn't cancelled.
if !event_key.is_cancelled() {
func.call(model, arg, scheduler).await;
}
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
}

View File

@ -1,508 +0,0 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use pin_project::pin_project;
use recycle_box::{coerce_box, RecycleBox};
use crate::channel::Sender;
use crate::executor::Executor;
use crate::model::Model;
use crate::ports::InputFn;
use crate::simulation::Address;
use crate::time::{AtomicTimeReader, Deadline, MonotonicTime};
use crate::util::priority_queue::PriorityQueue;
use super::{Action, ActionKey, SchedulingError};
/// Alias for the scheduler queue type.
///
/// Why use both time and origin ID as the key? The short answer is that this
/// allows to preserve the relative ordering of events which have the same
/// origin (where the origin is either a model instance or the global
/// scheduler). The preservation of this ordering is implemented by the event
/// loop, which aggregate events with the same origin into single sequential
/// futures, thus ensuring that they are not executed concurrently.
pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>;
/// Internal scheduler implementation.
#[derive(Clone)]
pub(crate) struct SchedulerInner {
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTimeReader,
}
impl SchedulerInner {
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
Self {
scheduler_queue,
time,
}
}
/// Returns the current simulation time.
pub(crate) fn time(&self) -> MonotonicTime {
// We use `read` rather than `try_read` because the scheduler can be
// sent to another thread than the simulator's and could thus
// potentially see a torn read if the simulator increments time
// concurrently. The chances of this happening are very small since
// simulation time is not changed frequently.
self.time.read()
}
/// Schedules an action identified by its origin at a future time.
pub(crate) fn schedule_from(
&self,
deadline: impl Deadline,
action: Action,
origin_id: usize,
) -> Result<(), SchedulingError> {
// The scheduler queue must always be locked when reading the time,
// otherwise the following race could occur:
// 1) this method reads the time and concludes that it is not too late
// to schedule the action,
// 2) the `Simulation` object takes the lock, increments simulation time
// and runs the simulation step,
// 3) this method takes the lock and schedules the now-outdated action.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(())
}
/// Schedules an event identified by its origin at a future time.
pub(crate) fn schedule_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let sender = address.into().0;
let action = Action::new(OnceAction::new(process_event(func, arg, sender)));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(())
}
/// Schedules a cancellable event identified by its origin at a future time
/// and returns an event key.
pub(crate) fn schedule_keyed_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = ActionKey::new();
let sender = address.into().0;
let action = Action::new(KeyedOnceAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
event_key.clone(),
));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(event_key)
}
/// Schedules a periodically recurring event identified by its origin at a
/// future time.
pub(crate) fn schedule_periodic_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let sender = address.into().0;
let action = Action::new(PeriodicAction::new(
|| process_event(func, arg, sender),
period,
));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(())
}
/// Schedules a cancellable, periodically recurring event identified by its
/// origin at a future time and returns an event key.
pub(crate) fn schedule_keyed_periodic_event_from<M, F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
origin_id: usize,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let event_key = ActionKey::new();
let sender = address.into().0;
let action = Action::new(KeyedPeriodicAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
period,
event_key.clone(),
));
// The scheduler queue must always be locked when reading the time (see
// `schedule_from`).
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
scheduler_queue.insert((time, origin_id), action);
Ok(event_key)
}
}
impl fmt::Debug for SchedulerInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SchedulerInner")
.field("time", &self.time())
.finish_non_exhaustive()
}
}
/// Trait abstracting over the inner type of an action.
pub(crate) trait ActionInner: Send + 'static {
/// Reports whether the action was cancelled.
fn is_cancelled(&self) -> bool;
/// If this is a periodic action, returns a boxed clone of this action and
/// its repetition period; otherwise returns `None`.
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)>;
/// Returns a boxed future that performs the action.
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>>;
/// Spawns the future that performs the action onto the provided executor.
///
/// This method is typically more efficient that spawning the boxed future
/// from `into_future` since it can directly spawn the unboxed future.
fn spawn_and_forget(self: Box<Self>, executor: &Executor);
}
/// An object that can be converted to a future performing a single
/// non-cancellable action.
///
/// Note that this particular action is in fact already a future: since the
/// future cannot be cancelled and the action does not need to be cloned,
/// there is no need to defer the construction of the future. This makes
/// `into_future` a trivial cast, which saves a boxing operation.
#[pin_project]
pub(crate) struct OnceAction<F> {
#[pin]
fut: F,
}
impl<F> OnceAction<F>
where
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `OnceAction`.
pub(crate) fn new(fut: F) -> Self {
OnceAction { fut }
}
}
impl<F> Future for OnceAction<F>
where
F: Future,
{
type Output = F::Output;
#[inline(always)]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
impl<F> ActionInner for OnceAction<F>
where
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
false
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
None
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// No need for boxing, type coercion is enough here.
Box::into_pin(self)
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget(*self);
}
}
/// An object that can be converted to a future performing a non-cancellable,
/// periodic action.
pub(crate) struct PeriodicAction<G, F>
where
G: (FnOnce() -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// A clonable generator for the associated future.
gen: G,
/// The action repetition period.
period: Duration,
}
impl<G, F> PeriodicAction<G, F>
where
G: (FnOnce() -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `PeriodicAction`.
pub(crate) fn new(gen: G, period: Duration) -> Self {
Self { gen, period }
}
}
impl<G, F> ActionInner for PeriodicAction<G, F>
where
G: (FnOnce() -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
false
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
let event = Box::new(Self::new(self.gen.clone(), self.period));
Some((event, self.period))
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)())
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)());
}
}
/// An object that can be converted to a future performing a single, cancellable
/// action.
pub(crate) struct KeyedOnceAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// A generator for the associated future.
gen: G,
/// The event cancellation key.
event_key: ActionKey,
}
impl<G, F> KeyedOnceAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `KeyedOnceAction`.
pub(crate) fn new(gen: G, event_key: ActionKey) -> Self {
Self { gen, event_key }
}
}
impl<G, F> ActionInner for KeyedOnceAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
self.event_key.is_cancelled()
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
None
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)(self.event_key))
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)(self.event_key));
}
}
/// An object that can be converted to a future performing a periodic,
/// cancellable action.
pub(crate) struct KeyedPeriodicAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// A clonable generator for associated future.
gen: G,
/// The repetition period.
period: Duration,
/// The event cancellation key.
event_key: ActionKey,
}
impl<G, F> KeyedPeriodicAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
/// Constructs a new `KeyedPeriodicAction`.
pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self {
Self {
gen,
period,
event_key,
}
}
}
impl<G, F> ActionInner for KeyedPeriodicAction<G, F>
where
G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn is_cancelled(&self) -> bool {
self.event_key.is_cancelled()
}
fn next(&self) -> Option<(Box<dyn ActionInner>, Duration)> {
let event = Box::new(Self::new(
self.gen.clone(),
self.period,
self.event_key.clone(),
));
Some((event, self.period))
}
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)(self.event_key))
}
fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)(self.event_key));
}
}
/// Asynchronously sends a non-cancellable event to a model input.
pub(crate) async fn process_event<M, F, T, S>(func: F, arg: T, sender: Sender<M>)
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + 'static,
{
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
}
/// Asynchronously sends a cancellable event to a model input.
pub(crate) async fn send_keyed_event<M, F, T, S>(
event_key: ActionKey,
func: F,
arg: T,
sender: Sender<M>,
) where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = async move {
// Only perform the call if the event wasn't cancelled.
if !event_key.is_cancelled() {
func.call(model, arg, scheduler).await;
}
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
}

View File

@ -11,7 +11,7 @@ use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::{
add_model, ExecutionError, Mailbox, SchedulerInner, SchedulerQueue, Signal, Simulation,
add_model, ExecutionError, Mailbox, GlobalScheduler, SchedulerQueue, Signal, Simulation,
};
/// Builder for a multi-threaded, discrete-event simulation.
@ -90,7 +90,7 @@ impl SimInit {
};
self.observers
.push((name.clone(), Box::new(mailbox.0.observer())));
let scheduler = SchedulerInner::new(self.scheduler_queue.clone(), self.time.reader());
let scheduler = GlobalScheduler::new(self.scheduler_queue.clone(), self.time.reader());
add_model(
model,

View File

@ -30,8 +30,8 @@
//! }
//!
//! // Sets an alarm [input port].
//! pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
//! if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() {
//! pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context<Self>) {
//! if cx.schedule_event(setting, Self::ring, ()).is_err() {
//! println!("The alarm clock can only be set for a future time");
//! }
//! }