From f4686af49a4fe2a6a20e525ef094d3718205f450 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Fri, 15 Nov 2024 16:00:18 +0100 Subject: [PATCH] Finalize the Context and BuildContext API The API style is now more uniform: both are passed by mutable ref, and only expose accessors. Additionally, the methods that were initially accessed through the scheduler field are now directly implemented on `Context`. --- asynchronix-util/examples/observables.rs | 12 +- asynchronix/examples/assembly.rs | 6 +- asynchronix/examples/espresso_machine.rs | 27 +- asynchronix/examples/external_input.rs | 6 +- asynchronix/examples/stepper_motor.rs | 12 +- asynchronix/src/channel.rs | 18 +- asynchronix/src/dev_hooks.rs | 4 +- asynchronix/src/lib.rs | 14 +- asynchronix/src/model.rs | 16 +- asynchronix/src/model/context.rs | 312 ++++++- asynchronix/src/ports.rs | 4 +- asynchronix/src/ports/input/model_fn.rs | 40 +- asynchronix/src/ports/output/broadcaster.rs | 50 +- asynchronix/src/ports/source/broadcaster.rs | 50 +- asynchronix/src/simulation.rs | 18 +- asynchronix/src/simulation/scheduler.rs | 813 +++++++++++------- asynchronix/src/simulation/scheduler/inner.rs | 508 ----------- asynchronix/src/simulation/sim_init.rs | 4 +- asynchronix/src/time.rs | 4 +- .../tests/integration/model_scheduling.rs | 73 +- .../integration/simulation_scheduling.rs | 2 +- 21 files changed, 939 insertions(+), 1054 deletions(-) delete mode 100644 asynchronix/src/simulation/scheduler/inner.rs diff --git a/asynchronix-util/examples/observables.rs b/asynchronix-util/examples/observables.rs index 6403d18..bbe7a35 100644 --- a/asynchronix-util/examples/observables.rs +++ b/asynchronix-util/examples/observables.rs @@ -126,17 +126,11 @@ impl Processor { } /// Process data for dt milliseconds. - pub async fn process(&mut self, dt: u64, context: &Context) { + pub async fn process(&mut self, dt: u64, cx: &mut Context) { if matches!(self.state.observe(), ModeId::Idle | ModeId::Processing) { self.state .set(State::Processing( - context - .scheduler - .schedule_keyed_event( - Duration::from_millis(dt), - Self::finish_processing, - (), - ) + cx.schedule_keyed_event(Duration::from_millis(dt), Self::finish_processing, ()) .unwrap() .into_auto(), )) @@ -155,7 +149,7 @@ impl Processor { impl Model for Processor { /// Propagate all internal states. - async fn init(mut self, _: &Context) -> InitializedModel { + async fn init(mut self, _: &mut Context) -> InitializedModel { self.state.propagate().await; self.acc.propagate().await; self.elc.propagate().await; diff --git a/asynchronix/examples/assembly.rs b/asynchronix/examples/assembly.rs index 00b0dbb..9b24c7e 100644 --- a/asynchronix/examples/assembly.rs +++ b/asynchronix/examples/assembly.rs @@ -85,7 +85,7 @@ impl Model for MotorAssembly {} impl ProtoModel for ProtoMotorAssembly { type Model = MotorAssembly; - fn build(self, ctx: &mut BuildContext) -> MotorAssembly { + fn build(self, cx: &mut BuildContext) -> MotorAssembly { let mut assembly = MotorAssembly::new(); let mut motor = Motor::new(self.init_pos); let mut driver = Driver::new(1.0); @@ -105,8 +105,8 @@ impl ProtoModel for ProtoMotorAssembly { motor.position = self.position; // Add the submodels to the simulation. - ctx.add_submodel(driver, driver_mbox, "driver"); - ctx.add_submodel(motor, motor_mbox, "motor"); + cx.add_submodel(driver, driver_mbox, "driver"); + cx.add_submodel(motor, motor_mbox, "motor"); assembly } diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index 235dd4f..3a80350 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -120,7 +120,7 @@ impl Controller { } /// Starts brewing or cancels the current brew -- input port. - pub async fn brew_cmd(&mut self, _: (), context: &Context) { + pub async fn brew_cmd(&mut self, _: (), cx: &mut Context) { // If a brew was ongoing, sending the brew command is interpreted as a // request to cancel it. if let Some(key) = self.stop_brew_key.take() { @@ -139,9 +139,7 @@ impl Controller { // Schedule the `stop_brew()` method and turn on the pump. self.stop_brew_key = Some( - context - .scheduler - .schedule_keyed_event(self.brew_time, Self::stop_brew, ()) + cx.schedule_keyed_event(self.brew_time, Self::stop_brew, ()) .unwrap(), ); self.pump_cmd.send(PumpCommand::On).await; @@ -189,7 +187,7 @@ impl Tank { } /// Water volume added [m³] -- input port. - pub async fn fill(&mut self, added_volume: f64, context: &Context) { + pub async fn fill(&mut self, added_volume: f64, cx: &mut Context) { // Ignore zero and negative values. We could also impose a maximum based // on tank capacity. if added_volume <= 0.0 { @@ -207,11 +205,11 @@ impl Tank { state.set_empty_key.cancel(); // Update the volume, saturating at 0 in case of rounding errors. - let time = context.scheduler.time(); + let time = cx.time(); let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); - self.schedule_empty(state.flow_rate, time, context).await; + self.schedule_empty(state.flow_rate, time, cx).await; // There is no need to broadcast the state of the water sense since // it could not be previously `Empty` (otherwise the dynamic state @@ -229,10 +227,10 @@ impl Tank { /// # Panics /// /// This method will panic if the flow rate is negative. - pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context) { + pub async fn set_flow_rate(&mut self, flow_rate: f64, cx: &mut Context) { assert!(flow_rate >= 0.0); - let time = context.scheduler.time(); + let time = cx.time(); // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { @@ -244,7 +242,7 @@ impl Tank { self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); } - self.schedule_empty(flow_rate, time, context).await; + self.schedule_empty(flow_rate, time, cx).await; } /// Schedules a callback for when the tank becomes empty. @@ -257,7 +255,7 @@ impl Tank { &mut self, flow_rate: f64, time: MonotonicTime, - context: &Context, + cx: &mut Context, ) { // Determine when the tank will be empty at the current flow rate. let duration_until_empty = if self.volume == 0.0 { @@ -274,10 +272,7 @@ impl Tank { let duration_until_empty = Duration::from_secs_f64(duration_until_empty); // Schedule the next update. - match context - .scheduler - .schedule_keyed_event(duration_until_empty, Self::set_empty, ()) - { + match cx.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) { Ok(set_empty_key) => { let state = TankDynamicState { last_volume_update: time, @@ -304,7 +299,7 @@ impl Tank { impl Model for Tank { /// Broadcasts the initial state of the water sense. - async fn init(mut self, _: &Context) -> InitializedModel { + async fn init(mut self, _: &mut Context) -> InitializedModel { self.water_sense .send(if self.volume == 0.0 { WaterSenseState::Empty diff --git a/asynchronix/examples/external_input.rs b/asynchronix/examples/external_input.rs index b72bd7b..8828a90 100644 --- a/asynchronix/examples/external_input.rs +++ b/asynchronix/examples/external_input.rs @@ -136,11 +136,9 @@ impl Listener { impl Model for Listener { /// Initialize model. - async fn init(self, context: &Context) -> InitializedModel { + async fn init(self, cx: &mut Context) -> InitializedModel { // Schedule periodic function that processes external events. - context - .scheduler - .schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) + cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) .unwrap(); self.into() diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 470af09..2e21884 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -90,7 +90,7 @@ impl Motor { impl Model for Motor { /// Broadcasts the initial position of the motor. - async fn init(mut self, _: &Context) -> InitializedModel { + async fn init(mut self, _: &mut Context) -> InitializedModel { self.position.send(self.pos).await; self.into() } @@ -126,7 +126,7 @@ impl Driver { } /// Pulse rate (sign = direction) [Hz] -- input port. - pub async fn pulse_rate(&mut self, pps: f64, context: &Context) { + pub async fn pulse_rate(&mut self, pps: f64, cx: &mut Context) { let pps = pps.signum() * pps.abs().clamp(Self::MIN_PPS, Self::MAX_PPS); if pps == self.pps { return; @@ -138,7 +138,7 @@ impl Driver { // Trigger the rotation if the motor is currently idle. Otherwise the // new value will be accounted for at the next pulse. if is_idle { - self.send_pulse((), context).await; + self.send_pulse((), cx).await; } } @@ -149,7 +149,7 @@ impl Driver { fn send_pulse<'a>( &'a mut self, _: (), - context: &'a Context, + cx: &'a mut Context, ) -> impl Future + Send + 'a { async move { let current_out = match self.next_phase { @@ -170,9 +170,7 @@ impl Driver { let pulse_duration = Duration::from_secs_f64(1.0 / self.pps.abs()); // Schedule the next pulse. - context - .scheduler - .schedule_event(pulse_duration, Self::send_pulse, ()) + cx.schedule_event(pulse_duration, Self::send_pulse, ()) .unwrap(); } } diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index d7eb413..ccd4c82 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -53,7 +53,7 @@ impl Inner { } /// A receiver which can asynchronously execute `async` message that take an -/// argument of type `&mut M` and an optional `&Context` argument. +/// argument of type `&mut M` and an optional `&mut Context` argument. pub(crate) struct Receiver { /// Shared data. inner: Arc>, @@ -105,7 +105,7 @@ impl Receiver { pub(crate) async fn recv( &mut self, model: &mut M, - context: &Context, + cx: &mut Context, ) -> Result<(), RecvError> { let msg = unsafe { self.inner @@ -124,7 +124,7 @@ impl Receiver { THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1)); // Take the message to obtain a boxed future. - let fut = msg.call_once(model, context, self.future_box.take().unwrap()); + let fut = msg.call_once(model, cx, self.future_box.take().unwrap()); // Now that the message was taken, drop `msg` to free its slot // in the queue and signal to one awaiting sender that a slot is @@ -207,7 +207,7 @@ impl Sender { where F: for<'a> FnOnce( &'a mut M, - &'a Context, + &'a mut Context, RecycleBox<()>, ) -> RecycleBox + Send + 'a> + Send @@ -364,7 +364,7 @@ impl fmt::Debug for Sender { } /// A closure that can be called once to create a future boxed in a `RecycleBox` -/// from an `&mut M`, a `&Context` and an empty `RecycleBox`. +/// from an `&mut M`, a `&mut Context` and an empty `RecycleBox`. /// /// This is basically a workaround to emulate an `FnOnce` with the equivalent of /// an `FnMut` so that it is possible to call it as a `dyn` trait stored in a @@ -380,7 +380,7 @@ trait MessageFn: Send { fn call_once<'a>( &mut self, model: &'a mut M, - context: &'a Context, + cx: &'a mut Context, recycle_box: RecycleBox<()>, ) -> RecycleBox + Send + 'a>; } @@ -402,7 +402,7 @@ impl MessageFn for MessageFnOnce where F: for<'a> FnOnce( &'a mut M, - &'a Context, + &'a mut Context, RecycleBox<()>, ) -> RecycleBox + Send + 'a> + Send, @@ -410,12 +410,12 @@ where fn call_once<'a>( &mut self, model: &'a mut M, - context: &'a Context, + cx: &'a mut Context, recycle_box: RecycleBox<()>, ) -> RecycleBox + Send + 'a> { let closure = self.msg_fn.take().unwrap(); - (closure)(model, context, recycle_box) + (closure)(model, cx, recycle_box) } } diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs index 69f02c7..418dc0d 100644 --- a/asynchronix/src/dev_hooks.rs +++ b/asynchronix/src/dev_hooks.rs @@ -16,7 +16,7 @@ impl Executor { /// /// The maximum number of threads is set with the `pool_size` parameter. pub fn new(pool_size: usize) -> Self { - let dummy_context = crate::executor::SimulationContext { + let dummy_cx = crate::executor::SimulationContext { #[cfg(feature = "tracing")] time_reader: crate::util::sync_cell::SyncCell::new( crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH), @@ -25,7 +25,7 @@ impl Executor { }; Self(executor::Executor::new_multi_threaded( pool_size, - dummy_context, + dummy_cx, executor::Signal::new(), )) } diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index 115c70e..19cb1e2 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -45,7 +45,7 @@ //! * _input ports_, which are synchronous or asynchronous methods that //! implement the [`InputFn`](ports::InputFn) trait and take an `&mut self` //! argument, a message argument, and an optional -//! [`&Context`](model::Context) argument, +//! [`&mut Context`](model::Context) argument, //! * _replier ports_, which are similar to input ports but implement the //! [`ReplierFn`](ports::ReplierFn) trait and return a reply. //! @@ -118,8 +118,8 @@ //! pub output: Output, //! } //! impl Delay { -//! pub fn input(&mut self, value: f64, context: &Context) { -//! context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! pub fn input(&mut self, value: f64, cx: &mut Context) { +//! cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! } //! //! async fn send(&mut self, value: f64) { @@ -189,8 +189,8 @@ //! # pub output: Output, //! # } //! # impl Delay { -//! # pub fn input(&mut self, value: f64, context: &Context) { -//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # pub fn input(&mut self, value: f64, cx: &mut Context) { +//! # cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -290,8 +290,8 @@ //! # pub output: Output, //! # } //! # impl Delay { -//! # pub fn input(&mut self, value: f64, context: &Context) { -//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # pub fn input(&mut self, value: f64, cx: &mut Context) { +//! # cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; diff --git a/asynchronix/src/model.rs b/asynchronix/src/model.rs index 1976472..35793fb 100644 --- a/asynchronix/src/model.rs +++ b/asynchronix/src/model.rs @@ -56,7 +56,7 @@ //! impl Model for MyModel { //! async fn init( //! mut self, -//! ctx: &Context +//! ctx: &mut Context //! ) -> InitializedModel { //! println!("...initialization..."); //! @@ -173,10 +173,10 @@ //! ```ignore //! fn(&mut self) // argument elided, implies `T=()` //! fn(&mut self, T) -//! fn(&mut self, T, &Context) +//! fn(&mut self, T, &mut Context) //! async fn(&mut self) // argument elided, implies `T=()` //! async fn(&mut self, T) -//! async fn(&mut self, T, &Context) +//! async fn(&mut self, T, &mut Context) //! where //! Self: Model, //! T: Clone + Send + 'static, @@ -193,7 +193,7 @@ //! ```ignore //! async fn(&mut self) -> R // argument elided, implies `T=()` //! async fn(&mut self, T) -> R -//! async fn(&mut self, T, &Context) -> R +//! async fn(&mut self, T, &mut Context) -> R //! where //! Self: Model, //! T: Clone + Send + 'static, @@ -219,7 +219,7 @@ //! // ... //! } //! impl MyModel { -//! pub fn my_input(&mut self, input: String, context: &Context) { +//! pub fn my_input(&mut self, input: String, cx: &mut Context) { //! // ... //! } //! pub async fn my_replier(&mut self, request: u32) -> bool { // context argument elided @@ -273,7 +273,7 @@ pub trait Model: Sized + Send + 'static { /// impl Model for MyModel { /// async fn init( /// self, - /// context: &Context + /// cx: &mut Context /// ) -> InitializedModel { /// println!("...initialization..."); /// @@ -281,7 +281,7 @@ pub trait Model: Sized + Send + 'static { /// } /// } /// ``` - fn init(self, _: &Context) -> impl Future> + Send { + fn init(self, _: &mut Context) -> impl Future> + Send { async { self.into() } } } @@ -322,7 +322,7 @@ pub trait ProtoModel: Sized { /// This method is invoked when the /// [`SimInit::add_model()`](crate::simulation::SimInit::add_model) or /// [`BuildContext::add_submodel`] method is called. - fn build(self, ctx: &mut BuildContext) -> Self::Model; + fn build(self, cx: &mut BuildContext) -> Self::Model; } // Every model can be used as a prototype for itself. diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs index 85f8dcd..6e8c70b 100644 --- a/asynchronix/src/model/context.rs +++ b/asynchronix/src/model/context.rs @@ -1,7 +1,10 @@ use std::fmt; +use std::time::Duration; use crate::executor::{Executor, Signal}; -use crate::simulation::{self, LocalScheduler, Mailbox, SchedulerInner}; +use crate::ports::InputFn; +use crate::simulation::{self, ActionKey, Address, GlobalScheduler, Mailbox, SchedulingError}; +use crate::time::{Deadline, MonotonicTime}; use super::{Model, ProtoModel}; @@ -22,7 +25,7 @@ use super::{Model, ProtoModel}; /// fn self_scheduling_method<'a>( /// &'a mut self, /// arg: MyEventType, -/// context: &'a Context +/// cx: &'a mut Context /// ) -> impl Future + Send + 'a { /// async move { /// /* implementation */ @@ -49,14 +52,14 @@ use super::{Model, ProtoModel}; /// /// impl DelayedGreeter { /// // Triggers a greeting on the output port after some delay [input port]. -/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context) { -/// let time = context.scheduler.time(); +/// pub async fn greet_with_delay(&mut self, delay: Duration, cx: &mut Context) { +/// let time = cx.time(); /// let greeting = format!("Hello, this message was scheduled at: {:?}.", time); /// /// if delay.is_zero() { /// self.msg_out.send(greeting).await; /// } else { -/// context.scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap(); +/// cx.schedule_event(delay, Self::send_msg, greeting).unwrap(); /// } /// } /// @@ -72,26 +75,293 @@ use super::{Model, ProtoModel}; // https://github.com/rust-lang/rust/issues/78649 pub struct Context { name: String, - - /// Local scheduler. - pub scheduler: LocalScheduler, + scheduler: GlobalScheduler, + address: Address, + origin_id: usize, } impl Context { /// Creates a new local context. - pub(crate) fn new(name: String, scheduler: LocalScheduler) -> Self { - Self { name, scheduler } + pub(crate) fn new(name: String, scheduler: GlobalScheduler, address: Address) -> Self { + // The only requirement for the origin ID is that it must be (i) + // specific to each model and (ii) different from 0 (which is reserved + // for the global scheduler). The channel ID of the model mailbox + // fulfills this requirement. + let origin_id = address.0.channel_id(); + + Self { + name, + scheduler, + address, + origin_id, + } } - /// Returns the model instance name. + /// Returns the fully qualified model instance name. + /// + /// The fully qualified name is made of the unqualified model name, if + /// relevant prepended by the dot-separated names of all parent models. pub fn name(&self) -> &str { &self.name } + + /// Returns the current simulation time. + pub fn time(&self) -> MonotonicTime { + self.scheduler.time() + } + + /// Schedules an event at a future time on this model. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// + /// // A timer. + /// pub struct Timer {} + /// + /// impl Timer { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: Duration, cx: &mut Context) { + /// if cx.schedule_event(setting, Self::ring, ()).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Rings [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg"); + /// } + /// } + /// + /// impl Model for Timer {} + /// ``` + pub fn schedule_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + self.scheduler + .schedule_event_from(deadline, func, arg, &self.address, self.origin_id) + } + + /// Schedules a cancellable event at a future time on this model and returns + /// an action key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock that can be cancelled. + /// #[derive(Default)] + /// pub struct CancellableAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { + /// self.cancel(); + /// match cx.schedule_keyed_event(setting, Self::ring, ()) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels the current alarm, if any [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Rings the alarm [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg!"); + /// } + /// } + /// + /// impl Model for CancellableAlarmClock {} + /// ``` + pub fn schedule_keyed_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = self.scheduler.schedule_keyed_event_from( + deadline, + func, + arg, + &self.address, + self.origin_id, + )?; + + Ok(event_key) + } + + /// Schedules a periodically recurring event on this model at a future time. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz. + /// pub struct BeepingAlarmClock {} + /// + /// impl BeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { + /// if cx.schedule_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for BeepingAlarmClock {} + /// ``` + pub fn schedule_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + self.scheduler.schedule_periodic_event_from( + deadline, + period, + func, + arg, + &self.address, + self.origin_id, + ) + } + + /// Schedules a cancellable, periodically recurring event on this model at a + /// future time and returns an action key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or + /// // stopped after it sets off. + /// #[derive(Default)] + /// pub struct CancellableBeepingAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableBeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { + /// self.cancel(); + /// match cx.schedule_keyed_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels or stops the alarm [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for CancellableBeepingAlarmClock {} + /// ``` + pub fn schedule_keyed_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = self.scheduler.schedule_keyed_periodic_event_from( + deadline, + period, + func, + arg, + &self.address, + self.origin_id, + )?; + + Ok(event_key) + } } impl fmt::Debug for Context { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Context").finish_non_exhaustive() + f.debug_struct("Context") + .field("name", &self.name()) + .field("time", &self.time()) + .field("address", &self.address) + .field("origin_id", &self.origin_id) + .finish_non_exhaustive() } } @@ -153,7 +423,7 @@ impl fmt::Debug for Context { /// /// fn build( /// self, -/// ctx: &mut BuildContext) +/// cx: &mut BuildContext) /// -> MultiplyBy4 { /// let mut mult = MultiplyBy4 { forward: Output::default() }; /// let mut submult1 = MultiplyBy2::default(); @@ -170,8 +440,8 @@ impl fmt::Debug for Context { /// submult1.output.connect(MultiplyBy2::input, &submult2_mbox); /// /// // Add the submodels to the simulation. -/// ctx.add_submodel(submult1, submult1_mbox, "submultiplier 1"); -/// ctx.add_submodel(submult2, submult2_mbox, "submultiplier 2"); +/// cx.add_submodel(submult1, submult1_mbox, "submultiplier 1"); +/// cx.add_submodel(submult2, submult2_mbox, "submultiplier 2"); /// /// mult /// } @@ -180,10 +450,9 @@ impl fmt::Debug for Context { /// ``` #[derive(Debug)] pub struct BuildContext<'a, P: ProtoModel> { - /// Mailbox of the model. - pub mailbox: &'a Mailbox, + mailbox: &'a Mailbox, name: &'a String, - scheduler: &'a SchedulerInner, + scheduler: &'a GlobalScheduler, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, @@ -194,7 +463,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { pub(crate) fn new( mailbox: &'a Mailbox, name: &'a String, - scheduler: &'a SchedulerInner, + scheduler: &'a GlobalScheduler, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, @@ -217,6 +486,11 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { self.name } + /// Returns a handle to the model's mailbox. + pub fn address(&self) -> Address { + self.mailbox.address() + } + /// Adds a sub-model to the simulation bench. /// /// The `name` argument needs not be unique. It is appended to that of the diff --git a/asynchronix/src/ports.rs b/asynchronix/src/ports.rs index 76b691a..dc23659 100644 --- a/asynchronix/src/ports.rs +++ b/asynchronix/src/ports.rs @@ -69,10 +69,10 @@ //! impl ProtoModel for ProtoParentModel { //! type Model = ParentModel; //! -//! fn build(self, ctx: &mut BuildContext) -> ParentModel { +//! fn build(self, cx: &mut BuildContext) -> ParentModel { //! let mut child = ChildModel::new(self.output.clone()); //! -//! ctx.add_submodel(child, Mailbox::new(), "child"); +//! cx.add_submodel(child, Mailbox::new(), "child"); //! //! ParentModel { output: self.output } //! } diff --git a/asynchronix/src/ports/input/model_fn.rs b/asynchronix/src/ports/input/model_fn.rs index d9668e5..550bb85 100644 --- a/asynchronix/src/ports/input/model_fn.rs +++ b/asynchronix/src/ports/input/model_fn.rs @@ -14,9 +14,9 @@ use super::markers; /// /// ```ignore /// FnOnce(&mut M, T) -/// FnOnce(&mut M, T, &Context) +/// FnOnce(&mut M, T, &mut Context) /// async fn(&mut M, T) -/// async fn(&mut M, T, &Context) +/// async fn(&mut M, T, &mut Context) /// where /// M: Model /// ``` @@ -34,7 +34,7 @@ pub trait InputFn<'a, M: Model, T, S>: Send + 'static { type Future: Future + Send + 'a; /// Calls the method. - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future; + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future; } impl<'a, M, F> InputFn<'a, M, (), markers::WithoutArguments> for F @@ -44,7 +44,7 @@ where { type Future = Ready<()>; - fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context) -> Self::Future { self(model); ready(()) @@ -58,7 +58,7 @@ where { type Future = Ready<()>; - fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context) -> Self::Future { self(model, arg); ready(()) @@ -68,12 +68,12 @@ where impl<'a, M, T, F> InputFn<'a, M, T, markers::WithContext> for F where M: Model, - F: FnOnce(&'a mut M, T, &'a Context) + Send + 'static, + F: FnOnce(&'a mut M, T, &'a mut Context) + Send + 'static, { type Future = Ready<()>; - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { - self(model, arg, context); + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future { + self(model, arg, cx); ready(()) } @@ -87,7 +87,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context) -> Self::Future { self(model) } } @@ -100,7 +100,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context) -> Self::Future { self(model, arg) } } @@ -109,12 +109,12 @@ impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithContext> for F where M: Model, Fut: Future + Send + 'a, - F: FnOnce(&'a mut M, T, &'a Context) -> Fut + Send + 'static, + F: FnOnce(&'a mut M, T, &'a mut Context) -> Fut + Send + 'static, { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { - self(model, arg, context) + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future { + self(model, arg, cx) } } @@ -126,7 +126,7 @@ where /// /// ```ignore /// async fn(&mut M, T) -> R -/// async fn(&mut M, T, &Context) -> R +/// async fn(&mut M, T, &mut Context) -> R /// where /// M: Model /// ``` @@ -143,7 +143,7 @@ pub trait ReplierFn<'a, M: Model, T, R, S>: Send + 'static { type Future: Future + Send + 'a; /// Calls the method. - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future; + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future; } impl<'a, M, R, Fut, F> ReplierFn<'a, M, (), R, markers::AsyncWithoutArguments> for F @@ -154,7 +154,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context) -> Self::Future { self(model) } } @@ -167,7 +167,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context) -> Self::Future { self(model, arg) } } @@ -176,11 +176,11 @@ impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithContext> for where M: Model, Fut: Future + Send + 'a, - F: FnOnce(&'a mut M, T, &'a Context) -> Fut + Send + 'static, + F: FnOnce(&'a mut M, T, &'a mut Context) -> Fut + Send + 'static, { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { - self(model, arg, context) + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future { + self(model, arg, cx) } } diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 980bbe8..4b7a36b 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -567,7 +567,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, SchedulerInner}; + use crate::simulation::{Address, GlobalScheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -635,14 +635,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); } }) }) @@ -707,17 +705,15 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); }); } }) @@ -769,14 +765,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) @@ -856,25 +850,23 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); }); diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 162f23c..136cdfa 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -468,7 +468,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, SchedulerInner}; + use crate::simulation::{Address, GlobalScheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -536,14 +536,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); } }) }) @@ -608,17 +606,15 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); }); } }) @@ -670,14 +666,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) @@ -757,25 +751,23 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - SchedulerInner::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); }); diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 023521b..9beee4d 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -124,11 +124,11 @@ mod sim_init; use scheduler::SchedulerQueue; pub(crate) use scheduler::{ - KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner, + GlobalScheduler, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, }; pub use mailbox::{Address, Mailbox}; -pub use scheduler::{Action, ActionKey, AutoActionKey, LocalScheduler, Scheduler, SchedulingError}; +pub use scheduler::{Action, ActionKey, AutoActionKey, Scheduler, SchedulingError}; pub use sim_init::SimInit; use std::any::Any; @@ -165,7 +165,7 @@ thread_local! { pub(crate) static CURRENT_MODEL_ID: Cell = const { Cell /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation /// itself, but also from models via the optional -/// [`&Context`](crate::model::Context) argument of input and replier port +/// [`&mut Context`](crate::model::Context) argument of input and replier port /// methods. Likewise, simulation time can be accessed with the /// [`Simulation::time()`] method, or from models with the /// [`LocalScheduler::time()`](crate::simulation::LocalScheduler::time) method. @@ -720,7 +720,7 @@ pub(crate) fn add_model( model: P, mailbox: Mailbox, name: String, - scheduler: SchedulerInner, + scheduler: GlobalScheduler, executor: &Executor, abort_signal: &Signal, model_names: &mut Vec, @@ -728,7 +728,7 @@ pub(crate) fn add_model( #[cfg(feature = "tracing")] let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); - let mut build_context = BuildContext::new( + let mut build_cx = BuildContext::new( &mailbox, &name, &scheduler, @@ -736,15 +736,15 @@ pub(crate) fn add_model( abort_signal, model_names, ); - let model = model.build(&mut build_context); + let model = model.build(&mut build_cx); let address = mailbox.address(); let mut receiver = mailbox.0; let abort_signal = abort_signal.clone(); - let context = Context::new(name.clone(), LocalScheduler::new(scheduler, address)); + let mut cx = Context::new(name.clone(), scheduler, address); let fut = async move { - let mut model = model.init(&context).await.0; - while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {} + let mut model = model.init(&mut cx).await.0; + while !abort_signal.is_set() && receiver.recv(&mut model, &mut cx).await.is_ok() {} }; let model_id = ModelId::new(model_names.len()); diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index c830e48..ac7cdce 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -1,37 +1,34 @@ //! Scheduling functions and types. -mod inner; - use std::error::Error; use std::future::Future; use std::hash::{Hash, Hasher}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use std::time::Duration; use std::{fmt, ptr}; +use pin_project::pin_project; +use recycle_box::{coerce_box, RecycleBox}; + +use crate::channel::Sender; use crate::executor::Executor; use crate::model::Model; use crate::ports::InputFn; use crate::simulation::Address; use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; - -use inner::ActionInner; - -pub(crate) use inner::{ - KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner, - SchedulerQueue, -}; +use crate::util::priority_queue::PriorityQueue; const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; /// A global scheduler. #[derive(Clone)] -pub struct Scheduler(SchedulerInner); +pub struct Scheduler(GlobalScheduler); impl Scheduler { pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { - Self(SchedulerInner::new(scheduler_queue, time)) + Self(GlobalScheduler::new(scheduler_queue, time)) } /// Returns the current simulation time. @@ -193,311 +190,6 @@ impl fmt::Debug for Scheduler { } } -/// Local scheduler. -pub struct LocalScheduler { - scheduler: SchedulerInner, - address: Address, - origin_id: usize, -} - -impl LocalScheduler { - pub(crate) fn new(scheduler: SchedulerInner, address: Address) -> Self { - // The only requirement for the origin ID is that it must be (i) - // specific to each model and (ii) different from 0 (which is reserved - // for the global scheduler). The channel ID of the model mailbox - // fulfills this requirement. - let origin_id = address.0.channel_id(); - - Self { - scheduler, - address, - origin_id, - } - } - - /// Returns the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::Model; - /// use asynchronix::simulation::LocalScheduler; - /// use asynchronix::time::MonotonicTime; - /// - /// fn is_third_millenium(scheduler: &LocalScheduler) -> bool { - /// let time = scheduler.time(); - /// time >= MonotonicTime::new(978307200, 0).unwrap() - /// && time < MonotonicTime::new(32535216000, 0).unwrap() - /// } - /// ``` - pub fn time(&self) -> MonotonicTime { - self.scheduler.time() - } - - /// Schedules an event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// - /// // A timer. - /// pub struct Timer {} - /// - /// impl Timer { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: Duration, context: &Context) { - /// if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Rings [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg"); - /// } - /// } - /// - /// impl Model for Timer {} - /// ``` - pub fn schedule_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - self.scheduler - .schedule_event_from(deadline, func, arg, &self.address, self.origin_id) - } - - /// Schedules a cancellable event at a future time and returns an action - /// key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::simulation::ActionKey; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock that can be cancelled. - /// #[derive(Default)] - /// pub struct CancellableAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// self.cancel(); - /// match context.scheduler.schedule_keyed_event(setting, Self::ring, ()) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels the current alarm, if any [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Rings the alarm [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg!"); - /// } - /// } - /// - /// impl Model for CancellableAlarmClock {} - /// ``` - pub fn schedule_keyed_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let event_key = self.scheduler.schedule_keyed_event_from( - deadline, - func, - arg, - &self.address, - self.origin_id, - )?; - - Ok(event_key) - } - - /// Schedules a periodically recurring event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock beeping at 1Hz. - /// pub struct BeepingAlarmClock {} - /// - /// impl BeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// if context.scheduler.schedule_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for BeepingAlarmClock {} - /// ``` - pub fn schedule_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - self.scheduler.schedule_periodic_event_from( - deadline, - period, - func, - arg, - &self.address, - self.origin_id, - ) - } - - /// Schedules a cancellable, periodically recurring event at a future time - /// and returns an action key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::simulation::ActionKey; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or - /// // stopped after it sets off. - /// #[derive(Default)] - /// pub struct CancellableBeepingAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableBeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// self.cancel(); - /// match context.scheduler.schedule_keyed_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels or stops the alarm [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for CancellableBeepingAlarmClock {} - /// ``` - pub fn schedule_keyed_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let event_key = self.scheduler.schedule_keyed_periodic_event_from( - deadline, - period, - func, - arg, - &self.address, - self.origin_id, - )?; - - Ok(event_key) - } -} - -impl Clone for LocalScheduler { - fn clone(&self) -> Self { - Self { - scheduler: self.scheduler.clone(), - address: self.address.clone(), - origin_id: self.origin_id, - } - } -} - -impl fmt::Debug for LocalScheduler { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LocalScheduler") - .field("time", &self.time()) - .field("address", &self.address) - .field("origin_id", &self.origin_id) - .finish_non_exhaustive() - } -} - /// Managed handle to a scheduled action. /// /// An `AutoActionKey` is a managed handle to a scheduled action that cancels @@ -638,3 +330,492 @@ impl fmt::Debug for Action { f.debug_struct("SchedulableEvent").finish_non_exhaustive() } } + +/// Alias for the scheduler queue type. +/// +/// Why use both time and origin ID as the key? The short answer is that this +/// allows to preserve the relative ordering of events which have the same +/// origin (where the origin is either a model instance or the global +/// scheduler). The preservation of this ordering is implemented by the event +/// loop, which aggregate events with the same origin into single sequential +/// futures, thus ensuring that they are not executed concurrently. +pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; + +/// Internal implementation of the global scheduler. +#[derive(Clone)] +pub(crate) struct GlobalScheduler { + scheduler_queue: Arc>, + time: AtomicTimeReader, +} + +impl GlobalScheduler { + pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { + Self { + scheduler_queue, + time, + } + } + + /// Returns the current simulation time. + pub(crate) fn time(&self) -> MonotonicTime { + // We use `read` rather than `try_read` because the scheduler can be + // sent to another thread than the simulator's and could thus + // potentially see a torn read if the simulator increments time + // concurrently. The chances of this happening are very small since + // simulation time is not changed frequently. + self.time.read() + } + + /// Schedules an action identified by its origin at a future time. + pub(crate) fn schedule_from( + &self, + deadline: impl Deadline, + action: Action, + origin_id: usize, + ) -> Result<(), SchedulingError> { + // The scheduler queue must always be locked when reading the time, + // otherwise the following race could occur: + // 1) this method reads the time and concludes that it is not too late + // to schedule the action, + // 2) the `Simulation` object takes the lock, increments simulation time + // and runs the simulation step, + // 3) this method takes the lock and schedules the now-outdated action. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules an event identified by its origin at a future time. + pub(crate) fn schedule_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let sender = address.into().0; + let action = Action::new(OnceAction::new(process_event(func, arg, sender))); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable event identified by its origin at a future time + /// and returns an event key. + pub(crate) fn schedule_keyed_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedOnceAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } + + /// Schedules a periodically recurring event identified by its origin at a + /// future time. + pub(crate) fn schedule_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = address.into().0; + let action = Action::new(PeriodicAction::new( + || process_event(func, arg, sender), + period, + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event identified by its + /// origin at a future time and returns an event key. + pub(crate) fn schedule_keyed_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedPeriodicAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + period, + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } +} + +impl fmt::Debug for GlobalScheduler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SchedulerInner") + .field("time", &self.time()) + .finish_non_exhaustive() + } +} + +/// Trait abstracting over the inner type of an action. +pub(crate) trait ActionInner: Send + 'static { + /// Reports whether the action was cancelled. + fn is_cancelled(&self) -> bool; + + /// If this is a periodic action, returns a boxed clone of this action and + /// its repetition period; otherwise returns `None`. + fn next(&self) -> Option<(Box, Duration)>; + + /// Returns a boxed future that performs the action. + fn into_future(self: Box) -> Pin + Send>>; + + /// Spawns the future that performs the action onto the provided executor. + /// + /// This method is typically more efficient that spawning the boxed future + /// from `into_future` since it can directly spawn the unboxed future. + fn spawn_and_forget(self: Box, executor: &Executor); +} + +/// An object that can be converted to a future performing a single +/// non-cancellable action. +/// +/// Note that this particular action is in fact already a future: since the +/// future cannot be cancelled and the action does not need to be cloned, +/// there is no need to defer the construction of the future. This makes +/// `into_future` a trivial cast, which saves a boxing operation. +#[pin_project] +pub(crate) struct OnceAction { + #[pin] + fut: F, +} + +impl OnceAction +where + F: Future + Send + 'static, +{ + /// Constructs a new `OnceAction`. + pub(crate) fn new(fut: F) -> Self { + OnceAction { fut } + } +} + +impl Future for OnceAction +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl ActionInner for OnceAction +where + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + false + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + // No need for boxing, type coercion is enough here. + Box::into_pin(self) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget(*self); + } +} + +/// An object that can be converted to a future performing a non-cancellable, +/// periodic action. +pub(crate) struct PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// A clonable generator for the associated future. + gen: G, + /// The action repetition period. + period: Duration, +} + +impl PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `PeriodicAction`. + pub(crate) fn new(gen: G, period: Duration) -> Self { + Self { gen, period } + } +} + +impl ActionInner for PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + false + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new(self.gen.clone(), self.period)); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)()) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)()); + } +} + +/// An object that can be converted to a future performing a single, cancellable +/// action. +pub(crate) struct KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + /// A generator for the associated future. + gen: G, + /// The event cancellation key. + event_key: ActionKey, +} + +impl KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `KeyedOnceAction`. + pub(crate) fn new(gen: G, event_key: ActionKey) -> Self { + Self { gen, event_key } + } +} + +impl ActionInner for KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)(self.event_key)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)(self.event_key)); + } +} + +/// An object that can be converted to a future performing a periodic, +/// cancellable action. +pub(crate) struct KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// A clonable generator for associated future. + gen: G, + /// The repetition period. + period: Duration, + /// The event cancellation key. + event_key: ActionKey, +} + +impl KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `KeyedPeriodicAction`. + pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self { + Self { + gen, + period, + event_key, + } + } +} + +impl ActionInner for KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new( + self.gen.clone(), + self.period, + self.event_key.clone(), + )); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)(self.event_key)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)(self.event_key)); + } +} + +/// Asynchronously sends a non-cancellable event to a model input. +pub(crate) async fn process_event(func: F, arg: T, sender: Sender) +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} + +/// Asynchronously sends a cancellable event to a model input. +pub(crate) async fn send_keyed_event( + event_key: ActionKey, + func: F, + arg: T, + sender: Sender, +) where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = async move { + // Only perform the call if the event wasn't cancelled. + if !event_key.is_cancelled() { + func.call(model, arg, scheduler).await; + } + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} diff --git a/asynchronix/src/simulation/scheduler/inner.rs b/asynchronix/src/simulation/scheduler/inner.rs deleted file mode 100644 index 6cb633d..0000000 --- a/asynchronix/src/simulation/scheduler/inner.rs +++ /dev/null @@ -1,508 +0,0 @@ -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; -use std::time::Duration; - -use pin_project::pin_project; -use recycle_box::{coerce_box, RecycleBox}; - -use crate::channel::Sender; -use crate::executor::Executor; -use crate::model::Model; -use crate::ports::InputFn; -use crate::simulation::Address; -use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; -use crate::util::priority_queue::PriorityQueue; - -use super::{Action, ActionKey, SchedulingError}; - -/// Alias for the scheduler queue type. -/// -/// Why use both time and origin ID as the key? The short answer is that this -/// allows to preserve the relative ordering of events which have the same -/// origin (where the origin is either a model instance or the global -/// scheduler). The preservation of this ordering is implemented by the event -/// loop, which aggregate events with the same origin into single sequential -/// futures, thus ensuring that they are not executed concurrently. -pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; - -/// Internal scheduler implementation. -#[derive(Clone)] -pub(crate) struct SchedulerInner { - scheduler_queue: Arc>, - time: AtomicTimeReader, -} - -impl SchedulerInner { - pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { - Self { - scheduler_queue, - time, - } - } - - /// Returns the current simulation time. - pub(crate) fn time(&self) -> MonotonicTime { - // We use `read` rather than `try_read` because the scheduler can be - // sent to another thread than the simulator's and could thus - // potentially see a torn read if the simulator increments time - // concurrently. The chances of this happening are very small since - // simulation time is not changed frequently. - self.time.read() - } - - /// Schedules an action identified by its origin at a future time. - pub(crate) fn schedule_from( - &self, - deadline: impl Deadline, - action: Action, - origin_id: usize, - ) -> Result<(), SchedulingError> { - // The scheduler queue must always be locked when reading the time, - // otherwise the following race could occur: - // 1) this method reads the time and concludes that it is not too late - // to schedule the action, - // 2) the `Simulation` object takes the lock, increments simulation time - // and runs the simulation step, - // 3) this method takes the lock and schedules the now-outdated action. - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - scheduler_queue.insert((time, origin_id), action); - - Ok(()) - } - - /// Schedules an event identified by its origin at a future time. - pub(crate) fn schedule_event_from( - &self, - deadline: impl Deadline, - func: F, - arg: T, - address: impl Into>, - origin_id: usize, - ) -> Result<(), SchedulingError> - where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let sender = address.into().0; - let action = Action::new(OnceAction::new(process_event(func, arg, sender))); - - // The scheduler queue must always be locked when reading the time (see - // `schedule_from`). - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - scheduler_queue.insert((time, origin_id), action); - - Ok(()) - } - - /// Schedules a cancellable event identified by its origin at a future time - /// and returns an event key. - pub(crate) fn schedule_keyed_event_from( - &self, - deadline: impl Deadline, - func: F, - arg: T, - address: impl Into>, - origin_id: usize, - ) -> Result - where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let event_key = ActionKey::new(); - let sender = address.into().0; - let action = Action::new(KeyedOnceAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - event_key.clone(), - )); - - // The scheduler queue must always be locked when reading the time (see - // `schedule_from`). - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - scheduler_queue.insert((time, origin_id), action); - - Ok(event_key) - } - - /// Schedules a periodically recurring event identified by its origin at a - /// future time. - pub(crate) fn schedule_periodic_event_from( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - address: impl Into>, - origin_id: usize, - ) -> Result<(), SchedulingError> - where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = address.into().0; - let action = Action::new(PeriodicAction::new( - || process_event(func, arg, sender), - period, - )); - - // The scheduler queue must always be locked when reading the time (see - // `schedule_from`). - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - scheduler_queue.insert((time, origin_id), action); - - Ok(()) - } - - /// Schedules a cancellable, periodically recurring event identified by its - /// origin at a future time and returns an event key. - pub(crate) fn schedule_keyed_periodic_event_from( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - address: impl Into>, - origin_id: usize, - ) -> Result - where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let action = Action::new(KeyedPeriodicAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - period, - event_key.clone(), - )); - - // The scheduler queue must always be locked when reading the time (see - // `schedule_from`). - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - scheduler_queue.insert((time, origin_id), action); - - Ok(event_key) - } -} - -impl fmt::Debug for SchedulerInner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SchedulerInner") - .field("time", &self.time()) - .finish_non_exhaustive() - } -} - -/// Trait abstracting over the inner type of an action. -pub(crate) trait ActionInner: Send + 'static { - /// Reports whether the action was cancelled. - fn is_cancelled(&self) -> bool; - - /// If this is a periodic action, returns a boxed clone of this action and - /// its repetition period; otherwise returns `None`. - fn next(&self) -> Option<(Box, Duration)>; - - /// Returns a boxed future that performs the action. - fn into_future(self: Box) -> Pin + Send>>; - - /// Spawns the future that performs the action onto the provided executor. - /// - /// This method is typically more efficient that spawning the boxed future - /// from `into_future` since it can directly spawn the unboxed future. - fn spawn_and_forget(self: Box, executor: &Executor); -} - -/// An object that can be converted to a future performing a single -/// non-cancellable action. -/// -/// Note that this particular action is in fact already a future: since the -/// future cannot be cancelled and the action does not need to be cloned, -/// there is no need to defer the construction of the future. This makes -/// `into_future` a trivial cast, which saves a boxing operation. -#[pin_project] -pub(crate) struct OnceAction { - #[pin] - fut: F, -} - -impl OnceAction -where - F: Future + Send + 'static, -{ - /// Constructs a new `OnceAction`. - pub(crate) fn new(fut: F) -> Self { - OnceAction { fut } - } -} - -impl Future for OnceAction -where - F: Future, -{ - type Output = F::Output; - - #[inline(always)] - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} - -impl ActionInner for OnceAction -where - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - false - } - fn next(&self) -> Option<(Box, Duration)> { - None - } - fn into_future(self: Box) -> Pin + Send>> { - // No need for boxing, type coercion is enough here. - Box::into_pin(self) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget(*self); - } -} - -/// An object that can be converted to a future performing a non-cancellable, -/// periodic action. -pub(crate) struct PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// A clonable generator for the associated future. - gen: G, - /// The action repetition period. - period: Duration, -} - -impl PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `PeriodicAction`. - pub(crate) fn new(gen: G, period: Duration) -> Self { - Self { gen, period } - } -} - -impl ActionInner for PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - false - } - fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new(self.gen.clone(), self.period)); - - Some((event, self.period)) - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)()) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)()); - } -} - -/// An object that can be converted to a future performing a single, cancellable -/// action. -pub(crate) struct KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - /// A generator for the associated future. - gen: G, - /// The event cancellation key. - event_key: ActionKey, -} - -impl KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `KeyedOnceAction`. - pub(crate) fn new(gen: G, event_key: ActionKey) -> Self { - Self { gen, event_key } - } -} - -impl ActionInner for KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - self.event_key.is_cancelled() - } - fn next(&self) -> Option<(Box, Duration)> { - None - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); - } -} - -/// An object that can be converted to a future performing a periodic, -/// cancellable action. -pub(crate) struct KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// A clonable generator for associated future. - gen: G, - /// The repetition period. - period: Duration, - /// The event cancellation key. - event_key: ActionKey, -} - -impl KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `KeyedPeriodicAction`. - pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self { - Self { - gen, - period, - event_key, - } - } -} - -impl ActionInner for KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - self.event_key.is_cancelled() - } - fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new( - self.gen.clone(), - self.period, - self.event_key.clone(), - )); - - Some((event, self.period)) - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); - } -} - -/// Asynchronously sends a non-cancellable event to a model input. -pub(crate) async fn process_event(func: F, arg: T, sender: Sender) -where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, -{ - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = func.call(model, arg, scheduler); - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; -} - -/// Asynchronously sends a cancellable event to a model input. -pub(crate) async fn send_keyed_event( - event_key: ActionKey, - func: F, - arg: T, - sender: Sender, -) where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, -{ - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = async move { - // Only perform the call if the event wasn't cancelled. - if !event_key.is_cancelled() { - func.call(model, arg, scheduler).await; - } - }; - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; -} diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index fb72641..49deb2c 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -11,7 +11,7 @@ use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; use super::{ - add_model, ExecutionError, Mailbox, SchedulerInner, SchedulerQueue, Signal, Simulation, + add_model, ExecutionError, Mailbox, GlobalScheduler, SchedulerQueue, Signal, Simulation, }; /// Builder for a multi-threaded, discrete-event simulation. @@ -90,7 +90,7 @@ impl SimInit { }; self.observers .push((name.clone(), Box::new(mailbox.0.observer()))); - let scheduler = SchedulerInner::new(self.scheduler_queue.clone(), self.time.reader()); + let scheduler = GlobalScheduler::new(self.scheduler_queue.clone(), self.time.reader()); add_model( model, diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index ff91963..5bcdebb 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -30,8 +30,8 @@ //! } //! //! // Sets an alarm [input port]. -//! pub fn set(&mut self, setting: MonotonicTime, context: &Context) { -//! if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() { +//! pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { +//! if cx.schedule_event(setting, Self::ring, ()).is_err() { //! println!("The alarm clock can only be set for a future time"); //! } //! } diff --git a/asynchronix/tests/integration/model_scheduling.rs b/asynchronix/tests/integration/model_scheduling.rs index 6d841f2..641cf79 100644 --- a/asynchronix/tests/integration/model_scheduling.rs +++ b/asynchronix/tests/integration/model_scheduling.rs @@ -15,14 +15,8 @@ fn model_schedule_event(num_threads: usize) { output: Output<()>, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action, - (), - ) + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_event(Duration::from_secs(2), Self::action, ()) .unwrap(); } async fn action(&mut self) { @@ -59,22 +53,11 @@ fn model_cancel_future_keyed_event(num_threads: usize) { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_event( - context.scheduler.time() + Duration::from_secs(1), - Self::action1, - (), - ) + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_event(Duration::from_secs(1), Self::action1, ()) .unwrap(); - self.key = context - .scheduler - .schedule_keyed_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action2, - (), - ) + self.key = cx + .schedule_keyed_event(Duration::from_secs(2), Self::action2, ()) .ok(); } async fn action1(&mut self) { @@ -117,22 +100,11 @@ fn model_cancel_same_time_keyed_event(num_threads: usize) { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action1, - (), - ) + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_event(Duration::from_secs(2), Self::action1, ()) .unwrap(); - self.key = context - .scheduler - .schedule_keyed_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action2, - (), - ) + self.key = cx + .schedule_keyed_event(Duration::from_secs(2), Self::action2, ()) .ok(); } async fn action1(&mut self) { @@ -174,16 +146,14 @@ fn model_schedule_periodic_event(num_threads: usize) { output: Output, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_periodic_event( - context.scheduler.time() + Duration::from_secs(2), - Duration::from_secs(3), - Self::action, - 42, - ) - .unwrap(); + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_periodic_event( + Duration::from_secs(2), + Duration::from_secs(3), + Self::action, + 42, + ) + .unwrap(); } async fn action(&mut self, payload: i32) { self.output.send(payload).await; @@ -225,11 +195,10 @@ fn model_cancel_periodic_event(num_threads: usize) { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - self.key = context - .scheduler + fn trigger(&mut self, _: (), cx: &mut Context) { + self.key = cx .schedule_keyed_periodic_event( - context.scheduler.time() + Duration::from_secs(2), + Duration::from_secs(2), Duration::from_secs(3), Self::action, (), diff --git a/asynchronix/tests/integration/simulation_scheduling.rs b/asynchronix/tests/integration/simulation_scheduling.rs index 6552d20..f0ea1e5 100644 --- a/asynchronix/tests/integration/simulation_scheduling.rs +++ b/asynchronix/tests/integration/simulation_scheduling.rs @@ -278,7 +278,7 @@ impl TimestampModel { } #[cfg(not(miri))] impl Model for TimestampModel { - async fn init(mut self, _: &Context) -> asynchronix::model::InitializedModel { + async fn init(mut self, _: &mut Context) -> asynchronix::model::InitializedModel { self.stamp.send((Instant::now(), SystemTime::now())).await; self.into() }