From 7526ffbceadf9dcddc02656d2f6715543fe08480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Thu, 11 Apr 2024 09:40:37 +0200 Subject: [PATCH] Add setup step. --- .github/workflows/ci.yml | 2 +- asynchronix/Cargo.toml | 2 +- asynchronix/examples/espresso_machine.rs | 51 +- asynchronix/examples/power_supply.rs | 2 +- asynchronix/examples/stepper_motor.rs | 25 +- asynchronix/src/channel.rs | 21 +- asynchronix/src/lib.rs | 57 +- asynchronix/src/model.rs | 135 +++-- asynchronix/src/model/context.rs | 485 ++++++++++++++++++ asynchronix/src/ports/input/markers.rs | 16 +- asynchronix/src/ports/input/model_fn.rs | 55 +- asynchronix/src/ports/output/broadcaster.rs | 14 +- asynchronix/src/ports/source.rs | 5 +- asynchronix/src/ports/source/broadcaster.rs | 14 +- .../src/rpc/codegen/custom_transport.rs | 10 +- asynchronix/src/rpc/codegen/simulation.rs | 178 ++----- asynchronix/src/simulation.rs | 67 ++- .../src/{time => simulation}/scheduler.rs | 386 +------------- asynchronix/src/simulation/sim_init.rs | 15 +- asynchronix/src/time.rs | 19 +- asynchronix/src/util/sync_cell.rs | 9 +- asynchronix/tests/model_scheduling.rs | 44 +- asynchronix/tests/simulation_scheduling.rs | 22 +- 23 files changed, 836 insertions(+), 798 deletions(-) create mode 100644 asynchronix/src/model/context.rs rename asynchronix/src/{time => simulation}/scheduler.rs (57%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e25d852..efb6ebc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: matrix: rust: - stable - - 1.64.0 + - 1.75.0 steps: - name: Checkout sources uses: actions/checkout@v3 diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index e9bef07..a91ff21 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -9,7 +9,7 @@ name = "asynchronix" authors = ["Serge Barral "] version = "0.2.2" edition = "2021" -rust-version = "1.64" +rust-version = "1.75" license = "MIT OR Apache-2.0" repository = "https://github.com/asynchronics/asynchronix" readme = "../README.md" diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index a2c0826..bd6b1a6 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -31,14 +31,12 @@ //! (-) //! ``` -use std::future::Future; -use std::pin::Pin; use std::time::Duration; -use asynchronix::model::{InitializedModel, Model}; +use asynchronix::model::{Context, InitializedModel, Model}; use asynchronix::ports::{EventSlot, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; -use asynchronix::time::{ActionKey, MonotonicTime, Scheduler}; +use asynchronix::simulation::{ActionKey, Mailbox, SimInit}; +use asynchronix::time::MonotonicTime; /// Water pump. pub struct Pump { @@ -122,7 +120,7 @@ impl Controller { } /// Starts brewing or cancels the current brew -- input port. - pub async fn brew_cmd(&mut self, _: (), scheduler: &Scheduler) { + pub async fn brew_cmd(&mut self, _: (), context: &Context) { // If a brew was ongoing, sending the brew command is interpreted as a // request to cancel it. if let Some(key) = self.stop_brew_key.take() { @@ -141,7 +139,7 @@ impl Controller { // Schedule the `stop_brew()` method and turn on the pump. self.stop_brew_key = Some( - scheduler + context .schedule_keyed_event(self.brew_time, Self::stop_brew, ()) .unwrap(), ); @@ -190,7 +188,7 @@ impl Tank { } /// Water volume added [m³] -- input port. - pub async fn fill(&mut self, added_volume: f64, scheduler: &Scheduler) { + pub async fn fill(&mut self, added_volume: f64, context: &Context) { // Ignore zero and negative values. We could also impose a maximum based // on tank capacity. if added_volume <= 0.0 { @@ -208,11 +206,11 @@ impl Tank { state.set_empty_key.cancel(); // Update the volume, saturating at 0 in case of rounding errors. - let time = scheduler.time(); + let time = context.time(); let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); - self.schedule_empty(state.flow_rate, time, scheduler).await; + self.schedule_empty(state.flow_rate, time, context).await; // There is no need to broadcast the state of the water sense since // it could not be previously `Empty` (otherwise the dynamic state @@ -230,10 +228,10 @@ impl Tank { /// # Panics /// /// This method will panic if the flow rate is negative. - pub async fn set_flow_rate(&mut self, flow_rate: f64, scheduler: &Scheduler) { + pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context) { assert!(flow_rate >= 0.0); - let time = scheduler.time(); + let time = context.time(); // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { @@ -245,7 +243,7 @@ impl Tank { self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); } - self.schedule_empty(flow_rate, time, scheduler).await; + self.schedule_empty(flow_rate, time, context).await; } /// Schedules a callback for when the tank becomes empty. @@ -258,7 +256,7 @@ impl Tank { &mut self, flow_rate: f64, time: MonotonicTime, - scheduler: &Scheduler, + context: &Context, ) { // Determine when the tank will be empty at the current flow rate. let duration_until_empty = if self.volume == 0.0 { @@ -275,7 +273,7 @@ impl Tank { let duration_until_empty = Duration::from_secs_f64(duration_until_empty); // Schedule the next update. - match scheduler.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) { + match context.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) { Ok(set_empty_key) => { let state = TankDynamicState { last_volume_update: time, @@ -302,21 +300,16 @@ impl Tank { impl Model for Tank { /// Broadcasts the initial state of the water sense. - fn init( - mut self, - _scheduler: &Scheduler, - ) -> Pin> + Send + '_>> { - Box::pin(async move { - self.water_sense - .send(if self.volume == 0.0 { - WaterSenseState::Empty - } else { - WaterSenseState::NotEmpty - }) - .await; + async fn init(mut self, _: &Context) -> InitializedModel { + self.water_sense + .send(if self.volume == 0.0 { + WaterSenseState::Empty + } else { + WaterSenseState::NotEmpty + }) + .await; - self.into() - }) + self.into() } } diff --git a/asynchronix/examples/power_supply.rs b/asynchronix/examples/power_supply.rs index 4b4930a..bce7b9c 100644 --- a/asynchronix/examples/power_supply.rs +++ b/asynchronix/examples/power_supply.rs @@ -24,7 +24,7 @@ //! │ Power │ ◀current │ │ //! │ supply │ └────────┘ //! │ ├───────────────────────────────▶ Total power -//! └──────────┘ +//! └──────────┘ //! ``` use asynchronix::model::Model; use asynchronix::ports::{EventSlot, Output, Requestor}; diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 9f5d764..3d24221 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -15,13 +15,12 @@ //! ``` use std::future::Future; -use std::pin::Pin; use std::time::Duration; -use asynchronix::model::{InitializedModel, Model}; +use asynchronix::model::{Context, InitializedModel, Model}; use asynchronix::ports::{EventBuffer, Output}; use asynchronix::simulation::{Mailbox, SimInit}; -use asynchronix::time::{MonotonicTime, Scheduler}; +use asynchronix::time::MonotonicTime; /// Stepper motor. pub struct Motor { @@ -88,15 +87,9 @@ impl Motor { impl Model for Motor { /// Broadcasts the initial position of the motor. - fn init( - mut self, - _scheduler: &Scheduler, - ) -> Pin> + Send + '_>> { - Box::pin(async move { - self.position.send(self.pos).await; - - self.into() - }) + async fn init(mut self, _: &Context) -> InitializedModel { + self.position.send(self.pos).await; + self.into() } } @@ -130,7 +123,7 @@ impl Driver { } /// Sets the pulse rate (sign = direction) [Hz] -- input port. - pub async fn pulse_rate(&mut self, pps: f64, scheduler: &Scheduler) { + pub async fn pulse_rate(&mut self, pps: f64, context: &Context) { let pps = pps.signum() * pps.abs().clamp(Self::MIN_PPS, Self::MAX_PPS); if pps == self.pps { return; @@ -142,7 +135,7 @@ impl Driver { // Trigger the rotation if the motor is currently idle. Otherwise the // new value will be accounted for at the next pulse. if is_idle { - self.send_pulse((), scheduler).await; + self.send_pulse((), context).await; } } @@ -153,7 +146,7 @@ impl Driver { fn send_pulse<'a>( &'a mut self, _: (), - scheduler: &'a Scheduler, + context: &'a Context, ) -> impl Future + Send + 'a { async move { let current_out = match self.next_phase { @@ -174,7 +167,7 @@ impl Driver { let pulse_duration = Duration::from_secs_f64(1.0 / self.pps.abs()); // Schedule the next pulse. - scheduler + context .schedule_event(pulse_duration, Self::send_pulse, ()) .unwrap(); } diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index a1e8a43..5fe67b2 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -18,8 +18,7 @@ use recycle_box::RecycleBox; use queue::{PopError, PushError, Queue}; use recycle_box::coerce_box; -use crate::model::Model; -use crate::time::Scheduler; +use crate::model::{Context, Model}; /// Data shared between the receiver and the senders. struct Inner { @@ -45,7 +44,7 @@ impl Inner { } /// A receiver which can asynchronously execute `async` message that take an -/// argument of type `&mut M` and an optional `&Scheduler` argument. +/// argument of type `&mut M` and an optional `&Context` argument. pub(crate) struct Receiver { /// Shared data. inner: Arc>, @@ -90,7 +89,7 @@ impl Receiver { pub(crate) async fn recv( &mut self, model: &mut M, - scheduler: &Scheduler, + context: &Context, ) -> Result<(), RecvError> { let msg = unsafe { self.inner @@ -106,7 +105,7 @@ impl Receiver { match msg { Some(mut msg) => { // Consume the message to obtain a boxed future. - let fut = msg.call_once(model, scheduler, self.future_box.take().unwrap()); + let fut = msg.call_once(model, context, self.future_box.take().unwrap()); // Now that `msg` was consumed and its slot in the queue was // freed, signal to one awaiting sender that one slot is @@ -188,7 +187,7 @@ impl Sender { where F: for<'a> FnOnce( &'a mut M, - &'a Scheduler, + &'a Context, RecycleBox<()>, ) -> RecycleBox + Send + 'a> + Send @@ -311,7 +310,7 @@ impl fmt::Debug for Sender { } /// A closure that can be called once to create a future boxed in a `RecycleBox` -/// from an `&mut M`, a `&Scheduler` and an empty `RecycleBox`. +/// from an `&mut M`, a `&Context` and an empty `RecycleBox`. /// /// This is basically a workaround to emulate an `FnOnce` with the equivalent of /// an `FnMut` so that it is possible to call it as a `dyn` trait stored in a @@ -327,7 +326,7 @@ trait MessageFn: Send { fn call_once<'a>( &mut self, model: &'a mut M, - scheduler: &'a Scheduler, + context: &'a Context, recycle_box: RecycleBox<()>, ) -> RecycleBox + Send + 'a>; } @@ -349,7 +348,7 @@ impl MessageFn for MessageFnOnce where F: for<'a> FnOnce( &'a mut M, - &'a Scheduler, + &'a Context, RecycleBox<()>, ) -> RecycleBox + Send + 'a> + Send, @@ -357,12 +356,12 @@ where fn call_once<'a>( &mut self, model: &'a mut M, - scheduler: &'a Scheduler, + context: &'a Context, recycle_box: RecycleBox<()>, ) -> RecycleBox + Send + 'a> { let closure = self.msg_fn.take().unwrap(); - (closure)(model, scheduler, recycle_box) + (closure)(model, context, recycle_box) } } diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index aeaefad..c9c7c76 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -45,7 +45,7 @@ //! * _input ports_, which are synchronous or asynchronous methods that //! implement the [`InputFn`](ports::InputFn) trait and take an `&mut self` //! argument, a message argument, and an optional -//! [`&Scheduler`](time::Scheduler) argument, +//! [`&Context`](model::Context) argument, //! * _replier ports_, which are similar to input ports but implement the //! [`ReplierFn`](ports::ReplierFn) trait and return a reply. //! @@ -54,12 +54,17 @@ //! are referred to as *requests* and *replies*. //! //! Models must implement the [`Model`](model::Model) trait. The main purpose of -//! this trait is to allow models to specify an `init()` method that is -//! guaranteed to run once and only once when the simulation is initialized, -//! _i.e._ after all models have been connected but before the simulation -//! starts. The `init()` method has a default implementation, so models that do -//! not require initialization can simply implement the trait with a one-liner -//! such as `impl Model for MyModel {}`. +//! this trait is to allow models to specify +//! * a `setup()` method that is called once during model addtion to simulation, +//! this method allows e.g. creation and interconnection of submodels inside +//! the model, +//! * an `init()` method that is guaranteed to run once and only once when the +//! simulation is initialized, _i.e._ after all models have been connected but +//! before the simulation starts. +//! +//! The `setup()` and `init()` methods have default implementations, so models +//! that do not require setup and initialization can simply implement the trait +//! with a one-liner such as `impl Model for MyModel {}`. //! //! #### A simple model //! @@ -93,29 +98,28 @@ //! impl Model for Multiplier {} //! ``` //! -//! #### A model using the local scheduler +//! #### A model using the local context //! //! Models frequently need to schedule actions at a future time or simply get //! access to the current simulation time. To do so, input and replier methods -//! can take an optional argument that gives them access to a local scheduler. +//! can take an optional argument that gives them access to a local context. //! -//! To show how the local scheduler can be used in practice, let us implement +//! To show how the local context can be used in practice, let us implement //! `Delay`, a model which simply forwards its input unmodified after a 1s //! delay: //! //! ``` //! use std::time::Duration; -//! use asynchronix::model::Model; +//! use asynchronix::model::{Context, Model}; //! use asynchronix::ports::Output; -//! use asynchronix::time::Scheduler; //! //! #[derive(Default)] //! pub struct Delay { //! pub output: Output, //! } //! impl Delay { -//! pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! pub fn input(&mut self, value: f64, context: &Context) { +//! context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! } //! //! async fn send(&mut self, value: f64) { @@ -137,7 +141,7 @@ //! [`Address`](simulation::Mailbox)es pointing to that mailbox. //! //! Addresses are used among others to connect models: each output or requestor -//! ports has a `connect()` method that takes as argument a function pointer to +//! port has a `connect()` method that takes as argument a function pointer to //! the corresponding input or replier port method and the address of the //! targeted model. //! @@ -168,9 +172,8 @@ //! ``` //! # mod models { //! # use std::time::Duration; -//! # use asynchronix::model::Model; +//! # use asynchronix::model::{Context, Model}; //! # use asynchronix::ports::Output; -//! # use asynchronix::time::Scheduler; //! # #[derive(Default)] //! # pub struct Multiplier { //! # pub output: Output, @@ -186,8 +189,8 @@ //! # pub output: Output, //! # } //! # impl Delay { -//! # pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! # scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # pub fn input(&mut self, value: f64, context: &Context) { +//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -268,9 +271,8 @@ //! ``` //! # mod models { //! # use std::time::Duration; -//! # use asynchronix::model::Model; +//! # use asynchronix::model::{Context, Model}; //! # use asynchronix::ports::Output; -//! # use asynchronix::time::Scheduler; //! # #[derive(Default)] //! # pub struct Multiplier { //! # pub output: Output, @@ -286,8 +288,8 @@ //! # pub output: Output, //! # } //! # impl Delay { -//! # pub fn input(&mut self, value: f64, scheduler: &Scheduler) { -//! # scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # pub fn input(&mut self, value: f64, context: &Context) { +//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -395,15 +397,14 @@ //! //! * the [`model`] module provides more details about the signatures of input //! and replier port methods and discusses model initialization in the -//! documentation of [`model::Model`], +//! documentation of [`model::Model`] and self-scheduling methods as well as +//! scheduling cancellation in the documentation of [`model::Context`], //! * the [`simulation`] module discusses how the capacity of mailboxes may //! affect the simulation, how connections can be modified after the //! simulation was instantiated, and which pathological situations can lead to //! a deadlock, -//! * the [`time`] module discusses in particular self-scheduling methods and -//! scheduling cancellation in the documentation of [`time::Scheduler`] while -//! the monotonic timestamp format used for simulations is documented in -//! [`time::MonotonicTime`]. +//! * the [`time`] module discusses in particular the monotonic timestamp format +//! used for simulations ([`time::MonotonicTime`]). #![warn(missing_docs, missing_debug_implementations, unreachable_pub)] pub(crate) mod channel; diff --git a/asynchronix/src/model.rs b/asynchronix/src/model.rs index 4fecf87..cda239c 100644 --- a/asynchronix/src/model.rs +++ b/asynchronix/src/model.rs @@ -2,16 +2,19 @@ //! //! # Model trait //! -//! Every model must implement the [`Model`] trait. This trait defines an -//! asynchronous initialization method, [`Model::init()`], which main purpose is -//! to enable models to perform specific actions only once all models have been -//! connected and migrated to the simulation, but before the simulation actually -//! starts. +//! Every model must implement the [`Model`] trait. This trait defines +//! * a setup method, [`Model::setup()`], which main purpose is to create, +//! connect and add to the simulation bench submodels and perform other setup +//! steps, +//! * an asynchronous initialization method, [`Model::init()`], which main +//! purpose is to enable models to perform specific actions only once all +//! models have been connected and migrated to the simulation, but before the +//! simulation actually starts. //! //! #### Examples //! -//! A model that does not require initialization can simply use the default -//! implementation of the `Model` trait: +//! A model that does not require setup and initialization can simply use the +//! default implementation of the `Model` trait: //! //! ``` //! use asynchronix::model::Model; @@ -22,28 +25,31 @@ //! impl Model for MyModel {} //! ``` //! -//! Otherwise, a custom `init()` method can be implemented: +//! Otherwise, custom `setup()` or `init()` methods can be implemented: //! //! ``` //! use std::future::Future; //! use std::pin::Pin; //! -//! use asynchronix::model::{InitializedModel, Model}; -//! use asynchronix::time::Scheduler; +//! use asynchronix::model::{Context, InitializedModel, Model, SetupContext}; //! //! pub struct MyModel { //! // ... //! } //! impl Model for MyModel { -//! fn init( -//! mut self, -//! scheduler: &Scheduler -//! ) -> Pin> + Send + '_>>{ -//! Box::pin(async move { -//! println!("...initialization..."); +//! fn setup( +//! &mut self, +//! setup_context: &SetupContext) { +//! println!("...setup..."); +//! } //! -//! self.into() -//! }) +//! async fn init( +//! mut self, +//! context: &Context +//! ) -> InitializedModel { +//! println!("...initialization..."); +//! +//! self.into() //! } //! } //! ``` @@ -103,17 +109,17 @@ //! ```ignore //! fn(&mut self) // argument elided, implies `T=()` //! fn(&mut self, T) -//! fn(&mut self, T, &Scheduler) +//! fn(&mut self, T, &Context) //! async fn(&mut self) // argument elided, implies `T=()` //! async fn(&mut self, T) -//! async fn(&mut self, T, &Scheduler) +//! async fn(&mut self, T, &Context) //! where //! Self: Model, //! T: Clone + Send + 'static, //! R: Send + 'static, //! ``` //! -//! The scheduler argument is useful for methods that need access to the +//! The context argument is useful for methods that need access to the //! simulation time or that need to schedule an action at a future date. //! //! A replier port for a request of type `T` with a reply of type `R` may in @@ -123,7 +129,7 @@ //! ```ignore //! async fn(&mut self) -> R // argument elided, implies `T=()` //! async fn(&mut self, T) -> R -//! async fn(&mut self, T, &Scheduler) -> R +//! async fn(&mut self, T, &Context) -> R //! where //! Self: Model, //! T: Clone + Send + 'static, @@ -134,7 +140,7 @@ //! can be connected to input and requestor ports when assembling the simulation //! bench. However, input ports may instead be defined as private methods if //! they are only used by the model itself to schedule future actions (see the -//! [`Scheduler`] examples). +//! [`Context`] examples). //! //! Changing the signature of an input or replier port is not considered to //! alter the public interface of a model provided that the event, request and @@ -143,17 +149,16 @@ //! #### Example //! //! ``` -//! use asynchronix::model::Model; -//! use asynchronix::time::Scheduler; +//! use asynchronix::model::{Context, Model}; //! //! pub struct MyModel { //! // ... //! } //! impl MyModel { -//! pub fn my_input(&mut self, input: String, scheduler: &Scheduler) { +//! pub fn my_input(&mut self, input: String, context: &Context) { //! // ... //! } -//! pub async fn my_replier(&mut self, request: u32) -> bool { // scheduler argument elided +//! pub async fn my_replier(&mut self, request: u32) -> bool { // context argument elided //! // ... //! # unimplemented!() //! } @@ -163,14 +168,19 @@ //! use std::future::Future; -use std::pin::Pin; -use crate::time::Scheduler; +pub use context::{Context, SetupContext}; + +mod context; /// Trait to be implemented by all models. /// -/// This trait enables models to perform specific actions in the -/// [`Model::init()`] method only once all models have been connected and +/// This trait enables models to perform specific actions during setup and +/// initialization. The [`Model::setup()`] method is run only once when models +/// are being added to the simulation bench. This method allows in particular +/// sub-models to be created, connected and added to the simulation. +/// +/// The [`Model::init()`] method is run only once all models have been connected and /// migrated to the simulation bench, but before the simulation actually starts. /// A common use for `init` is to send messages to connected models at the /// beginning of the simulation. @@ -179,6 +189,37 @@ use crate::time::Scheduler; /// to prevent an already initialized model from being added to the simulation /// bench. pub trait Model: Sized + Send + 'static { + /// Performs model setup. + /// + /// This method is executed exactly once for all models of the simulation + /// when the [`SimInit::add_model()`](crate::simulation::SimInit::add_model) + /// method is called. + /// + /// The default implementation does nothing. + /// + /// # Examples + /// + /// ``` + /// use std::future::Future; + /// use std::pin::Pin; + /// + /// use asynchronix::model::{InitializedModel, Model, SetupContext}; + /// + /// pub struct MyModel { + /// // ... + /// } + /// + /// impl Model for MyModel { + /// fn setup( + /// &mut self, + /// setup_context: &SetupContext + /// ) { + /// println!("...setup..."); + /// } + /// } + /// ``` + fn setup(&mut self, _: &SetupContext) {} + /// Performs asynchronous model initialization. /// /// This asynchronous method is executed exactly once for all models of the @@ -188,47 +229,31 @@ pub trait Model: Sized + Send + 'static { /// The default implementation simply converts the model to an /// `InitializedModel` without any side effect. /// - /// *Note*: it is currently necessary to box the returned future; this - /// limitation will be lifted once Rust supports `async` methods in traits. - /// /// # Examples /// /// ``` /// use std::future::Future; /// use std::pin::Pin; /// - /// use asynchronix::model::{InitializedModel, Model}; - /// use asynchronix::time::Scheduler; + /// use asynchronix::model::{Context, InitializedModel, Model}; /// /// pub struct MyModel { /// // ... /// } /// /// impl Model for MyModel { - /// fn init( + /// async fn init( /// self, - /// scheduler: &Scheduler - /// ) -> Pin> + Send + '_>>{ - /// Box::pin(async move { - /// println!("...initialization..."); + /// context: &Context + /// ) -> InitializedModel { + /// println!("...initialization..."); /// - /// self.into() - /// }) + /// self.into() /// } /// } /// ``` - - // Removing the boxing constraint requires the - // `return_position_impl_trait_in_trait` and `async_fn_in_trait` features. - // Tracking issue: . - fn init( - self, - scheduler: &Scheduler, - ) -> Pin> + Send + '_>> { - Box::pin(async move { - let _ = scheduler; // suppress the unused argument warning - self.into() - }) + fn init(self, _: &Context) -> impl Future> + Send { + async { self.into() } } } diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs new file mode 100644 index 0000000..24e0c94 --- /dev/null +++ b/asynchronix/src/model/context.rs @@ -0,0 +1,485 @@ +use std::fmt; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::channel::Sender; +use crate::executor::Executor; +use crate::ports::InputFn; +use crate::simulation::{ + self, schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, + schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, ActionKey, + Deadline, Mailbox, SchedulerQueue, SchedulingError, +}; +use crate::time::{MonotonicTime, TearableAtomicTime}; +use crate::util::sync_cell::SyncCellReader; + +use super::Model; + +/// A local context for models. +/// +/// A `Context` is a handle to the global context associated to a model +/// instance. It can be used by the model to retrieve the simulation time or +/// schedule delayed actions on itself. +/// +/// ### Caveat: self-scheduling `async` methods +/// +/// Due to a current rustc issue, `async` methods that schedule themselves will +/// not compile unless an explicit `Send` bound is added to the returned future. +/// This can be done by replacing the `async` signature with a partially +/// desugared signature such as: +/// +/// ```ignore +/// fn self_scheduling_method<'a>( +/// &'a mut self, +/// arg: MyEventType, +/// context: &'a Context +/// ) -> impl Future + Send + 'a { +/// async move { +/// /* implementation */ +/// } +/// } +/// ``` +/// +/// Self-scheduling methods which are not `async` are not affected by this +/// issue. +/// +/// # Examples +/// +/// A model that sends a greeting after some delay. +/// +/// ``` +/// use std::time::Duration; +/// use asynchronix::model::{Context, Model}; +/// use asynchronix::ports::Output; +/// +/// #[derive(Default)] +/// pub struct DelayedGreeter { +/// msg_out: Output, +/// } +/// +/// impl DelayedGreeter { +/// // Triggers a greeting on the output port after some delay [input port]. +/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context) { +/// let time = context.time(); +/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time); +/// +/// if delay.is_zero() { +/// self.msg_out.send(greeting).await; +/// } else { +/// context.schedule_event(delay, Self::send_msg, greeting).unwrap(); +/// } +/// } +/// +/// // Sends a message to the output [private input port]. +/// async fn send_msg(&mut self, msg: String) { +/// self.msg_out.send(msg).await; +/// } +/// } +/// impl Model for DelayedGreeter {} +/// ``` + +// The self-scheduling caveat seems related to this issue: +// https://github.com/rust-lang/rust/issues/78649 +pub struct Context { + sender: Sender, + scheduler_queue: Arc>, + time: SyncCellReader, +} + +impl Context { + /// Creates a new local context. + pub(crate) fn new( + sender: Sender, + scheduler_queue: Arc>, + time: SyncCellReader, + ) -> Self { + Self { + sender, + scheduler_queue, + time, + } + } + + /// Returns the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::time::MonotonicTime; + /// + /// fn is_third_millenium(context: &Context) -> bool { + /// let time = context.time(); + /// time >= MonotonicTime::new(978307200, 0).unwrap() + /// && time < MonotonicTime::new(32535216000, 0).unwrap() + /// } + /// ``` + pub fn time(&self) -> MonotonicTime { + self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") + } + + /// Schedules an event at a future time. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// + /// // A timer. + /// pub struct Timer {} + /// + /// impl Timer { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: Duration, context: &Context) { + /// if context.schedule_event(setting, Self::ring, ()).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Rings [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg"); + /// } + /// } + /// + /// impl Model for Timer {} + /// ``` + pub fn schedule_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + let sender = self.sender.clone(); + schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(()) + } + + /// Schedules a cancellable event at a future time and returns an action + /// key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock that can be cancelled. + /// #[derive(Default)] + /// pub struct CancellableAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { + /// self.cancel(); + /// match context.schedule_keyed_event(setting, Self::ring, ()) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels the current alarm, if any [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Rings the alarm [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg!"); + /// } + /// } + /// + /// impl Model for CancellableAlarmClock {} + /// ``` + pub fn schedule_keyed_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + let sender = self.sender.clone(); + let event_key = + schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); + + Ok(event_key) + } + + /// Schedules a periodically recurring event at a future time. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz. + /// pub struct BeepingAlarmClock {} + /// + /// impl BeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { + /// if context.schedule_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for BeepingAlarmClock {} + /// ``` + pub fn schedule_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = self.sender.clone(); + schedule_periodic_event_at_unchecked( + time, + period, + func, + arg, + sender, + &self.scheduler_queue, + ); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event at a future time + /// and returns an action key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or + /// // stopped after it sets off. + /// #[derive(Default)] + /// pub struct CancellableBeepingAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableBeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { + /// self.cancel(); + /// match context.schedule_keyed_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels or stops the alarm [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for CancellableBeepingAlarmClock {} + /// ``` + pub fn schedule_keyed_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = self.sender.clone(); + let event_key = schedule_periodic_keyed_event_at_unchecked( + time, + period, + func, + arg, + sender, + &self.scheduler_queue, + ); + + Ok(event_key) + } +} + +impl fmt::Debug for Context { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Context").finish_non_exhaustive() + } +} + +/// A setup context for models. +/// +/// A `SetupContext` can be used by models during the setup stage to +/// create submodels and add them to the simulation bench. +/// +/// # Examples +/// +/// A model that contains two connected submodels. +/// +/// ``` +/// use std::time::Duration; +/// use asynchronix::model::{Model, SetupContext}; +/// use asynchronix::ports::Output; +/// use asynchronix::simulation::Mailbox; +/// +/// #[derive(Default)] +/// pub struct SubmodelA { +/// out: Output, +/// } +/// +/// impl Model for SubmodelA {} +/// +/// #[derive(Default)] +/// pub struct SubmodelB {} +/// +/// impl SubmodelB { +/// pub async fn input(&mut self, value: u32) { +/// println!("Received {}", value); +/// } +/// } +/// +/// impl Model for SubmodelB {} +/// +/// #[derive(Default)] +/// pub struct Parent {} +/// +/// impl Model for Parent { +/// fn setup( +/// &mut self, +/// setup_context: &SetupContext) { +/// let mut a = SubmodelA::default(); +/// let b = SubmodelB::default(); +/// let a_mbox = Mailbox::new(); +/// let b_mbox = Mailbox::new(); +/// +/// a.out.connect(SubmodelB::input, &b_mbox); +/// +/// setup_context.add_model(a, a_mbox); +/// setup_context.add_model(b, b_mbox); +/// } +/// } +/// +/// ``` + +#[derive(Debug)] +pub struct SetupContext<'a, M: Model> { + /// Mailbox of the model. + pub mailbox: &'a Mailbox, + context: &'a Context, + executor: &'a Executor, +} + +impl<'a, M: Model> SetupContext<'a, M> { + /// Creates a new local context. + pub(crate) fn new( + mailbox: &'a Mailbox, + context: &'a Context, + executor: &'a Executor, + ) -> Self { + Self { + mailbox, + context, + executor, + } + } + + /// Adds a new model and its mailbox to the simulation bench. + pub fn add_model(&self, model: N, mailbox: Mailbox) { + simulation::add_model( + model, + mailbox, + self.context.scheduler_queue.clone(), + self.context.time.clone(), + self.executor, + ); + } +} diff --git a/asynchronix/src/ports/input/markers.rs b/asynchronix/src/ports/input/markers.rs index d502ca4..44b7f2a 100644 --- a/asynchronix/src/ports/input/markers.rs +++ b/asynchronix/src/ports/input/markers.rs @@ -6,14 +6,14 @@ pub struct WithoutArguments {} /// Marker type for regular simulation model methods that take a mutable -/// reference to the model and a message, without scheduler argument. +/// reference to the model and a message, without context argument. #[derive(Debug)] -pub struct WithoutScheduler {} +pub struct WithoutContext {} /// Marker type for regular simulation model methods that take a mutable -/// reference to the model, a message and an explicit scheduler argument. +/// reference to the model, a message and an explicit context argument. #[derive(Debug)] -pub struct WithScheduler {} +pub struct WithContext {} /// Marker type for asynchronous simulation model methods that take a mutable /// reference to the model, without any other argument. @@ -21,11 +21,11 @@ pub struct WithScheduler {} pub struct AsyncWithoutArguments {} /// Marker type for asynchronous simulation model methods that take a mutable -/// reference to the model and a message, without scheduler argument. +/// reference to the model and a message, without context argument. #[derive(Debug)] -pub struct AsyncWithoutScheduler {} +pub struct AsyncWithoutContext {} /// Marker type for asynchronous simulation model methods that take a mutable -/// reference to the model, a message and an explicit scheduler argument. +/// reference to the model, a message and an explicit context argument. #[derive(Debug)] -pub struct AsyncWithScheduler {} +pub struct AsyncWithContext {} diff --git a/asynchronix/src/ports/input/model_fn.rs b/asynchronix/src/ports/input/model_fn.rs index 5ace206..d9668e5 100644 --- a/asynchronix/src/ports/input/model_fn.rs +++ b/asynchronix/src/ports/input/model_fn.rs @@ -2,8 +2,7 @@ use std::future::{ready, Future, Ready}; -use crate::model::Model; -use crate::time::Scheduler; +use crate::model::{Context, Model}; use super::markers; @@ -15,9 +14,9 @@ use super::markers; /// /// ```ignore /// FnOnce(&mut M, T) -/// FnOnce(&mut M, T, &Scheduler) +/// FnOnce(&mut M, T, &Context) /// async fn(&mut M, T) -/// async fn(&mut M, T, &Scheduler) +/// async fn(&mut M, T, &Context) /// where /// M: Model /// ``` @@ -35,7 +34,7 @@ pub trait InputFn<'a, M: Model, T, S>: Send + 'static { type Future: Future + Send + 'a; /// Calls the method. - fn call(self, model: &'a mut M, arg: T, scheduler: &'a Scheduler) -> Self::Future; + fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future; } impl<'a, M, F> InputFn<'a, M, (), markers::WithoutArguments> for F @@ -45,36 +44,36 @@ where { type Future = Ready<()>; - fn call(self, model: &'a mut M, _arg: (), _scheduler: &'a Scheduler) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { self(model); ready(()) } } -impl<'a, M, T, F> InputFn<'a, M, T, markers::WithoutScheduler> for F +impl<'a, M, T, F> InputFn<'a, M, T, markers::WithoutContext> for F where M: Model, F: FnOnce(&'a mut M, T) + Send + 'static, { type Future = Ready<()>; - fn call(self, model: &'a mut M, arg: T, _scheduler: &'a Scheduler) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { self(model, arg); ready(()) } } -impl<'a, M, T, F> InputFn<'a, M, T, markers::WithScheduler> for F +impl<'a, M, T, F> InputFn<'a, M, T, markers::WithContext> for F where M: Model, - F: FnOnce(&'a mut M, T, &'a Scheduler) + Send + 'static, + F: FnOnce(&'a mut M, T, &'a Context) + Send + 'static, { type Future = Ready<()>; - fn call(self, model: &'a mut M, arg: T, scheduler: &'a Scheduler) -> Self::Future { - self(model, arg, scheduler); + fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { + self(model, arg, context); ready(()) } @@ -88,12 +87,12 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, _arg: (), _scheduler: &'a Scheduler) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { self(model) } } -impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithoutScheduler> for F +impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithoutContext> for F where M: Model, Fut: Future + Send + 'a, @@ -101,21 +100,21 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, _scheduler: &'a Scheduler) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { self(model, arg) } } -impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithScheduler> for F +impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithContext> for F where M: Model, Fut: Future + Send + 'a, - F: FnOnce(&'a mut M, T, &'a Scheduler) -> Fut + Send + 'static, + F: FnOnce(&'a mut M, T, &'a Context) -> Fut + Send + 'static, { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, scheduler: &'a Scheduler) -> Self::Future { - self(model, arg, scheduler) + fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { + self(model, arg, context) } } @@ -127,7 +126,7 @@ where /// /// ```ignore /// async fn(&mut M, T) -> R -/// async fn(&mut M, T, &Scheduler) -> R +/// async fn(&mut M, T, &Context) -> R /// where /// M: Model /// ``` @@ -144,7 +143,7 @@ pub trait ReplierFn<'a, M: Model, T, R, S>: Send + 'static { type Future: Future + Send + 'a; /// Calls the method. - fn call(self, model: &'a mut M, arg: T, scheduler: &'a Scheduler) -> Self::Future; + fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future; } impl<'a, M, R, Fut, F> ReplierFn<'a, M, (), R, markers::AsyncWithoutArguments> for F @@ -155,12 +154,12 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, _arg: (), _scheduler: &'a Scheduler) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { self(model) } } -impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithoutScheduler> for F +impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithoutContext> for F where M: Model, Fut: Future + Send + 'a, @@ -168,20 +167,20 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, _scheduler: &'a Scheduler) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { self(model, arg) } } -impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithScheduler> for F +impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithContext> for F where M: Model, Fut: Future + Send + 'a, - F: FnOnce(&'a mut M, T, &'a Scheduler) -> Fut + Send + 'static, + F: FnOnce(&'a mut M, T, &'a Context) -> Fut + Send + 'static, { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, scheduler: &'a Scheduler) -> Self::Future { - self(model, arg, scheduler) + fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { + self(model, arg, context) } } diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 2f35417..f269312 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -505,7 +505,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::time::Scheduler; + use crate::model::Context; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -563,9 +563,9 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_scheduler = - Scheduler::new(dummy_address, dummy_priority_queue, dummy_time); - block_on(mailbox.recv(&mut counter, &dummy_scheduler)).unwrap(); + let dummy_context = + Context::new(dummy_address, dummy_priority_queue, dummy_time); + block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); } }) }) @@ -614,9 +614,9 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_scheduler = - Scheduler::new(dummy_address, dummy_priority_queue, dummy_time); - block_on(mailbox.recv(&mut counter, &dummy_scheduler)).unwrap(); + let dummy_context = + Context::new(dummy_address, dummy_priority_queue, dummy_time); + block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 5e745ae..6850005 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -8,9 +8,8 @@ use std::time::Duration; use crate::model::Model; use crate::ports::InputFn; use crate::ports::{LineError, LineId}; -use crate::simulation::Address; -use crate::time::{ - Action, ActionKey, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, +use crate::simulation::{ + Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, }; use crate::util::slot; diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index c418b83..d3fb990 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -430,7 +430,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::time::Scheduler; + use crate::model::Context; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -488,9 +488,9 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_scheduler = - Scheduler::new(dummy_address, dummy_priority_queue, dummy_time); - block_on(mailbox.recv(&mut counter, &dummy_scheduler)).unwrap(); + let dummy_context = + Context::new(dummy_address, dummy_priority_queue, dummy_time); + block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); } }) }) @@ -539,9 +539,9 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_scheduler = - Scheduler::new(dummy_address, dummy_priority_queue, dummy_time); - block_on(mailbox.recv(&mut counter, &dummy_scheduler)).unwrap(); + let dummy_context = + Context::new(dummy_address, dummy_priority_queue, dummy_time); + block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) diff --git a/asynchronix/src/rpc/codegen/custom_transport.rs b/asynchronix/src/rpc/codegen/custom_transport.rs index 61eac9d..43a91bd 100644 --- a/asynchronix/src/rpc/codegen/custom_transport.rs +++ b/asynchronix/src/rpc/codegen/custom_transport.rs @@ -11,7 +11,10 @@ pub struct ServerError { #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnyRequest { /// Expects exactly 1 variant. - #[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")] + #[prost( + oneof = "any_request::Request", + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11" + )] pub request: ::core::option::Option, } /// Nested message and enum types in `AnyRequest`. @@ -48,7 +51,10 @@ pub mod any_request { #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnyReply { /// Contains exactly 1 variant. - #[prost(oneof = "any_reply::Reply", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100")] + #[prost( + oneof = "any_reply::Reply", + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100" + )] pub reply: ::core::option::Option, } /// Nested message and enum types in `AnyReply`. diff --git a/asynchronix/src/rpc/codegen/simulation.rs b/asynchronix/src/rpc/codegen/simulation.rs index 26f7518..abe0073 100644 --- a/asynchronix/src/rpc/codegen/simulation.rs +++ b/asynchronix/src/rpc/codegen/simulation.rs @@ -411,31 +411,19 @@ pub mod simulation_server { async fn schedule_event( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn cancel_event( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn process_event( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn process_query( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn read_events( &self, request: tonic::Request, @@ -472,10 +460,7 @@ pub mod simulation_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -531,21 +516,15 @@ pub mod simulation_server { "/simulation.Simulation/Init" => { #[allow(non_camel_case_types)] struct InitSvc(pub Arc); - impl tonic::server::UnaryService - for InitSvc { + impl tonic::server::UnaryService for InitSvc { type Response = super::InitReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::init(&inner, request).await - }; + let fut = async move { ::init(&inner, request).await }; Box::pin(fut) } } @@ -575,21 +554,15 @@ pub mod simulation_server { "/simulation.Simulation/Time" => { #[allow(non_camel_case_types)] struct TimeSvc(pub Arc); - impl tonic::server::UnaryService - for TimeSvc { + impl tonic::server::UnaryService for TimeSvc { type Response = super::TimeReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::time(&inner, request).await - }; + let fut = async move { ::time(&inner, request).await }; Box::pin(fut) } } @@ -619,21 +592,15 @@ pub mod simulation_server { "/simulation.Simulation/Step" => { #[allow(non_camel_case_types)] struct StepSvc(pub Arc); - impl tonic::server::UnaryService - for StepSvc { + impl tonic::server::UnaryService for StepSvc { type Response = super::StepReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::step(&inner, request).await - }; + let fut = async move { ::step(&inner, request).await }; Box::pin(fut) } } @@ -663,23 +630,16 @@ pub mod simulation_server { "/simulation.Simulation/StepUntil" => { #[allow(non_camel_case_types)] struct StepUntilSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for StepUntilSvc { + impl tonic::server::UnaryService for StepUntilSvc { type Response = super::StepUntilReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::step_until(&inner, request).await - }; + let fut = + async move { ::step_until(&inner, request).await }; Box::pin(fut) } } @@ -709,15 +669,11 @@ pub mod simulation_server { "/simulation.Simulation/ScheduleEvent" => { #[allow(non_camel_case_types)] struct ScheduleEventSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ScheduleEventSvc { + impl tonic::server::UnaryService + for ScheduleEventSvc + { type Response = super::ScheduleEventReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -755,15 +711,9 @@ pub mod simulation_server { "/simulation.Simulation/CancelEvent" => { #[allow(non_camel_case_types)] struct CancelEventSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for CancelEventSvc { + impl tonic::server::UnaryService for CancelEventSvc { type Response = super::CancelEventReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -801,15 +751,9 @@ pub mod simulation_server { "/simulation.Simulation/ProcessEvent" => { #[allow(non_camel_case_types)] struct ProcessEventSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ProcessEventSvc { + impl tonic::server::UnaryService for ProcessEventSvc { type Response = super::ProcessEventReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -847,15 +791,9 @@ pub mod simulation_server { "/simulation.Simulation/ProcessQuery" => { #[allow(non_camel_case_types)] struct ProcessQuerySvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ProcessQuerySvc { + impl tonic::server::UnaryService for ProcessQuerySvc { type Response = super::ProcessQueryReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -893,15 +831,9 @@ pub mod simulation_server { "/simulation.Simulation/ReadEvents" => { #[allow(non_camel_case_types)] struct ReadEventsSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ReadEventsSvc { + impl tonic::server::UnaryService for ReadEventsSvc { type Response = super::ReadEventsReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -939,23 +871,16 @@ pub mod simulation_server { "/simulation.Simulation/OpenSink" => { #[allow(non_camel_case_types)] struct OpenSinkSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for OpenSinkSvc { + impl tonic::server::UnaryService for OpenSinkSvc { type Response = super::OpenSinkReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::open_sink(&inner, request).await - }; + let fut = + async move { ::open_sink(&inner, request).await }; Box::pin(fut) } } @@ -985,23 +910,16 @@ pub mod simulation_server { "/simulation.Simulation/CloseSink" => { #[allow(non_camel_case_types)] struct CloseSinkSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for CloseSinkSvc { + impl tonic::server::UnaryService for CloseSinkSvc { type Response = super::CloseSinkReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::close_sink(&inner, request).await - }; + let fut = + async move { ::close_sink(&inner, request).await }; Box::pin(fut) } } @@ -1028,18 +946,14 @@ pub mod simulation_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index af3dfe9..1d8d5f4 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -98,9 +98,9 @@ //! connects or disconnects a port, such as: //! //! ``` -//! # use asynchronix::model::Model; +//! # use asynchronix::model::{Context, Model}; //! # use asynchronix::ports::Output; -//! # use asynchronix::time::{MonotonicTime, Scheduler}; +//! # use asynchronix::time::MonotonicTime; //! # use asynchronix::simulation::{Mailbox, SimInit}; //! # pub struct ModelA { //! # pub output: Output, @@ -123,9 +123,16 @@ //! ); //! ``` mod mailbox; +mod scheduler; mod sim_init; pub use mailbox::{Address, Mailbox}; +pub(crate) use scheduler::{ + schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, + schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, + KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue, +}; +pub use scheduler::{Action, ActionKey, Deadline, SchedulingError}; pub use sim_init::SimInit; use std::error::Error; @@ -137,15 +144,12 @@ use std::time::Duration; use recycle_box::{coerce_box, RecycleBox}; use crate::executor::Executor; -use crate::model::Model; +use crate::model::{Context, Model, SetupContext}; use crate::ports::{InputFn, ReplierFn}; -use crate::time::{ - self, Action, ActionKey, Clock, Deadline, MonotonicTime, SchedulerQueue, SchedulingError, - TearableAtomicTime, -}; +use crate::time::{Clock, MonotonicTime, TearableAtomicTime}; use crate::util::seq_futures::SeqFuture; use crate::util::slot; -use crate::util::sync_cell::SyncCell; +use crate::util::sync_cell::{SyncCell, SyncCellReader}; /// Simulation environment. /// @@ -157,9 +161,9 @@ use crate::util::sync_cell::SyncCell; /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation /// itself, but also from models via the optional -/// [`&Scheduler`](time::Scheduler) argument of input and replier port methods. +/// [`&Context`](crate::model::Context) argument of input and replier port methods. /// Likewise, simulation time can be accessed with the [`Simulation::time()`] -/// method, or from models with the [`Scheduler::time()`](time::Scheduler::time) +/// method, or from models with the [`Context::time()`](crate::model::Context::time) /// method. /// /// Events and queries can be scheduled immediately, *i.e.* for the current @@ -177,7 +181,7 @@ use crate::util::sync_cell::SyncCell; /// /// 1. increment simulation time until that of the next scheduled event in /// chronological order, then -/// 2. call [`Clock::synchronize()`](time::Clock::synchronize) which, unless the +/// 2. call [`Clock::synchronize()`](crate::time::Clock::synchronize) which, unless the /// simulation is configured to run as fast as possible, blocks until the /// desired wall clock time, and finally /// 3. run all computations scheduled for the new simulation time. @@ -217,7 +221,7 @@ impl Simulation { /// that event as well as all other event scheduled for the same time. /// /// Processing is gated by a (possibly blocking) call to - /// [`Clock::synchronize()`](time::Clock::synchronize) on the configured + /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured /// simulation clock. This method blocks until all newly processed events /// have completed. pub fn step(&mut self) { @@ -292,7 +296,7 @@ impl Simulation { /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. /// - /// See also: [`time::Scheduler::schedule_event`]. + /// See also: [`Context::schedule_event`](crate::model::Context::schedule_event). pub fn schedule_event( &mut self, deadline: impl Deadline, @@ -311,8 +315,7 @@ impl Simulation { if now >= time { return Err(SchedulingError::InvalidScheduledTime); } - - time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); + schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); Ok(()) } @@ -325,7 +328,7 @@ impl Simulation { /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. /// - /// See also: [`time::Scheduler::schedule_keyed_event`]. + /// See also: [`Context::schedule_keyed_event`](crate::model::Context::schedule_keyed_event). pub fn schedule_keyed_event( &mut self, deadline: impl Deadline, @@ -344,7 +347,7 @@ impl Simulation { if now >= time { return Err(SchedulingError::InvalidScheduledTime); } - let event_key = time::schedule_keyed_event_at_unchecked( + let event_key = schedule_keyed_event_at_unchecked( time, func, arg, @@ -363,7 +366,7 @@ impl Simulation { /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. /// - /// See also: [`time::Scheduler::schedule_periodic_event`]. + /// See also: [`Context::schedule_periodic_event`](crate::model::Context::schedule_periodic_event). pub fn schedule_periodic_event( &mut self, deadline: impl Deadline, @@ -386,7 +389,7 @@ impl Simulation { if period.is_zero() { return Err(SchedulingError::NullRepetitionPeriod); } - time::schedule_periodic_event_at_unchecked( + schedule_periodic_event_at_unchecked( time, period, func, @@ -407,7 +410,7 @@ impl Simulation { /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. /// - /// See also: [`time::Scheduler::schedule_keyed_periodic_event`]. + /// See also: [`Context::schedule_keyed_periodic_event`](crate::model::Context::schedule_keyed_periodic_event). pub fn schedule_keyed_periodic_event( &mut self, deadline: impl Deadline, @@ -430,7 +433,7 @@ impl Simulation { if period.is_zero() { return Err(SchedulingError::NullRepetitionPeriod); } - let event_key = time::schedule_periodic_keyed_event_at_unchecked( + let event_key = schedule_periodic_keyed_event_at_unchecked( time, period, func, @@ -658,3 +661,25 @@ impl fmt::Display for QueryError { } impl Error for QueryError {} + +/// Adds a model and its mailbox to the simulation bench. +pub(crate) fn add_model( + mut model: M, + mailbox: Mailbox, + scheduler_queue: Arc>, + time: SyncCellReader, + executor: &Executor, +) { + let sender = mailbox.0.sender(); + + let context = Context::new(sender, scheduler_queue, time); + let setup_context = SetupContext::new(&mailbox, &context, executor); + + model.setup(&setup_context); + + let mut receiver = mailbox.0; + executor.spawn_and_forget(async move { + let mut model = model.init(&context).await.0; + while receiver.recv(&mut model, &context).await.is_ok() {} + }); +} diff --git a/asynchronix/src/time/scheduler.rs b/asynchronix/src/simulation/scheduler.rs similarity index 57% rename from asynchronix/src/time/scheduler.rs rename to asynchronix/src/simulation/scheduler.rs index 740f644..79a4682 100644 --- a/asynchronix/src/time/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -17,9 +17,8 @@ use crate::channel::Sender; use crate::executor::Executor; use crate::model::Model; use crate::ports::InputFn; -use crate::time::{MonotonicTime, TearableAtomicTime}; +use crate::time::MonotonicTime; use crate::util::priority_queue::PriorityQueue; -use crate::util::sync_cell::SyncCellReader; /// Shorthand for the scheduler queue type. @@ -55,387 +54,6 @@ impl Deadline for MonotonicTime { } } -/// A local scheduler for models. -/// -/// A `Scheduler` is a handle to the global scheduler associated to a model -/// instance. It can be used by the model to retrieve the simulation time or -/// schedule delayed actions on itself. -/// -/// ### Caveat: self-scheduling `async` methods -/// -/// Due to a current rustc issue, `async` methods that schedule themselves will -/// not compile unless an explicit `Send` bound is added to the returned future. -/// This can be done by replacing the `async` signature with a partially -/// desugared signature such as: -/// -/// ```ignore -/// fn self_scheduling_method<'a>( -/// &'a mut self, -/// arg: MyEventType, -/// scheduler: &'a Scheduler -/// ) -> impl Future + Send + 'a { -/// async move { -/// /* implementation */ -/// } -/// } -/// ``` -/// -/// Self-scheduling methods which are not `async` are not affected by this -/// issue. -/// -/// # Examples -/// -/// A model that sends a greeting after some delay. -/// -/// ``` -/// use std::time::Duration; -/// use asynchronix::model::Model; -/// use asynchronix::ports::Output; -/// use asynchronix::time::Scheduler; -/// -/// #[derive(Default)] -/// pub struct DelayedGreeter { -/// msg_out: Output, -/// } -/// -/// impl DelayedGreeter { -/// // Triggers a greeting on the output port after some delay [input port]. -/// pub async fn greet_with_delay(&mut self, delay: Duration, scheduler: &Scheduler) { -/// let time = scheduler.time(); -/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time); -/// -/// if delay.is_zero() { -/// self.msg_out.send(greeting).await; -/// } else { -/// scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap(); -/// } -/// } -/// -/// // Sends a message to the output [private input port]. -/// async fn send_msg(&mut self, msg: String) { -/// self.msg_out.send(msg).await; -/// } -/// } -/// impl Model for DelayedGreeter {} -/// ``` - -// The self-scheduling caveat seems related to this issue: -// https://github.com/rust-lang/rust/issues/78649 -pub struct Scheduler { - sender: Sender, - scheduler_queue: Arc>, - time: SyncCellReader, -} - -impl Scheduler { - /// Creates a new local scheduler. - pub(crate) fn new( - sender: Sender, - scheduler_queue: Arc>, - time: SyncCellReader, - ) -> Self { - Self { - sender, - scheduler_queue, - time, - } - } - - /// Returns the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::Model; - /// use asynchronix::time::{MonotonicTime, Scheduler}; - /// - /// fn is_third_millenium(scheduler: &Scheduler) -> bool { - /// let time = scheduler.time(); - /// time >= MonotonicTime::new(978307200, 0).unwrap() - /// && time < MonotonicTime::new(32535216000, 0).unwrap() - /// } - /// ``` - pub fn time(&self) -> MonotonicTime { - self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") - } - - /// Schedules an event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::Model; - /// use asynchronix::time::Scheduler; - /// - /// // A timer. - /// pub struct Timer {} - /// - /// impl Timer { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: Duration, scheduler: &Scheduler) { - /// if scheduler.schedule_event(setting, Self::ring, ()).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Rings [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg"); - /// } - /// } - /// - /// impl Model for Timer {} - /// ``` - pub fn schedule_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = self.sender.clone(); - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - - Ok(()) - } - - /// Schedules a cancellable event at a future time and returns an action - /// key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::Model; - /// use asynchronix::time::{ActionKey, MonotonicTime, Scheduler}; - /// - /// // An alarm clock that can be cancelled. - /// #[derive(Default)] - /// pub struct CancellableAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { - /// self.cancel(); - /// match scheduler.schedule_keyed_event(setting, Self::ring, ()) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels the current alarm, if any [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Rings the alarm [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg!"); - /// } - /// } - /// - /// impl Model for CancellableAlarmClock {} - /// ``` - pub fn schedule_keyed_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = self.sender.clone(); - let event_key = - schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - - Ok(event_key) - } - - /// Schedules a periodically recurring event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::Model; - /// use asynchronix::time::{MonotonicTime, Scheduler}; - /// - /// // An alarm clock beeping at 1Hz. - /// pub struct BeepingAlarmClock {} - /// - /// impl BeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { - /// if scheduler.schedule_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for BeepingAlarmClock {} - /// ``` - pub fn schedule_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = self.sender.clone(); - schedule_periodic_event_at_unchecked( - time, - period, - func, - arg, - sender, - &self.scheduler_queue, - ); - - Ok(()) - } - - /// Schedules a cancellable, periodically recurring event at a future time - /// and returns an action key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::Model; - /// use asynchronix::time::{ActionKey, MonotonicTime, Scheduler}; - /// - /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or - /// // stopped after it sets off. - /// #[derive(Default)] - /// pub struct CancellableBeepingAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableBeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { - /// self.cancel(); - /// match scheduler.schedule_keyed_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels or stops the alarm [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for CancellableBeepingAlarmClock {} - /// ``` - pub fn schedule_keyed_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = self.sender.clone(); - let event_key = schedule_periodic_keyed_event_at_unchecked( - time, - period, - func, - arg, - sender, - &self.scheduler_queue, - ); - - Ok(event_key) - } -} - -impl fmt::Debug for Scheduler { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Scheduler").finish_non_exhaustive() - } -} - /// Handle to a scheduled action. /// /// An `ActionKey` can be used to cancel a scheduled action. @@ -468,7 +86,7 @@ impl PartialEq for ActionKey { /// Implements equality by considering clones to be equivalent, rather than /// keys with the same `is_cancelled` value. fn eq(&self, other: &Self) -> bool { - ptr::addr_eq(&*self.is_cancelled, &*other.is_cancelled) + ptr::eq(&*self.is_cancelled, &*other.is_cancelled) } } diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 11c774f..ae22589 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -3,12 +3,12 @@ use std::sync::{Arc, Mutex}; use crate::executor::Executor; use crate::model::Model; -use crate::time::{Clock, NoClock, Scheduler}; -use crate::time::{MonotonicTime, SchedulerQueue, TearableAtomicTime}; +use crate::time::{Clock, NoClock}; +use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{Mailbox, Simulation}; +use super::{add_model, Mailbox, SchedulerQueue, Simulation}; /// Builder for a multi-threaded, discrete-event simulation. pub struct SimInit { @@ -44,15 +44,8 @@ impl SimInit { pub fn add_model(self, model: M, mailbox: Mailbox) -> Self { let scheduler_queue = self.scheduler_queue.clone(); let time = self.time.reader(); - let mut receiver = mailbox.0; - self.executor.spawn_and_forget(async move { - let sender = receiver.sender(); - let scheduler = Scheduler::new(sender, scheduler_queue, time); - let mut model = model.init(&scheduler).await.0; - - while receiver.recv(&mut model, &scheduler).await.is_ok() {} - }); + add_model(model, mailbox, scheduler_queue, time, &self.executor); self } diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index acc2f50..df5a613 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -4,9 +4,7 @@ //! //! * [`MonotonicTime`]: a monotonic timestamp based on the [TAI] time standard, //! * [`Clock`]: a trait for types that can synchronize a simulation, -//! implemented for instance by [`SystemClock`] and [`AutoSystemClock`], -//! * [`Scheduler`]: a model-local handle to the global scheduler that can be -//! used by models to schedule future actions onto themselves. +//! implemented for instance by [`SystemClock`] and [`AutoSystemClock`]. //! //! [TAI]: https://en.wikipedia.org/wiki/International_Atomic_Time //! @@ -17,8 +15,8 @@ //! the specified timestamp. //! //! ``` -//! use asynchronix::model::Model; -//! use asynchronix::time::{MonotonicTime, Scheduler}; +//! use asynchronix::model::{Context, Model}; +//! use asynchronix::time::MonotonicTime; //! //! // An alarm clock model. //! pub struct AlarmClock { @@ -32,8 +30,8 @@ //! } //! //! // Sets an alarm [input port]. -//! pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler) { -//! if scheduler.schedule_event(setting, Self::ring, ()).is_err() { +//! pub fn set(&mut self, setting: MonotonicTime, context: &Context) { +//! if context.schedule_event(setting, Self::ring, ()).is_err() { //! println!("The alarm clock can only be set for a future time"); //! } //! } @@ -49,15 +47,8 @@ mod clock; mod monotonic_time; -mod scheduler; pub use tai_time::MonotonicTime; pub use clock::{AutoSystemClock, Clock, NoClock, SyncStatus, SystemClock}; pub(crate) use monotonic_time::TearableAtomicTime; -pub(crate) use scheduler::{ - schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, - schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, - KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue, -}; -pub use scheduler::{Action, ActionKey, Deadline, Scheduler, SchedulingError}; diff --git a/asynchronix/src/util/sync_cell.rs b/asynchronix/src/util/sync_cell.rs index e4a84e6..01bc8ec 100644 --- a/asynchronix/src/util/sync_cell.rs +++ b/asynchronix/src/util/sync_cell.rs @@ -143,7 +143,6 @@ impl SyncCell { /// A handle to a `SyncCell` that enables synchronized reads from multiple /// threads. -#[derive(Clone)] pub(crate) struct SyncCellReader { inner: Arc>, } @@ -186,6 +185,14 @@ impl SyncCellReader { } } +impl Clone for SyncCellReader { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + /// An error returned when attempting to perform a read operation concurrently /// with a write operation. #[derive(Clone, Copy, Debug, PartialEq, Eq)] diff --git a/asynchronix/tests/model_scheduling.rs b/asynchronix/tests/model_scheduling.rs index 50bdce4..3f4afce 100644 --- a/asynchronix/tests/model_scheduling.rs +++ b/asynchronix/tests/model_scheduling.rs @@ -2,10 +2,10 @@ use std::time::Duration; -use asynchronix::model::Model; +use asynchronix::model::{Context, Model}; use asynchronix::ports::{EventBuffer, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; -use asynchronix::time::{ActionKey, MonotonicTime, Scheduler}; +use asynchronix::simulation::{ActionKey, Mailbox, SimInit}; +use asynchronix::time::MonotonicTime; #[test] fn model_schedule_event() { @@ -14,9 +14,9 @@ fn model_schedule_event() { output: Output<()>, } impl TestModel { - fn trigger(&mut self, _: (), scheduler: &Scheduler) { - scheduler - .schedule_event(scheduler.time() + Duration::from_secs(2), Self::action, ()) + fn trigger(&mut self, _: (), context: &Context) { + context + .schedule_event(context.time() + Duration::from_secs(2), Self::action, ()) .unwrap(); } async fn action(&mut self) { @@ -51,12 +51,12 @@ fn model_cancel_future_keyed_event() { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), scheduler: &Scheduler) { - scheduler - .schedule_event(scheduler.time() + Duration::from_secs(1), Self::action1, ()) + fn trigger(&mut self, _: (), context: &Context) { + context + .schedule_event(context.time() + Duration::from_secs(1), Self::action1, ()) .unwrap(); - self.key = scheduler - .schedule_keyed_event(scheduler.time() + Duration::from_secs(2), Self::action2, ()) + self.key = context + .schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ()) .ok(); } async fn action1(&mut self) { @@ -97,12 +97,12 @@ fn model_cancel_same_time_keyed_event() { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), scheduler: &Scheduler) { - scheduler - .schedule_event(scheduler.time() + Duration::from_secs(2), Self::action1, ()) + fn trigger(&mut self, _: (), context: &Context) { + context + .schedule_event(context.time() + Duration::from_secs(2), Self::action1, ()) .unwrap(); - self.key = scheduler - .schedule_keyed_event(scheduler.time() + Duration::from_secs(2), Self::action2, ()) + self.key = context + .schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ()) .ok(); } async fn action1(&mut self) { @@ -142,10 +142,10 @@ fn model_schedule_periodic_event() { output: Output, } impl TestModel { - fn trigger(&mut self, _: (), scheduler: &Scheduler) { - scheduler + fn trigger(&mut self, _: (), context: &Context) { + context .schedule_periodic_event( - scheduler.time() + Duration::from_secs(2), + context.time() + Duration::from_secs(2), Duration::from_secs(3), Self::action, 42, @@ -190,10 +190,10 @@ fn model_cancel_periodic_event() { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), scheduler: &Scheduler) { - self.key = scheduler + fn trigger(&mut self, _: (), context: &Context) { + self.key = context .schedule_keyed_periodic_event( - scheduler.time() + Duration::from_secs(2), + context.time() + Duration::from_secs(2), Duration::from_secs(3), Self::action, (), diff --git a/asynchronix/tests/simulation_scheduling.rs b/asynchronix/tests/simulation_scheduling.rs index 70956a1..6919091 100644 --- a/asynchronix/tests/simulation_scheduling.rs +++ b/asynchronix/tests/simulation_scheduling.rs @@ -2,6 +2,8 @@ use std::time::Duration; +#[cfg(not(miri))] +use asynchronix::model::Context; use asynchronix::model::Model; use asynchronix::ports::{EventBuffer, Output}; use asynchronix::simulation::{Address, Mailbox, SimInit, Simulation}; @@ -219,21 +221,9 @@ impl TimestampModel { } #[cfg(not(miri))] impl Model for TimestampModel { - fn init( - mut self, - _scheduler: &asynchronix::time::Scheduler, - ) -> std::pin::Pin< - Box< - dyn futures_util::Future> - + Send - + '_, - >, - > { - Box::pin(async { - self.stamp.send((Instant::now(), SystemTime::now())).await; - - self.into() - }) + async fn init(mut self, _: &Context) -> asynchronix::model::InitializedModel { + self.stamp.send((Instant::now(), SystemTime::now())).await; + self.into() } } @@ -267,7 +257,7 @@ fn timestamp_bench( #[test] fn simulation_system_clock_from_instant() { let t0 = MonotonicTime::EPOCH; - const TOLERANCE: f64 = 0.0005; // [s] + const TOLERANCE: f64 = 0.005; // [s] // The reference simulation time is set in the past of t0 so that the // simulation starts in the future when the reference wall clock time is