diff --git a/asynchronix-util/examples/observables.rs b/asynchronix-util/examples/observables.rs index 6403d18..7f5ae96 100644 --- a/asynchronix-util/examples/observables.rs +++ b/asynchronix-util/examples/observables.rs @@ -126,17 +126,11 @@ impl Processor { } /// Process data for dt milliseconds. - pub async fn process(&mut self, dt: u64, context: &Context) { + pub async fn process(&mut self, dt: u64, cx: &mut Context) { if matches!(self.state.observe(), ModeId::Idle | ModeId::Processing) { self.state .set(State::Processing( - context - .scheduler - .schedule_keyed_event( - Duration::from_millis(dt), - Self::finish_processing, - (), - ) + cx.schedule_keyed_event(Duration::from_millis(dt), Self::finish_processing, ()) .unwrap() .into_auto(), )) @@ -155,7 +149,7 @@ impl Processor { impl Model for Processor { /// Propagate all internal states. - async fn init(mut self, _: &Context) -> InitializedModel { + async fn init(mut self, _: &mut Context) -> InitializedModel { self.state.propagate().await; self.acc.propagate().await; self.elc.propagate().await; @@ -188,7 +182,10 @@ fn main() -> Result<(), SimulationError> { let t0 = MonotonicTime::EPOCH; // Assembly and initialization. - let mut simu = SimInit::new().add_model(proc, proc_mbox, "proc").init(t0)?; + let mut simu = SimInit::new() + .add_model(proc, proc_mbox, "proc") + .init(t0)? + .0; // ---------- // Simulation. diff --git a/asynchronix/examples/assembly.rs b/asynchronix/examples/assembly.rs index 00b0dbb..a2e1e44 100644 --- a/asynchronix/examples/assembly.rs +++ b/asynchronix/examples/assembly.rs @@ -85,7 +85,7 @@ impl Model for MotorAssembly {} impl ProtoModel for ProtoMotorAssembly { type Model = MotorAssembly; - fn build(self, ctx: &mut BuildContext) -> MotorAssembly { + fn build(self, cx: &mut BuildContext) -> MotorAssembly { let mut assembly = MotorAssembly::new(); let mut motor = Motor::new(self.init_pos); let mut driver = Driver::new(1.0); @@ -105,8 +105,8 @@ impl ProtoModel for ProtoMotorAssembly { motor.position = self.position; // Add the submodels to the simulation. - ctx.add_submodel(driver, driver_mbox, "driver"); - ctx.add_submodel(motor, motor_mbox, "motor"); + cx.add_submodel(driver, driver_mbox, "driver"); + cx.add_submodel(motor, motor_mbox, "motor"); assembly } @@ -133,12 +133,10 @@ fn main() -> Result<(), SimulationError> { let t0 = MonotonicTime::EPOCH; // Assembly and initialization. - let mut simu = SimInit::new() + let (mut simu, scheduler) = SimInit::new() .add_model(assembly, assembly_mbox, "assembly") .init(t0)?; - let scheduler = simu.scheduler(); - // ---------- // Simulation. // ---------- diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index 235dd4f..0904c01 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -120,7 +120,7 @@ impl Controller { } /// Starts brewing or cancels the current brew -- input port. - pub async fn brew_cmd(&mut self, _: (), context: &Context) { + pub async fn brew_cmd(&mut self, _: (), cx: &mut Context) { // If a brew was ongoing, sending the brew command is interpreted as a // request to cancel it. if let Some(key) = self.stop_brew_key.take() { @@ -139,9 +139,7 @@ impl Controller { // Schedule the `stop_brew()` method and turn on the pump. self.stop_brew_key = Some( - context - .scheduler - .schedule_keyed_event(self.brew_time, Self::stop_brew, ()) + cx.schedule_keyed_event(self.brew_time, Self::stop_brew, ()) .unwrap(), ); self.pump_cmd.send(PumpCommand::On).await; @@ -189,7 +187,7 @@ impl Tank { } /// Water volume added [m³] -- input port. - pub async fn fill(&mut self, added_volume: f64, context: &Context) { + pub async fn fill(&mut self, added_volume: f64, cx: &mut Context) { // Ignore zero and negative values. We could also impose a maximum based // on tank capacity. if added_volume <= 0.0 { @@ -207,11 +205,11 @@ impl Tank { state.set_empty_key.cancel(); // Update the volume, saturating at 0 in case of rounding errors. - let time = context.scheduler.time(); + let time = cx.time(); let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); - self.schedule_empty(state.flow_rate, time, context).await; + self.schedule_empty(state.flow_rate, time, cx).await; // There is no need to broadcast the state of the water sense since // it could not be previously `Empty` (otherwise the dynamic state @@ -229,10 +227,10 @@ impl Tank { /// # Panics /// /// This method will panic if the flow rate is negative. - pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context) { + pub async fn set_flow_rate(&mut self, flow_rate: f64, cx: &mut Context) { assert!(flow_rate >= 0.0); - let time = context.scheduler.time(); + let time = cx.time(); // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { @@ -244,7 +242,7 @@ impl Tank { self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); } - self.schedule_empty(flow_rate, time, context).await; + self.schedule_empty(flow_rate, time, cx).await; } /// Schedules a callback for when the tank becomes empty. @@ -257,7 +255,7 @@ impl Tank { &mut self, flow_rate: f64, time: MonotonicTime, - context: &Context, + cx: &mut Context, ) { // Determine when the tank will be empty at the current flow rate. let duration_until_empty = if self.volume == 0.0 { @@ -274,10 +272,7 @@ impl Tank { let duration_until_empty = Duration::from_secs_f64(duration_until_empty); // Schedule the next update. - match context - .scheduler - .schedule_keyed_event(duration_until_empty, Self::set_empty, ()) - { + match cx.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) { Ok(set_empty_key) => { let state = TankDynamicState { last_volume_update: time, @@ -304,7 +299,7 @@ impl Tank { impl Model for Tank { /// Broadcasts the initial state of the water sense. - async fn init(mut self, _: &Context) -> InitializedModel { + async fn init(mut self, _: &mut Context) -> InitializedModel { self.water_sense .send(if self.volume == 0.0 { WaterSenseState::Empty @@ -371,14 +366,12 @@ fn main() -> Result<(), SimulationError> { let t0 = MonotonicTime::EPOCH; // Assembly and initialization. - let mut simu = SimInit::new() + let (mut simu, scheduler) = SimInit::new() .add_model(controller, controller_mbox, "controller") .add_model(pump, pump_mbox, "pump") .add_model(tank, tank_mbox, "tank") .init(t0)?; - let scheduler = simu.scheduler(); - // ---------- // Simulation. // ---------- diff --git a/asynchronix/examples/external_input.rs b/asynchronix/examples/external_input.rs index b72bd7b..b66db9e 100644 --- a/asynchronix/examples/external_input.rs +++ b/asynchronix/examples/external_input.rs @@ -136,11 +136,9 @@ impl Listener { impl Model for Listener { /// Initialize model. - async fn init(self, context: &Context) -> InitializedModel { + async fn init(self, cx: &mut Context) -> InitializedModel { // Schedule periodic function that processes external events. - context - .scheduler - .schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) + cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) .unwrap(); self.into() @@ -212,7 +210,8 @@ fn main() -> Result<(), SimulationError> { let mut simu = SimInit::new() .add_model(listener, listener_mbox, "listener") .set_clock(AutoSystemClock::new()) - .init(t0)?; + .init(t0)? + .0; // ---------- // Simulation. diff --git a/asynchronix/examples/power_supply.rs b/asynchronix/examples/power_supply.rs index e566f33..9d359f2 100644 --- a/asynchronix/examples/power_supply.rs +++ b/asynchronix/examples/power_supply.rs @@ -144,7 +144,8 @@ fn main() -> Result<(), SimulationError> { .add_model(load1, load1_mbox, "load1") .add_model(load2, load2_mbox, "load2") .add_model(load3, load3_mbox, "load3") - .init(t0)?; + .init(t0)? + .0; // ---------- // Simulation. diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 470af09..e1b1ad4 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -90,7 +90,7 @@ impl Motor { impl Model for Motor { /// Broadcasts the initial position of the motor. - async fn init(mut self, _: &Context) -> InitializedModel { + async fn init(mut self, _: &mut Context) -> InitializedModel { self.position.send(self.pos).await; self.into() } @@ -126,7 +126,7 @@ impl Driver { } /// Pulse rate (sign = direction) [Hz] -- input port. - pub async fn pulse_rate(&mut self, pps: f64, context: &Context) { + pub async fn pulse_rate(&mut self, pps: f64, cx: &mut Context) { let pps = pps.signum() * pps.abs().clamp(Self::MIN_PPS, Self::MAX_PPS); if pps == self.pps { return; @@ -138,7 +138,7 @@ impl Driver { // Trigger the rotation if the motor is currently idle. Otherwise the // new value will be accounted for at the next pulse. if is_idle { - self.send_pulse((), context).await; + self.send_pulse((), cx).await; } } @@ -149,7 +149,7 @@ impl Driver { fn send_pulse<'a>( &'a mut self, _: (), - context: &'a Context, + cx: &'a mut Context, ) -> impl Future + Send + 'a { async move { let current_out = match self.next_phase { @@ -170,9 +170,7 @@ impl Driver { let pulse_duration = Duration::from_secs_f64(1.0 / self.pps.abs()); // Schedule the next pulse. - context - .scheduler - .schedule_event(pulse_duration, Self::send_pulse, ()) + cx.schedule_event(pulse_duration, Self::send_pulse, ()) .unwrap(); } } @@ -208,13 +206,11 @@ fn main() -> Result<(), asynchronix::simulation::SimulationError> { let t0 = MonotonicTime::EPOCH; // Assembly and initialization. - let mut simu = SimInit::new() + let (mut simu, scheduler) = SimInit::new() .add_model(driver, driver_mbox, "driver") .add_model(motor, motor_mbox, "motor") .init(t0)?; - let scheduler = simu.scheduler(); - // ---------- // Simulation. // ---------- diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index d7eb413..ccd4c82 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -53,7 +53,7 @@ impl Inner { } /// A receiver which can asynchronously execute `async` message that take an -/// argument of type `&mut M` and an optional `&Context` argument. +/// argument of type `&mut M` and an optional `&mut Context` argument. pub(crate) struct Receiver { /// Shared data. inner: Arc>, @@ -105,7 +105,7 @@ impl Receiver { pub(crate) async fn recv( &mut self, model: &mut M, - context: &Context, + cx: &mut Context, ) -> Result<(), RecvError> { let msg = unsafe { self.inner @@ -124,7 +124,7 @@ impl Receiver { THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1)); // Take the message to obtain a boxed future. - let fut = msg.call_once(model, context, self.future_box.take().unwrap()); + let fut = msg.call_once(model, cx, self.future_box.take().unwrap()); // Now that the message was taken, drop `msg` to free its slot // in the queue and signal to one awaiting sender that a slot is @@ -207,7 +207,7 @@ impl Sender { where F: for<'a> FnOnce( &'a mut M, - &'a Context, + &'a mut Context, RecycleBox<()>, ) -> RecycleBox + Send + 'a> + Send @@ -364,7 +364,7 @@ impl fmt::Debug for Sender { } /// A closure that can be called once to create a future boxed in a `RecycleBox` -/// from an `&mut M`, a `&Context` and an empty `RecycleBox`. +/// from an `&mut M`, a `&mut Context` and an empty `RecycleBox`. /// /// This is basically a workaround to emulate an `FnOnce` with the equivalent of /// an `FnMut` so that it is possible to call it as a `dyn` trait stored in a @@ -380,7 +380,7 @@ trait MessageFn: Send { fn call_once<'a>( &mut self, model: &'a mut M, - context: &'a Context, + cx: &'a mut Context, recycle_box: RecycleBox<()>, ) -> RecycleBox + Send + 'a>; } @@ -402,7 +402,7 @@ impl MessageFn for MessageFnOnce where F: for<'a> FnOnce( &'a mut M, - &'a Context, + &'a mut Context, RecycleBox<()>, ) -> RecycleBox + Send + 'a> + Send, @@ -410,12 +410,12 @@ where fn call_once<'a>( &mut self, model: &'a mut M, - context: &'a Context, + cx: &'a mut Context, recycle_box: RecycleBox<()>, ) -> RecycleBox + Send + 'a> { let closure = self.msg_fn.take().unwrap(); - (closure)(model, context, recycle_box) + (closure)(model, cx, recycle_box) } } diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs index 69f02c7..418dc0d 100644 --- a/asynchronix/src/dev_hooks.rs +++ b/asynchronix/src/dev_hooks.rs @@ -16,7 +16,7 @@ impl Executor { /// /// The maximum number of threads is set with the `pool_size` parameter. pub fn new(pool_size: usize) -> Self { - let dummy_context = crate::executor::SimulationContext { + let dummy_cx = crate::executor::SimulationContext { #[cfg(feature = "tracing")] time_reader: crate::util::sync_cell::SyncCell::new( crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH), @@ -25,7 +25,7 @@ impl Executor { }; Self(executor::Executor::new_multi_threaded( pool_size, - dummy_context, + dummy_cx, executor::Signal::new(), )) } diff --git a/asynchronix/src/grpc/run.rs b/asynchronix/src/grpc/run.rs index 964e351..656cc5d 100644 --- a/asynchronix/src/grpc/run.rs +++ b/asynchronix/src/grpc/run.rs @@ -102,9 +102,10 @@ impl simulation_server::Simulation for GrpcSimulationService { let (reply, bench) = self.initializer().init(request); - if let Some((simulation, endpoint_registry)) = bench { + if let Some((simulation, scheduler, endpoint_registry)) = bench { *self.controller() = ControllerService::Started { simulation, + scheduler, event_source_registry: endpoint_registry.event_source_registry, query_source_registry: endpoint_registry.query_source_registry, key_registry: KeyRegistry::default(), diff --git a/asynchronix/src/grpc/services.rs b/asynchronix/src/grpc/services.rs index e00613a..2627409 100644 --- a/asynchronix/src/grpc/services.rs +++ b/asynchronix/src/grpc/services.rs @@ -8,7 +8,7 @@ use prost_types::Timestamp; use tai_time::MonotonicTime; use super::codegen::simulation::{Error, ErrorCode}; -use crate::simulation::ExecutionError; +use crate::simulation::{ExecutionError, SchedulingError}; pub(crate) use controller_service::ControllerService; pub(crate) use init_service::InitService; @@ -47,6 +47,18 @@ fn map_execution_error(error: ExecutionError) -> Error { to_error(error_code, error_message) } +/// Map a `SchedulingError` to a Protobuf error. +fn map_scheduling_error(error: SchedulingError) -> Error { + let error_code = match error { + SchedulingError::InvalidScheduledTime => ErrorCode::InvalidDeadline, + SchedulingError::NullRepetitionPeriod => ErrorCode::InvalidPeriod, + }; + + let error_message = error.to_string(); + + to_error(error_code, error_message) +} + /// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. /// /// This will fail if the time is outside the protobuf-specified range for diff --git a/asynchronix/src/grpc/services/controller_service.rs b/asynchronix/src/grpc/services/controller_service.rs index c3f4c83..32f5ac7 100644 --- a/asynchronix/src/grpc/services/controller_service.rs +++ b/asynchronix/src/grpc/services/controller_service.rs @@ -4,12 +4,13 @@ use prost_types::Timestamp; use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId}; use crate::registry::{EventSourceRegistry, QuerySourceRegistry}; -use crate::simulation::Simulation; +use crate::simulation::{Scheduler, Simulation}; use super::super::codegen::simulation::*; use super::{ - map_execution_error, monotonic_to_timestamp, simulation_not_started_error, - timestamp_to_monotonic, to_error, to_positive_duration, to_strictly_positive_duration, + map_execution_error, map_scheduling_error, monotonic_to_timestamp, + simulation_not_started_error, timestamp_to_monotonic, to_error, to_positive_duration, + to_strictly_positive_duration, }; /// Protobuf-based simulation manager. @@ -24,6 +25,7 @@ pub(crate) enum ControllerService { NotStarted, Started { simulation: Simulation, + scheduler: Scheduler, event_source_registry: EventSourceRegistry, query_source_registry: QuerySourceRegistry, key_registry: KeyRegistry, @@ -147,6 +149,7 @@ impl ControllerService { let reply = match self { Self::Started { simulation, + scheduler, event_source_registry, key_registry, .. @@ -224,7 +227,9 @@ impl ControllerService { } }); - simulation.process(action).map_err(map_execution_error)?; + scheduler + .schedule(deadline, action) + .map_err(map_scheduling_error)?; Ok(key_id) }(), diff --git a/asynchronix/src/grpc/services/init_service.rs b/asynchronix/src/grpc/services/init_service.rs index 1e485b0..3e351d2 100644 --- a/asynchronix/src/grpc/services/init_service.rs +++ b/asynchronix/src/grpc/services/init_service.rs @@ -2,8 +2,7 @@ use ciborium; use serde::de::DeserializeOwned; use crate::registry::EndpointRegistry; -use crate::simulation::SimInit; -use crate::simulation::Simulation; +use crate::simulation::{Scheduler, SimInit, Simulation}; use super::{map_execution_error, timestamp_to_monotonic, to_error}; @@ -51,7 +50,7 @@ impl InitService { pub(crate) fn init( &mut self, request: InitRequest, - ) -> (InitReply, Option<(Simulation, EndpointRegistry)>) { + ) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) { let start_time = request.time.unwrap_or_default(); let reply = (self.sim_gen)(&request.cfg) @@ -73,7 +72,7 @@ impl InitService { sim_init .init(start_time) .map_err(map_execution_error) - .map(|sim| (sim, registry)) + .map(|(sim, sched)| (sim, sched, registry)) }) }); diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index 115c70e..028364e 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -45,7 +45,7 @@ //! * _input ports_, which are synchronous or asynchronous methods that //! implement the [`InputFn`](ports::InputFn) trait and take an `&mut self` //! argument, a message argument, and an optional -//! [`&Context`](model::Context) argument, +//! [`&mut Context`](model::Context) argument, //! * _replier ports_, which are similar to input ports but implement the //! [`ReplierFn`](ports::ReplierFn) trait and return a reply. //! @@ -118,8 +118,8 @@ //! pub output: Output, //! } //! impl Delay { -//! pub fn input(&mut self, value: f64, context: &Context) { -//! context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! pub fn input(&mut self, value: f64, cx: &mut Context) { +//! cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! } //! //! async fn send(&mut self, value: f64) { @@ -189,8 +189,8 @@ //! # pub output: Output, //! # } //! # impl Delay { -//! # pub fn input(&mut self, value: f64, context: &Context) { -//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # pub fn input(&mut self, value: f64, cx: &mut Context) { +//! # cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -290,8 +290,8 @@ //! # pub output: Output, //! # } //! # impl Delay { -//! # pub fn input(&mut self, value: f64, context: &Context) { -//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # pub fn input(&mut self, value: f64, cx: &mut Context) { +//! # cx.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -325,7 +325,8 @@ //! # .add_model(multiplier2, multiplier2_mbox, "multiplier2") //! # .add_model(delay1, delay1_mbox, "delay1") //! # .add_model(delay2, delay2_mbox, "delay2") -//! # .init(t0)?; +//! # .init(t0)? +//! # .0; //! // Send a value to the first multiplier. //! simu.process_event(Multiplier::input, 21.0, &input_address)?; //! diff --git a/asynchronix/src/model.rs b/asynchronix/src/model.rs index 1976472..35793fb 100644 --- a/asynchronix/src/model.rs +++ b/asynchronix/src/model.rs @@ -56,7 +56,7 @@ //! impl Model for MyModel { //! async fn init( //! mut self, -//! ctx: &Context +//! ctx: &mut Context //! ) -> InitializedModel { //! println!("...initialization..."); //! @@ -173,10 +173,10 @@ //! ```ignore //! fn(&mut self) // argument elided, implies `T=()` //! fn(&mut self, T) -//! fn(&mut self, T, &Context) +//! fn(&mut self, T, &mut Context) //! async fn(&mut self) // argument elided, implies `T=()` //! async fn(&mut self, T) -//! async fn(&mut self, T, &Context) +//! async fn(&mut self, T, &mut Context) //! where //! Self: Model, //! T: Clone + Send + 'static, @@ -193,7 +193,7 @@ //! ```ignore //! async fn(&mut self) -> R // argument elided, implies `T=()` //! async fn(&mut self, T) -> R -//! async fn(&mut self, T, &Context) -> R +//! async fn(&mut self, T, &mut Context) -> R //! where //! Self: Model, //! T: Clone + Send + 'static, @@ -219,7 +219,7 @@ //! // ... //! } //! impl MyModel { -//! pub fn my_input(&mut self, input: String, context: &Context) { +//! pub fn my_input(&mut self, input: String, cx: &mut Context) { //! // ... //! } //! pub async fn my_replier(&mut self, request: u32) -> bool { // context argument elided @@ -273,7 +273,7 @@ pub trait Model: Sized + Send + 'static { /// impl Model for MyModel { /// async fn init( /// self, - /// context: &Context + /// cx: &mut Context /// ) -> InitializedModel { /// println!("...initialization..."); /// @@ -281,7 +281,7 @@ pub trait Model: Sized + Send + 'static { /// } /// } /// ``` - fn init(self, _: &Context) -> impl Future> + Send { + fn init(self, _: &mut Context) -> impl Future> + Send { async { self.into() } } } @@ -322,7 +322,7 @@ pub trait ProtoModel: Sized { /// This method is invoked when the /// [`SimInit::add_model()`](crate::simulation::SimInit::add_model) or /// [`BuildContext::add_submodel`] method is called. - fn build(self, ctx: &mut BuildContext) -> Self::Model; + fn build(self, cx: &mut BuildContext) -> Self::Model; } // Every model can be used as a prototype for itself. diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs index c45ac15..6e8c70b 100644 --- a/asynchronix/src/model/context.rs +++ b/asynchronix/src/model/context.rs @@ -1,7 +1,10 @@ use std::fmt; +use std::time::Duration; use crate::executor::{Executor, Signal}; -use crate::simulation::{self, LocalScheduler, Mailbox}; +use crate::ports::InputFn; +use crate::simulation::{self, ActionKey, Address, GlobalScheduler, Mailbox, SchedulingError}; +use crate::time::{Deadline, MonotonicTime}; use super::{Model, ProtoModel}; @@ -22,7 +25,7 @@ use super::{Model, ProtoModel}; /// fn self_scheduling_method<'a>( /// &'a mut self, /// arg: MyEventType, -/// context: &'a Context +/// cx: &'a mut Context /// ) -> impl Future + Send + 'a { /// async move { /// /* implementation */ @@ -49,14 +52,14 @@ use super::{Model, ProtoModel}; /// /// impl DelayedGreeter { /// // Triggers a greeting on the output port after some delay [input port]. -/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context) { -/// let time = context.scheduler.time(); +/// pub async fn greet_with_delay(&mut self, delay: Duration, cx: &mut Context) { +/// let time = cx.time(); /// let greeting = format!("Hello, this message was scheduled at: {:?}.", time); /// /// if delay.is_zero() { /// self.msg_out.send(greeting).await; /// } else { -/// context.scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap(); +/// cx.schedule_event(delay, Self::send_msg, greeting).unwrap(); /// } /// } /// @@ -72,26 +75,293 @@ use super::{Model, ProtoModel}; // https://github.com/rust-lang/rust/issues/78649 pub struct Context { name: String, - - /// Local scheduler. - pub scheduler: LocalScheduler, + scheduler: GlobalScheduler, + address: Address, + origin_id: usize, } impl Context { /// Creates a new local context. - pub(crate) fn new(name: String, scheduler: LocalScheduler) -> Self { - Self { name, scheduler } + pub(crate) fn new(name: String, scheduler: GlobalScheduler, address: Address) -> Self { + // The only requirement for the origin ID is that it must be (i) + // specific to each model and (ii) different from 0 (which is reserved + // for the global scheduler). The channel ID of the model mailbox + // fulfills this requirement. + let origin_id = address.0.channel_id(); + + Self { + name, + scheduler, + address, + origin_id, + } } - /// Returns the model instance name. + /// Returns the fully qualified model instance name. + /// + /// The fully qualified name is made of the unqualified model name, if + /// relevant prepended by the dot-separated names of all parent models. pub fn name(&self) -> &str { &self.name } + + /// Returns the current simulation time. + pub fn time(&self) -> MonotonicTime { + self.scheduler.time() + } + + /// Schedules an event at a future time on this model. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// + /// // A timer. + /// pub struct Timer {} + /// + /// impl Timer { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: Duration, cx: &mut Context) { + /// if cx.schedule_event(setting, Self::ring, ()).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Rings [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg"); + /// } + /// } + /// + /// impl Model for Timer {} + /// ``` + pub fn schedule_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + self.scheduler + .schedule_event_from(deadline, func, arg, &self.address, self.origin_id) + } + + /// Schedules a cancellable event at a future time on this model and returns + /// an action key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock that can be cancelled. + /// #[derive(Default)] + /// pub struct CancellableAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { + /// self.cancel(); + /// match cx.schedule_keyed_event(setting, Self::ring, ()) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels the current alarm, if any [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Rings the alarm [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg!"); + /// } + /// } + /// + /// impl Model for CancellableAlarmClock {} + /// ``` + pub fn schedule_keyed_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = self.scheduler.schedule_keyed_event_from( + deadline, + func, + arg, + &self.address, + self.origin_id, + )?; + + Ok(event_key) + } + + /// Schedules a periodically recurring event on this model at a future time. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz. + /// pub struct BeepingAlarmClock {} + /// + /// impl BeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { + /// if cx.schedule_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for BeepingAlarmClock {} + /// ``` + pub fn schedule_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + self.scheduler.schedule_periodic_event_from( + deadline, + period, + func, + arg, + &self.address, + self.origin_id, + ) + } + + /// Schedules a cancellable, periodically recurring event on this model at a + /// future time and returns an action key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or + /// // stopped after it sets off. + /// #[derive(Default)] + /// pub struct CancellableBeepingAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableBeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { + /// self.cancel(); + /// match cx.schedule_keyed_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels or stops the alarm [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for CancellableBeepingAlarmClock {} + /// ``` + pub fn schedule_keyed_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = self.scheduler.schedule_keyed_periodic_event_from( + deadline, + period, + func, + arg, + &self.address, + self.origin_id, + )?; + + Ok(event_key) + } } impl fmt::Debug for Context { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Context").finish_non_exhaustive() + f.debug_struct("Context") + .field("name", &self.name()) + .field("time", &self.time()) + .field("address", &self.address) + .field("origin_id", &self.origin_id) + .finish_non_exhaustive() } } @@ -153,7 +423,7 @@ impl fmt::Debug for Context { /// /// fn build( /// self, -/// ctx: &mut BuildContext) +/// cx: &mut BuildContext) /// -> MultiplyBy4 { /// let mut mult = MultiplyBy4 { forward: Output::default() }; /// let mut submult1 = MultiplyBy2::default(); @@ -170,8 +440,8 @@ impl fmt::Debug for Context { /// submult1.output.connect(MultiplyBy2::input, &submult2_mbox); /// /// // Add the submodels to the simulation. -/// ctx.add_submodel(submult1, submult1_mbox, "submultiplier 1"); -/// ctx.add_submodel(submult2, submult2_mbox, "submultiplier 2"); +/// cx.add_submodel(submult1, submult1_mbox, "submultiplier 1"); +/// cx.add_submodel(submult2, submult2_mbox, "submultiplier 2"); /// /// mult /// } @@ -180,9 +450,9 @@ impl fmt::Debug for Context { /// ``` #[derive(Debug)] pub struct BuildContext<'a, P: ProtoModel> { - /// Mailbox of the model. - pub mailbox: &'a Mailbox, - context: &'a Context, + mailbox: &'a Mailbox, + name: &'a String, + scheduler: &'a GlobalScheduler, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, @@ -192,14 +462,16 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { /// Creates a new local context. pub(crate) fn new( mailbox: &'a Mailbox, - context: &'a Context, + name: &'a String, + scheduler: &'a GlobalScheduler, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, ) -> Self { Self { mailbox, - context, + name, + scheduler, executor, abort_signal, model_names, @@ -211,7 +483,12 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { /// The fully qualified name is made of the unqualified model name, if /// relevant prepended by the dot-separated names of all parent models. pub fn name(&self) -> &str { - &self.context.name + self.name + } + + /// Returns a handle to the model's mailbox. + pub fn address(&self) -> Address { + self.mailbox.address() } /// Adds a sub-model to the simulation bench. @@ -232,13 +509,13 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { if submodel_name.is_empty() { submodel_name = String::from(""); }; - submodel_name = self.context.name().to_string() + "." + &submodel_name; + submodel_name = self.name.to_string() + "." + &submodel_name; simulation::add_model( model, mailbox, submodel_name, - self.context.scheduler.scheduler.clone(), + self.scheduler.clone(), self.executor, self.abort_signal, self.model_names, diff --git a/asynchronix/src/ports.rs b/asynchronix/src/ports.rs index 76b691a..f57f93e 100644 --- a/asynchronix/src/ports.rs +++ b/asynchronix/src/ports.rs @@ -69,10 +69,10 @@ //! impl ProtoModel for ProtoParentModel { //! type Model = ParentModel; //! -//! fn build(self, ctx: &mut BuildContext) -> ParentModel { +//! fn build(self, cx: &mut BuildContext) -> ParentModel { //! let mut child = ChildModel::new(self.output.clone()); //! -//! ctx.add_submodel(child, Mailbox::new(), "child"); +//! cx.add_submodel(child, Mailbox::new(), "child"); //! //! ParentModel { output: self.output } //! } @@ -91,11 +91,3 @@ pub use sink::{ event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter, }; pub use source::{EventSource, QuerySource, ReplyReceiver}; - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -/// Unique identifier for a connection between two ports. -pub struct LineId(u64); - -/// Error raised when the specified line cannot be found. -#[derive(Copy, Clone, Debug)] -pub struct LineError {} diff --git a/asynchronix/src/ports/input/model_fn.rs b/asynchronix/src/ports/input/model_fn.rs index d9668e5..550bb85 100644 --- a/asynchronix/src/ports/input/model_fn.rs +++ b/asynchronix/src/ports/input/model_fn.rs @@ -14,9 +14,9 @@ use super::markers; /// /// ```ignore /// FnOnce(&mut M, T) -/// FnOnce(&mut M, T, &Context) +/// FnOnce(&mut M, T, &mut Context) /// async fn(&mut M, T) -/// async fn(&mut M, T, &Context) +/// async fn(&mut M, T, &mut Context) /// where /// M: Model /// ``` @@ -34,7 +34,7 @@ pub trait InputFn<'a, M: Model, T, S>: Send + 'static { type Future: Future + Send + 'a; /// Calls the method. - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future; + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future; } impl<'a, M, F> InputFn<'a, M, (), markers::WithoutArguments> for F @@ -44,7 +44,7 @@ where { type Future = Ready<()>; - fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context) -> Self::Future { self(model); ready(()) @@ -58,7 +58,7 @@ where { type Future = Ready<()>; - fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context) -> Self::Future { self(model, arg); ready(()) @@ -68,12 +68,12 @@ where impl<'a, M, T, F> InputFn<'a, M, T, markers::WithContext> for F where M: Model, - F: FnOnce(&'a mut M, T, &'a Context) + Send + 'static, + F: FnOnce(&'a mut M, T, &'a mut Context) + Send + 'static, { type Future = Ready<()>; - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { - self(model, arg, context); + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future { + self(model, arg, cx); ready(()) } @@ -87,7 +87,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context) -> Self::Future { self(model) } } @@ -100,7 +100,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context) -> Self::Future { self(model, arg) } } @@ -109,12 +109,12 @@ impl<'a, M, T, Fut, F> InputFn<'a, M, T, markers::AsyncWithContext> for F where M: Model, Fut: Future + Send + 'a, - F: FnOnce(&'a mut M, T, &'a Context) -> Fut + Send + 'static, + F: FnOnce(&'a mut M, T, &'a mut Context) -> Fut + Send + 'static, { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { - self(model, arg, context) + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future { + self(model, arg, cx) } } @@ -126,7 +126,7 @@ where /// /// ```ignore /// async fn(&mut M, T) -> R -/// async fn(&mut M, T, &Context) -> R +/// async fn(&mut M, T, &mut Context) -> R /// where /// M: Model /// ``` @@ -143,7 +143,7 @@ pub trait ReplierFn<'a, M: Model, T, R, S>: Send + 'static { type Future: Future + Send + 'a; /// Calls the method. - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future; + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future; } impl<'a, M, R, Fut, F> ReplierFn<'a, M, (), R, markers::AsyncWithoutArguments> for F @@ -154,7 +154,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, _arg: (), _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, _arg: (), _cx: &'a mut Context) -> Self::Future { self(model) } } @@ -167,7 +167,7 @@ where { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, _context: &'a Context) -> Self::Future { + fn call(self, model: &'a mut M, arg: T, _cx: &'a mut Context) -> Self::Future { self(model, arg) } } @@ -176,11 +176,11 @@ impl<'a, M, T, R, Fut, F> ReplierFn<'a, M, T, R, markers::AsyncWithContext> for where M: Model, Fut: Future + Send + 'a, - F: FnOnce(&'a mut M, T, &'a Context) -> Fut + Send + 'static, + F: FnOnce(&'a mut M, T, &'a mut Context) -> Fut + Send + 'static, { type Future = Fut; - fn call(self, model: &'a mut M, arg: T, context: &'a Context) -> Self::Future { - self(model, arg, context) + fn call(self, model: &'a mut M, arg: T, cx: &'a mut Context) -> Self::Future { + self(model, arg, cx) } } diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index 574f6bf..e8dff63 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -4,7 +4,7 @@ mod sender; use std::fmt; use crate::model::Model; -use crate::ports::{EventSink, LineError, LineId}; +use crate::ports::EventSink; use crate::ports::{InputFn, ReplierFn}; use crate::simulation::Address; use crate::util::cached_rw_lock::CachedRwLock; @@ -43,20 +43,20 @@ impl Output { /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of type `T` plus, optionally, a scheduler /// reference. - pub fn connect(&mut self, input: F, address: impl Into>) -> LineId + pub fn connect(&mut self, input: F, address: impl Into>) where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, S: Send + 'static, { let sender = Box::new(InputSender::new(input, address.into().0)); - self.broadcaster.write().unwrap().add(sender) + self.broadcaster.write().unwrap().add(sender); } /// Adds a connection to an event sink such as an /// [`EventSlot`](crate::ports::EventSlot) or /// [`EventBuffer`](crate::ports::EventBuffer). - pub fn connect_sink>(&mut self, sink: &S) -> LineId { + pub fn connect_sink>(&mut self, sink: &S) { let sender = Box::new(EventSinkSender::new(sink.writer())); self.broadcaster.write().unwrap().add(sender) } @@ -70,12 +70,7 @@ impl Output { /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of the type returned by the mapping /// closure plus, optionally, a context reference. - pub fn map_connect( - &mut self, - map: C, - input: F, - address: impl Into>, - ) -> LineId + pub fn map_connect(&mut self, map: C, input: F, address: impl Into>) where M: Model, C: Fn(&T) -> U + Send + Sync + 'static, @@ -84,7 +79,7 @@ impl Output { S: Send + 'static, { let sender = Box::new(MapInputSender::new(map, input, address.into().0)); - self.broadcaster.write().unwrap().add(sender) + self.broadcaster.write().unwrap().add(sender); } /// Adds an auto-converting connection to an event sink such as an @@ -93,14 +88,14 @@ impl Output { /// /// Events are mapped to another type using the closure provided in /// argument. - pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId + pub fn map_connect_sink(&mut self, map: C, sink: &S) where C: Fn(&T) -> U + Send + Sync + 'static, U: Send + 'static, S: EventSink, { let sender = Box::new(MapEventSinkSender::new(map, sink.writer())); - self.broadcaster.write().unwrap().add(sender) + self.broadcaster.write().unwrap().add(sender); } /// Adds an auto-converting, filtered connection to an input port of the @@ -117,8 +112,7 @@ impl Output { filter_map: C, input: F, address: impl Into>, - ) -> LineId - where + ) where M: Model, C: Fn(&T) -> Option + Send + Sync + 'static, F: for<'a> InputFn<'a, M, U, S> + Clone, @@ -130,7 +124,7 @@ impl Output { input, address.into().0, )); - self.broadcaster.write().unwrap().add(sender) + self.broadcaster.write().unwrap().add(sender); } /// Adds an auto-converting connection to an event sink such as an @@ -139,33 +133,14 @@ impl Output { /// /// Events are mapped to another type using the closure provided in /// argument. - pub fn filter_map_connect_sink(&mut self, filter_map: C, sink: &S) -> LineId + pub fn filter_map_connect_sink(&mut self, filter_map: C, sink: &S) where C: Fn(&T) -> Option + Send + Sync + 'static, U: Send + 'static, S: EventSink, { let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer())); - self.broadcaster.write().unwrap().add(sender) - } - - /// Removes the connection specified by the `LineId` parameter. - /// - /// It is a logic error to specify a line identifier from another - /// [`Output`], [`Requestor`], [`EventSource`](crate::ports::EventSource) or - /// [`QuerySource`](crate::ports::QuerySource) instance and may result in - /// the disconnection of an arbitrary endpoint. - pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> { - if self.broadcaster.write().unwrap().remove(line_id) { - Ok(()) - } else { - Err(LineError {}) - } - } - - /// Removes all connections. - pub fn disconnect_all(&mut self) { - self.broadcaster.write().unwrap().clear(); + self.broadcaster.write().unwrap().add(sender); } /// Broadcasts an event to all connected input ports. @@ -219,14 +194,14 @@ impl Requestor { /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` /// plus, optionally, a context reference. - pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId + pub fn connect(&mut self, replier: F, address: impl Into>) where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, S: Send + 'static, { let sender = Box::new(ReplierSender::new(replier, address.into().0)); - self.broadcaster.write().unwrap().add(sender) + self.broadcaster.write().unwrap().add(sender); } /// Adds an auto-converting connection to a replier port of the model @@ -245,8 +220,7 @@ impl Requestor { reply_map: D, replier: F, address: impl Into>, - ) -> LineId - where + ) where M: Model, C: Fn(&T) -> U + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static, @@ -261,7 +235,7 @@ impl Requestor { replier, address.into().0, )); - self.broadcaster.write().unwrap().add(sender) + self.broadcaster.write().unwrap().add(sender); } /// Adds an auto-converting, filtered connection to a replier port of the @@ -280,8 +254,7 @@ impl Requestor { reply_map: D, replier: F, address: impl Into>, - ) -> LineId - where + ) where M: Model, C: Fn(&T) -> Option + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static, @@ -296,26 +269,7 @@ impl Requestor { replier, address.into().0, )); - self.broadcaster.write().unwrap().add(sender) - } - - /// Removes the connection specified by the `LineId` parameter. - /// - /// It is a logic error to specify a line identifier from another - /// [`Requestor`], [`Output`], [`EventSource`](crate::ports::EventSource) or - /// [`QuerySource`](crate::ports::QuerySource) instance and may result in - /// the disconnection of an arbitrary endpoint. - pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> { - if self.broadcaster.write().unwrap().remove(line_id) { - Ok(()) - } else { - Err(LineError {}) - } - } - - /// Removes all connections. - pub fn disconnect_all(&mut self) { - self.broadcaster.write().unwrap().clear(); + self.broadcaster.write().unwrap().add(sender); } /// Broadcasts a query to all connected replier ports. diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 5592406..f271414 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -6,7 +6,6 @@ use std::task::{Context, Poll}; use diatomic_waker::WakeSink; use super::sender::{RecycledFuture, SendError, Sender}; -use super::LineId; use crate::util::task_set::TaskSet; /// An object that can efficiently broadcast messages to several addresses. @@ -24,10 +23,8 @@ use crate::util::task_set::TaskSet; /// - the outputs of all sender futures are returned all at once rather than /// with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { - /// Line identifier for the next port to be connected. - next_line_id: u64, /// The list of senders with their associated line identifier. - senders: Vec<(LineId, Box>)>, + senders: Vec>>, /// Fields explicitly borrowed by the `BroadcastFuture`. shared: Shared, } @@ -38,42 +35,17 @@ impl BroadcasterInner { /// # Panics /// /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>) -> LineId { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; - - self.senders.push((line_id, sender)); + /// `u32::MAX - 1` due to limitations inherent to the task set + /// implementation. + pub(super) fn add(&mut self, sender: Box>) { + assert!(self.senders.len() < (u32::MAX as usize - 2)); + self.senders.push(sender); self.shared.outputs.push(None); // The storage is alway an empty vector so we just book some capacity. if let Some(storage) = self.shared.storage.as_mut() { let _ = storage.try_reserve(self.senders.len()); }; - - line_id - } - - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { - self.senders.swap_remove(pos); - self.shared.outputs.truncate(self.senders.len()); - - return true; - } - - false - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.senders.clear(); - self.shared.outputs.clear(); } /// Returns the number of connected senders. @@ -98,13 +70,13 @@ impl BroadcasterInner { while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - if let Some(fut) = sender.1.send_owned(arg) { + if let Some(fut) = sender.send_owned(arg) { futures.push(fut); } break; } - if let Some(fut) = sender.1.send(&arg) { + if let Some(fut) = sender.send(&arg) { futures.push(fut); } } @@ -120,7 +92,6 @@ impl Default for BroadcasterInner { let wake_src = wake_sink.source(); Self { - next_line_id: 0, senders: Vec::new(), shared: Shared { wake_sink, @@ -135,7 +106,6 @@ impl Default for BroadcasterInner { impl Clone for BroadcasterInner { fn clone(&self) -> Self { Self { - next_line_id: self.next_line_id, senders: self.senders.clone(), shared: self.shared.clone(), } @@ -160,24 +130,12 @@ impl EventBroadcaster { /// # Panics /// /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>) -> LineId { + /// `u32::MAX - 1` due to limitations inherent to the task set + /// implementation. + pub(super) fn add(&mut self, sender: Box>) { self.inner.add(sender) } - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - self.inner.remove(id) - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.inner.clear(); - } - /// Returns the number of connected senders. pub(super) fn len(&self) -> usize { self.inner.len() @@ -190,7 +148,7 @@ impl EventBroadcaster { [] => Ok(()), // One sender at most. - [sender] => match sender.1.send_owned(arg) { + [sender] => match sender.send_owned(arg) { None => Ok(()), Some(fut) => fut.await.map_err(|_| BroadcastError {}), }, @@ -233,24 +191,12 @@ impl QueryBroadcaster { /// # Panics /// /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>) -> LineId { + /// `u32::MAX - 1` due to limitations inherent to the task set + /// implementation. + pub(super) fn add(&mut self, sender: Box>) { self.inner.add(sender) } - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - self.inner.remove(id) - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.inner.clear(); - } - /// Returns the number of connected senders. pub(super) fn len(&self) -> usize { self.inner.len() @@ -267,7 +213,7 @@ impl QueryBroadcaster { // One sender at most. [sender] => { - if let Some(fut) = sender.1.send_owned(arg) { + if let Some(fut) = sender.send_owned(arg) { let output = fut.await.map_err(|_| BroadcastError {})?; self.inner.shared.outputs[0] = Some(output); @@ -567,7 +513,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, Scheduler}; + use crate::simulation::{Address, GlobalScheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -635,14 +581,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); } }) }) @@ -707,17 +651,15 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); }); } }) @@ -769,14 +711,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) @@ -856,25 +796,23 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); }); diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 3a92481..53bbd44 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -7,7 +7,6 @@ use std::time::Duration; use crate::model::Model; use crate::ports::InputFn; -use crate::ports::{LineError, LineId}; use crate::simulation::{ Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, }; @@ -43,14 +42,14 @@ impl EventSource { /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of type `T` plus, optionally, a scheduler /// reference. - pub fn connect(&mut self, input: F, address: impl Into>) -> LineId + pub fn connect(&mut self, input: F, address: impl Into>) where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, S: Send + 'static, { let sender = Box::new(InputSender::new(input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender) + self.broadcaster.lock().unwrap().add(sender); } /// Adds an auto-converting connection to an input port of the model @@ -62,12 +61,7 @@ impl EventSource { /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of the type returned by the mapping closure /// plus, optionally, a context reference. - pub fn map_connect( - &mut self, - map: C, - input: F, - address: impl Into>, - ) -> LineId + pub fn map_connect(&mut self, map: C, input: F, address: impl Into>) where M: Model, C: for<'a> Fn(&'a T) -> U + Send + 'static, @@ -76,7 +70,7 @@ impl EventSource { S: Send + 'static, { let sender = Box::new(MapInputSender::new(map, input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender) + self.broadcaster.lock().unwrap().add(sender); } /// Adds an auto-converting, filtered connection to an input port of the @@ -93,8 +87,7 @@ impl EventSource { map: C, input: F, address: impl Into>, - ) -> LineId - where + ) where M: Model, C: for<'a> Fn(&'a T) -> Option + Send + 'static, F: for<'a> InputFn<'a, M, U, S> + Clone, @@ -102,26 +95,7 @@ impl EventSource { S: Send + 'static, { let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender) - } - - /// Removes the connection specified by the `LineId` parameter. - /// - /// It is a logic error to specify a line identifier from another - /// [`EventSource`], [`QuerySource`], [`Output`](crate::ports::Output) or - /// [`Requestor`](crate::ports::Requestor) instance and may result in the - /// disconnection of an arbitrary endpoint. - pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> { - if self.broadcaster.lock().unwrap().remove(line_id) { - Ok(()) - } else { - Err(LineError {}) - } - } - - /// Removes all connections. - pub fn disconnect_all(&mut self) { - self.broadcaster.lock().unwrap().clear(); + self.broadcaster.lock().unwrap().add(sender); } /// Returns an action which, when processed, broadcasts an event to all @@ -248,14 +222,14 @@ impl QuerySource { /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` /// plus, optionally, a context reference. - pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId + pub fn connect(&mut self, replier: F, address: impl Into>) where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, S: Send + 'static, { let sender = Box::new(ReplierSender::new(replier, address.into().0)); - self.broadcaster.lock().unwrap().add(sender) + self.broadcaster.lock().unwrap().add(sender); } /// Adds an auto-converting connection to a replier port of the model @@ -274,8 +248,7 @@ impl QuerySource { reply_map: D, replier: F, address: impl Into>, - ) -> LineId - where + ) where M: Model, C: for<'a> Fn(&'a T) -> U + Send + 'static, D: Fn(Q) -> R + Send + Sync + 'static, @@ -290,7 +263,7 @@ impl QuerySource { replier, address.into().0, )); - self.broadcaster.lock().unwrap().add(sender) + self.broadcaster.lock().unwrap().add(sender); } /// Adds an auto-converting, filtered connection to a replier port of the @@ -309,8 +282,7 @@ impl QuerySource { reply_map: D, replier: F, address: impl Into>, - ) -> LineId - where + ) where M: Model, C: for<'a> Fn(&'a T) -> Option + Send + 'static, D: Fn(Q) -> R + Send + Sync + 'static, @@ -325,26 +297,7 @@ impl QuerySource { replier, address.into().0, )); - self.broadcaster.lock().unwrap().add(sender) - } - - /// Removes the connection specified by the `LineId` parameter. - /// - /// It is a logic error to specify a line identifier from another - /// [`QuerySource`], [`EventSource`], [`Output`](crate::ports::Output) or - /// [`Requestor`](crate::ports::Requestor) instance and may result in the - /// disconnection of an arbitrary endpoint. - pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> { - if self.broadcaster.lock().unwrap().remove(line_id) { - Ok(()) - } else { - Err(LineError {}) - } - } - - /// Removes all connections. - pub fn disconnect_all(&mut self) { - self.broadcaster.lock().unwrap().clear(); + self.broadcaster.lock().unwrap().add(sender); } /// Returns an action which, when processed, broadcasts a query to all diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 998f693..55cfe4d 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -10,7 +10,6 @@ use diatomic_waker::WakeSink; use super::sender::{Sender, SenderFuture}; -use crate::ports::LineId; use crate::util::task_set::TaskSet; /// An object that can efficiently broadcast messages to several addresses. @@ -24,10 +23,8 @@ use crate::util::task_set::TaskSet; /// does, but the outputs of all sender futures are returned all at once rather /// than with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { - /// Line identifier for the next port to be connected. - next_line_id: u64, /// The list of senders with their associated line identifier. - senders: Vec<(LineId, Box>)>, + senders: Vec>>, } impl BroadcasterInner { @@ -36,34 +33,11 @@ impl BroadcasterInner { /// # Panics /// /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>) -> LineId { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; - - self.senders.push((line_id, sender)); - - line_id - } - - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { - self.senders.swap_remove(pos); - - return true; - } - - false - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.senders.clear(); + /// `u32::MAX - 1` due to limitations inherent to the task set + /// implementation. + pub(super) fn add(&mut self, sender: Box>) { + assert!(self.senders.len() < (u32::MAX as usize - 2)); + self.senders.push(sender); } /// Returns the number of connected senders. @@ -81,12 +55,12 @@ impl BroadcasterInner { while let Some(sender) = iter.next() { // Move the argument for the last future to avoid undue cloning. if iter.len() == 0 { - if let Some(fut) = sender.1.send_owned(arg) { + if let Some(fut) = sender.send_owned(arg) { future_states.push(SenderFutureState::Pending(fut)); } break; } - if let Some(fut) = sender.1.send(&arg) { + if let Some(fut) = sender.send(&arg) { future_states.push(SenderFutureState::Pending(fut)); } } @@ -98,7 +72,6 @@ impl BroadcasterInner { impl Default for BroadcasterInner { fn default() -> Self { Self { - next_line_id: 0, senders: Vec::new(), } } @@ -121,22 +94,10 @@ impl EventBroadcaster { /// # Panics /// /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>) -> LineId { - self.inner.add(sender) - } - - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - self.inner.remove(id) - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.inner.clear(); + /// `u32::MAX - 1` due to limitations inherent to the task set + /// implementation. + pub(super) fn add(&mut self, sender: Box>) { + self.inner.add(sender); } /// Returns the number of connected senders. @@ -159,7 +120,7 @@ impl EventBroadcaster { // No sender. [] => Fut::Empty, // One sender at most. - [sender] => Fut::Single(sender.1.send_owned(arg)), + [sender] => Fut::Single(sender.send_owned(arg)), // Possibly multiple senders. _ => Fut::Multiple(self.inner.futures(arg)), }; @@ -209,22 +170,10 @@ impl QueryBroadcaster { /// # Panics /// /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>) -> LineId { - self.inner.add(sender) - } - - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - self.inner.remove(id) - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.inner.clear(); + /// `u32::MAX - 1` due to limitations inherent to the task set + /// implementation. + pub(super) fn add(&mut self, sender: Box>) { + self.inner.add(sender); } /// Returns the number of connected senders. @@ -247,7 +196,7 @@ impl QueryBroadcaster { // No sender. [] => Fut::Empty, // One sender at most. - [sender] => Fut::Single(sender.1.send_owned(arg)), + [sender] => Fut::Single(sender.send_owned(arg)), // Possibly multiple senders. _ => Fut::Multiple(self.inner.futures(arg)), }; @@ -468,7 +417,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, Scheduler}; + use crate::simulation::{Address, GlobalScheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -536,14 +485,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); } }) }) @@ -608,17 +555,15 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); - mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); + mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); }); } }) @@ -670,14 +615,12 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); - block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) @@ -757,25 +700,23 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_context = Context::new( + let mut dummy_cx = Context::new( String::new(), - LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), - Address(dummy_address), - ), + GlobalScheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), ); block_on(async { mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); mailbox - .recv(&mut double_model, &dummy_context) + .recv(&mut double_model, &mut dummy_cx) .await .unwrap(); }); diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index f4a00b3..e3c949b 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -76,56 +76,18 @@ //! Any deadlocks will be reported as an [`ExecutionError::Deadlock`] error, //! which identifies all involved models and the amount of unprocessed messages //! (events or requests) in their mailboxes. -//! -//! ## Modifying connections during simulation -//! -//! Although uncommon, there is sometimes a need for connecting and/or -//! disconnecting models after they have been migrated to the simulation. -//! Likewise, one may want to connect or disconnect an -//! [`EventSlot`](crate::ports::EventSlot) or -//! [`EventBuffer`](crate::ports::EventBuffer) after the simulation has been -//! instantiated. -//! -//! There is actually a very simple solution to this problem: since the -//! [`InputFn`] trait also matches closures of type `FnOnce(&mut impl Model)`, -//! it is enough to invoke [`Simulation::process_event()`] with a closure that -//! connects or disconnects a port, such as: -//! -//! ``` -//! # use asynchronix::model::{Context, Model}; -//! # use asynchronix::ports::Output; -//! # use asynchronix::time::MonotonicTime; -//! # use asynchronix::simulation::{Mailbox, SimInit}; -//! # pub struct ModelA { -//! # pub output: Output, -//! # } -//! # impl Model for ModelA {}; -//! # pub struct ModelB {} -//! # impl ModelB { -//! # pub fn input(&mut self, value: i32) {} -//! # } -//! # impl Model for ModelB {}; -//! # let modelA_addr = Mailbox::::new().address(); -//! # let modelB_addr = Mailbox::::new().address(); -//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH)?; -//! simu.process_event( -//! |m: &mut ModelA| { -//! m.output.connect(ModelB::input, modelB_addr); -//! }, -//! (), -//! &modelA_addr -//! )?; -//! # Ok::<(), asynchronix::simulation::SimulationError>(()) -//! ``` mod mailbox; mod scheduler; mod sim_init; -pub use mailbox::{Address, Mailbox}; -pub use scheduler::{Action, ActionKey, AutoActionKey, LocalScheduler, Scheduler, SchedulingError}; +use scheduler::SchedulerQueue; + pub(crate) use scheduler::{ - KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue, + GlobalScheduler, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, }; + +pub use mailbox::{Address, Mailbox}; +pub use scheduler::{Action, ActionKey, AutoActionKey, Scheduler, SchedulingError}; pub use sim_init::SimInit; use std::any::Any; @@ -161,11 +123,11 @@ thread_local! { pub(crate) static CURRENT_MODEL_ID: Cell = const { Cell /// /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation -/// itself, but also from models via the optional -/// [`&Context`](crate::model::Context) argument of input and replier port -/// methods. Likewise, simulation time can be accessed with the -/// [`Simulation::time()`] method, or from models with the -/// [`LocalScheduler::time()`](crate::simulation::LocalScheduler::time) method. +/// itself, but also from models via the optional [`&mut +/// Context`](crate::model::Context) argument of input and replier port methods. +/// Likewise, simulation time can be accessed with the [`Simulation::time()`] +/// method, or from models with the +/// [`Context::time()`](crate::simulation::Context::time) method. /// /// Events and queries can be scheduled immediately, *i.e.* for the current /// simulation time, using [`process_event()`](Simulation::process_event) and @@ -273,11 +235,6 @@ impl Simulation { self.step_until_unchecked(target_time) } - /// Returns an owned scheduler handle. - pub fn scheduler(&self) -> Scheduler { - Scheduler::new(self.scheduler_queue.clone(), self.time.reader()) - } - /// Processes an action immediately, blocking until completion. /// /// Simulation time remains unchanged. The periodicity of the action, if @@ -461,13 +418,13 @@ impl Simulation { let action = pull_next_action(&mut scheduler_queue); let mut next_key = peek_next_key(&mut scheduler_queue); if next_key != Some(current_key) { - // Since there are no other actions targeting the same mailbox - // and the same time, the action is spawned immediately. + // Since there are no other actions with the same origin and the + // same time, the action is spawned immediately. action.spawn_and_forget(&self.executor); } else { // To ensure that their relative order of execution is - // preserved, all actions targeting the same mailbox are - // executed sequentially within a single compound future. + // preserved, all actions with the same origin are executed + // sequentially within a single compound future. let mut action_sequence = SeqFuture::new(); action_sequence.push(action.into_future()); loop { @@ -717,7 +674,7 @@ pub(crate) fn add_model( model: P, mailbox: Mailbox, name: String, - scheduler: Scheduler, + scheduler: GlobalScheduler, executor: &Executor, abort_signal: &Signal, model_names: &mut Vec, @@ -725,21 +682,23 @@ pub(crate) fn add_model( #[cfg(feature = "tracing")] let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); - let context = Context::new( - name.clone(), - LocalScheduler::new(scheduler, mailbox.address()), + let mut build_cx = BuildContext::new( + &mailbox, + &name, + &scheduler, + executor, + abort_signal, + model_names, ); - let mut build_context = - BuildContext::new(&mailbox, &context, executor, abort_signal, model_names); - - let model = model.build(&mut build_context); + let model = model.build(&mut build_cx); + let address = mailbox.address(); let mut receiver = mailbox.0; let abort_signal = abort_signal.clone(); - + let mut cx = Context::new(name.clone(), scheduler, address); let fut = async move { - let mut model = model.init(&context).await.0; - while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {} + let mut model = model.init(&mut cx).await.0; + while !abort_signal.is_set() && receiver.recv(&mut model, &mut cx).await.is_ok() {} }; let model_id = ModelId::new(model_names.len()); diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index 1cfc394..597a6fb 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -1,5 +1,4 @@ //! Scheduling functions and types. - use std::error::Error; use std::future::Future; use std::hash::{Hash, Hasher}; @@ -21,19 +20,15 @@ use crate::simulation::Address; use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; use crate::util::priority_queue::PriorityQueue; -/// Scheduler. +const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; + +/// A global scheduler. #[derive(Clone)] -pub struct Scheduler { - scheduler_queue: Arc>, - time: AtomicTimeReader, -} +pub struct Scheduler(GlobalScheduler); impl Scheduler { pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { - Self { - scheduler_queue, - time, - } + Self(GlobalScheduler::new(scheduler_queue, time)) } /// Returns the current simulation time. @@ -51,7 +46,7 @@ impl Scheduler { /// } /// ``` pub fn time(&self) -> MonotonicTime { - self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") + self.0.time() } /// Schedules an action at a future time. @@ -63,29 +58,8 @@ impl Scheduler { /// model, these events are guaranteed to be processed according to the /// scheduling order of the actions. pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> { - // The scheduler queue must always be locked when reading the time, - // otherwise the following race could occur: - // 1) this method reads the time and concludes that it is not too late - // to schedule the action, - // 2) the `Simulation` object takes the lock, increments simulation time - // and runs the simulation step, - // 3) this method takes the lock and schedules the now-outdated action. - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - // The channel ID is set to the same value for all actions. This - // ensures that the relative scheduling order of all source events is - // preserved, which is important if some of them target the same models. - // The value 0 was chosen as it prevents collisions with channel IDs as - // the latter are always non-zero. - scheduler_queue.insert((time, 0), action); - - Ok(()) + self.0 + .schedule_from(deadline, action, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules an event at a future time. @@ -95,8 +69,6 @@ impl Scheduler { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`LocalScheduler::schedule_event`](LocalScheduler::schedule_event). pub fn schedule_event( &self, deadline: impl Deadline, @@ -110,19 +82,8 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(OnceAction::new(process_event(func, arg, sender))); - - scheduler_queue.insert((time, channel_id), action); - - Ok(()) + self.0 + .schedule_event_from(deadline, func, arg, address, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules a cancellable event at a future time and returns an event key. @@ -132,8 +93,6 @@ impl Scheduler { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`LocalScheduler::schedule_keyed_event`](LocalScheduler::schedule_keyed_event). pub fn schedule_keyed_event( &self, deadline: impl Deadline, @@ -147,23 +106,8 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(KeyedOnceAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - event_key.clone(), - )); - - scheduler_queue.insert((time, channel_id), action); - - Ok(event_key) + self.0 + .schedule_keyed_event_from(deadline, func, arg, address, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules a periodically recurring event at a future time. @@ -173,8 +117,6 @@ impl Scheduler { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`LocalScheduler::schedule_periodic_event`](LocalScheduler::schedule_periodic_event). pub fn schedule_periodic_event( &self, deadline: impl Deadline, @@ -189,26 +131,14 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = address.into().0; - let channel_id = sender.channel_id(); - - let action = Action::new(PeriodicAction::new( - || process_event(func, arg, sender), + self.0.schedule_periodic_event_from( + deadline, period, - )); - - scheduler_queue.insert((time, channel_id), action); - - Ok(()) + func, + arg, + address, + GLOBAL_SCHEDULER_ORIGIN_ID, + ) } /// Schedules a cancellable, periodically recurring event at a future time @@ -219,8 +149,6 @@ impl Scheduler { /// /// Events scheduled for the same time and targeting the same model are /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`LocalScheduler::schedule_keyed_periodic_event`](LocalScheduler::schedule_keyed_periodic_event). pub fn schedule_keyed_periodic_event( &self, deadline: impl Deadline, @@ -235,26 +163,14 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(KeyedPeriodicAction::new( - |ek| send_keyed_event(ek, func, arg, sender), + self.0.schedule_keyed_periodic_event_from( + deadline, period, - event_key.clone(), - )); - scheduler_queue.insert((time, channel_id), action); - - Ok(event_key) + func, + arg, + address, + GLOBAL_SCHEDULER_ORIGIN_ID, + ) } } @@ -266,297 +182,6 @@ impl fmt::Debug for Scheduler { } } -/// Local scheduler. -pub struct LocalScheduler { - pub(crate) scheduler: Scheduler, - address: Address, -} - -impl LocalScheduler { - pub(crate) fn new(scheduler: Scheduler, address: Address) -> Self { - Self { scheduler, address } - } - - /// Returns the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::Model; - /// use asynchronix::simulation::LocalScheduler; - /// use asynchronix::time::MonotonicTime; - /// - /// fn is_third_millenium(scheduler: &LocalScheduler) -> bool { - /// let time = scheduler.time(); - /// time >= MonotonicTime::new(978307200, 0).unwrap() - /// && time < MonotonicTime::new(32535216000, 0).unwrap() - /// } - /// ``` - pub fn time(&self) -> MonotonicTime { - self.scheduler.time() - } - - /// Schedules an event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// - /// // A timer. - /// pub struct Timer {} - /// - /// impl Timer { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: Duration, context: &Context) { - /// if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Rings [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg"); - /// } - /// } - /// - /// impl Model for Timer {} - /// ``` - pub fn schedule_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - self.scheduler - .schedule_event(deadline, func, arg, &self.address) - } - - /// Schedules a cancellable event at a future time and returns an action - /// key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::simulation::ActionKey; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock that can be cancelled. - /// #[derive(Default)] - /// pub struct CancellableAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// self.cancel(); - /// match context.scheduler.schedule_keyed_event(setting, Self::ring, ()) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels the current alarm, if any [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Rings the alarm [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg!"); - /// } - /// } - /// - /// impl Model for CancellableAlarmClock {} - /// ``` - pub fn schedule_keyed_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let event_key = self - .scheduler - .schedule_keyed_event(deadline, func, arg, &self.address)?; - - Ok(event_key) - } - - /// Schedules a periodically recurring event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock beeping at 1Hz. - /// pub struct BeepingAlarmClock {} - /// - /// impl BeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// if context.scheduler.schedule_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for BeepingAlarmClock {} - /// ``` - pub fn schedule_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - self.scheduler - .schedule_periodic_event(deadline, period, func, arg, &self.address) - } - - /// Schedules a cancellable, periodically recurring event at a future time - /// and returns an action key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::simulation::ActionKey; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or - /// // stopped after it sets off. - /// #[derive(Default)] - /// pub struct CancellableBeepingAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableBeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// self.cancel(); - /// match context.scheduler.schedule_keyed_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels or stops the alarm [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for CancellableBeepingAlarmClock {} - /// ``` - pub fn schedule_keyed_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let event_key = self.scheduler.schedule_keyed_periodic_event( - deadline, - period, - func, - arg, - &self.address, - )?; - - Ok(event_key) - } -} - -impl Clone for LocalScheduler { - fn clone(&self) -> Self { - Self { - scheduler: self.scheduler.clone(), - address: self.address.clone(), - } - } -} - -impl fmt::Debug for LocalScheduler { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LocalScheduler") - .field("time", &self.time()) - .field("address", &self.address) - .finish_non_exhaustive() - } -} - -/// Shorthand for the scheduler queue type. - -// Why use both time and channel ID as the key? The short answer is that this -// ensures that events targeting the same model are sent in the order they were -// scheduled. More precisely, this ensures that events targeting the same model -// are ordered contiguously in the priority queue, which in turns allows the -// event loop to easily aggregate such events into single futures and thus -// control their relative order of execution. -pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; - /// Managed handle to a scheduled action. /// /// An `AutoActionKey` is a managed handle to a scheduled action that cancels @@ -698,6 +323,228 @@ impl fmt::Debug for Action { } } +/// Alias for the scheduler queue type. +/// +/// Why use both time and origin ID as the key? The short answer is that this +/// allows to preserve the relative ordering of events which have the same +/// origin (where the origin is either a model instance or the global +/// scheduler). The preservation of this ordering is implemented by the event +/// loop, which aggregate events with the same origin into single sequential +/// futures, thus ensuring that they are not executed concurrently. +pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; + +/// Internal implementation of the global scheduler. +#[derive(Clone)] +pub(crate) struct GlobalScheduler { + scheduler_queue: Arc>, + time: AtomicTimeReader, +} + +impl GlobalScheduler { + pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { + Self { + scheduler_queue, + time, + } + } + + /// Returns the current simulation time. + pub(crate) fn time(&self) -> MonotonicTime { + // We use `read` rather than `try_read` because the scheduler can be + // sent to another thread than the simulator's and could thus + // potentially see a torn read if the simulator increments time + // concurrently. The chances of this happening are very small since + // simulation time is not changed frequently. + self.time.read() + } + + /// Schedules an action identified by its origin at a future time. + pub(crate) fn schedule_from( + &self, + deadline: impl Deadline, + action: Action, + origin_id: usize, + ) -> Result<(), SchedulingError> { + // The scheduler queue must always be locked when reading the time, + // otherwise the following race could occur: + // 1) this method reads the time and concludes that it is not too late + // to schedule the action, + // 2) the `Simulation` object takes the lock, increments simulation time + // and runs the simulation step, + // 3) this method takes the lock and schedules the now-outdated action. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules an event identified by its origin at a future time. + pub(crate) fn schedule_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let sender = address.into().0; + let action = Action::new(OnceAction::new(process_event(func, arg, sender))); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable event identified by its origin at a future time + /// and returns an event key. + pub(crate) fn schedule_keyed_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedOnceAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } + + /// Schedules a periodically recurring event identified by its origin at a + /// future time. + pub(crate) fn schedule_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = address.into().0; + let action = Action::new(PeriodicAction::new( + || process_event(func, arg, sender), + period, + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event identified by its + /// origin at a future time and returns an event key. + pub(crate) fn schedule_keyed_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedPeriodicAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + period, + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } +} + +impl fmt::Debug for GlobalScheduler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SchedulerInner") + .field("time", &self.time()) + .finish_non_exhaustive() + } +} + /// Trait abstracting over the inner type of an action. pub(crate) trait ActionInner: Send + 'static { /// Reports whether the action was cancelled. @@ -717,7 +564,6 @@ pub(crate) trait ActionInner: Send + 'static { fn spawn_and_forget(self: Box, executor: &Executor); } -#[pin_project] /// An object that can be converted to a future performing a single /// non-cancellable action. /// @@ -725,6 +571,7 @@ pub(crate) trait ActionInner: Send + 'static { /// future cannot be cancelled and the action does not need to be cloned, /// there is no need to defer the construction of the future. This makes /// `into_future` a trivial cast, which saves a boxing operation. +#[pin_project] pub(crate) struct OnceAction { #[pin] fut: F, diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 793b635..273b256 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -10,7 +10,10 @@ use crate::time::{Clock, NoClock}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Signal, Simulation}; +use super::{ + add_model, ExecutionError, GlobalScheduler, Mailbox, Scheduler, SchedulerQueue, Signal, + Simulation, +}; /// Builder for a multi-threaded, discrete-event simulation. pub struct SimInit { @@ -88,7 +91,7 @@ impl SimInit { }; self.observers .push((name.clone(), Box::new(mailbox.0.observer()))); - let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); + let scheduler = GlobalScheduler::new(self.scheduler_queue.clone(), self.time.reader()); add_model( model, @@ -144,10 +147,17 @@ impl SimInit { /// Builds a simulation initialized at the specified simulation time, /// executing the [`Model::init()`](crate::model::Model::init) method on all /// model initializers. - pub fn init(mut self, start_time: MonotonicTime) -> Result { + /// + /// The simulation object and its associated scheduler are returned upon + /// success. + pub fn init( + mut self, + start_time: MonotonicTime, + ) -> Result<(Simulation, Scheduler), ExecutionError> { self.time.write(start_time); self.clock.synchronize(start_time); + let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); let mut simulation = Simulation::new( self.executor, self.scheduler_queue, @@ -160,7 +170,7 @@ impl SimInit { ); simulation.run()?; - Ok(simulation) + Ok((simulation, scheduler)) } } diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index ff91963..5bcdebb 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -30,8 +30,8 @@ //! } //! //! // Sets an alarm [input port]. -//! pub fn set(&mut self, setting: MonotonicTime, context: &Context) { -//! if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() { +//! pub fn set(&mut self, setting: MonotonicTime, cx: &mut Context) { +//! if cx.schedule_event(setting, Self::ring, ()).is_err() { //! println!("The alarm clock can only be set for a future time"); //! } //! } diff --git a/asynchronix/src/util/sync_cell.rs b/asynchronix/src/util/sync_cell.rs index 01bc8ec..075d2b6 100644 --- a/asynchronix/src/util/sync_cell.rs +++ b/asynchronix/src/util/sync_cell.rs @@ -183,6 +183,15 @@ impl SyncCellReader { Err(SyncCellReadError {}) } } + + /// Performs a synchronized read by spinning on `try_read`. + pub(crate) fn read(&self) -> T::Value { + loop { + if let Ok(value) = self.try_read() { + return value; + } + } + } } impl Clone for SyncCellReader { diff --git a/asynchronix/tests/integration/model_scheduling.rs b/asynchronix/tests/integration/model_scheduling.rs index 6d841f2..da405d5 100644 --- a/asynchronix/tests/integration/model_scheduling.rs +++ b/asynchronix/tests/integration/model_scheduling.rs @@ -15,14 +15,8 @@ fn model_schedule_event(num_threads: usize) { output: Output<()>, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action, - (), - ) + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_event(Duration::from_secs(2), Self::action, ()) .unwrap(); } async fn action(&mut self) { @@ -42,7 +36,8 @@ fn model_schedule_event(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .init(t0) - .unwrap(); + .unwrap() + .0; simu.process_event(TestModel::trigger, (), addr).unwrap(); simu.step().unwrap(); @@ -59,22 +54,11 @@ fn model_cancel_future_keyed_event(num_threads: usize) { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_event( - context.scheduler.time() + Duration::from_secs(1), - Self::action1, - (), - ) + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_event(Duration::from_secs(1), Self::action1, ()) .unwrap(); - self.key = context - .scheduler - .schedule_keyed_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action2, - (), - ) + self.key = cx + .schedule_keyed_event(Duration::from_secs(2), Self::action2, ()) .ok(); } async fn action1(&mut self) { @@ -99,7 +83,8 @@ fn model_cancel_future_keyed_event(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .init(t0) - .unwrap(); + .unwrap() + .0; simu.process_event(TestModel::trigger, (), addr).unwrap(); simu.step().unwrap(); @@ -117,22 +102,11 @@ fn model_cancel_same_time_keyed_event(num_threads: usize) { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action1, - (), - ) + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_event(Duration::from_secs(2), Self::action1, ()) .unwrap(); - self.key = context - .scheduler - .schedule_keyed_event( - context.scheduler.time() + Duration::from_secs(2), - Self::action2, - (), - ) + self.key = cx + .schedule_keyed_event(Duration::from_secs(2), Self::action2, ()) .ok(); } async fn action1(&mut self) { @@ -157,7 +131,8 @@ fn model_cancel_same_time_keyed_event(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .init(t0) - .unwrap(); + .unwrap() + .0; simu.process_event(TestModel::trigger, (), addr).unwrap(); simu.step().unwrap(); @@ -174,16 +149,14 @@ fn model_schedule_periodic_event(num_threads: usize) { output: Output, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - context - .scheduler - .schedule_periodic_event( - context.scheduler.time() + Duration::from_secs(2), - Duration::from_secs(3), - Self::action, - 42, - ) - .unwrap(); + fn trigger(&mut self, _: (), cx: &mut Context) { + cx.schedule_periodic_event( + Duration::from_secs(2), + Duration::from_secs(3), + Self::action, + 42, + ) + .unwrap(); } async fn action(&mut self, payload: i32) { self.output.send(payload).await; @@ -202,7 +175,8 @@ fn model_schedule_periodic_event(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .init(t0) - .unwrap(); + .unwrap() + .0; simu.process_event(TestModel::trigger, (), addr).unwrap(); @@ -225,11 +199,10 @@ fn model_cancel_periodic_event(num_threads: usize) { key: Option, } impl TestModel { - fn trigger(&mut self, _: (), context: &Context) { - self.key = context - .scheduler + fn trigger(&mut self, _: (), cx: &mut Context) { + self.key = cx .schedule_keyed_periodic_event( - context.scheduler.time() + Duration::from_secs(2), + Duration::from_secs(2), Duration::from_secs(3), Self::action, (), @@ -255,7 +228,8 @@ fn model_cancel_periodic_event(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .init(t0) - .unwrap(); + .unwrap() + .0; simu.process_event(TestModel::trigger, (), addr).unwrap(); diff --git a/asynchronix/tests/integration/simulation_clock_sync.rs b/asynchronix/tests/integration/simulation_clock_sync.rs index 1fbe9cc..b819247 100644 --- a/asynchronix/tests/integration/simulation_clock_sync.rs +++ b/asynchronix/tests/integration/simulation_clock_sync.rs @@ -39,14 +39,13 @@ fn clock_sync( let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::with_num_threads(num_threads) + let (mut simu, scheduler) = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "test") .set_clock(clock) .set_clock_tolerance(clock_tolerance) .init(t0) .unwrap(); - let scheduler = simu.scheduler(); let mut delta = Duration::ZERO; for tick_ms in ticks_ms { let tick = Duration::from_millis(*tick_ms); diff --git a/asynchronix/tests/integration/simulation_deadlock.rs b/asynchronix/tests/integration/simulation_deadlock.rs index b4e933d..0616c5f 100644 --- a/asynchronix/tests/integration/simulation_deadlock.rs +++ b/asynchronix/tests/integration/simulation_deadlock.rs @@ -45,7 +45,8 @@ fn deadlock_on_mailbox_overflow(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, MODEL_NAME) .init(t0) - .unwrap(); + .unwrap() + .0; match simu.process_event(TestModel::activate_output, (), addr) { Err(ExecutionError::Deadlock(deadlock_info)) => { @@ -80,7 +81,8 @@ fn deadlock_on_query_loopback(num_threads: usize) { let mut simu = SimInit::with_num_threads(num_threads) .add_model(model, mbox, MODEL_NAME) .init(t0) - .unwrap(); + .unwrap() + .0; match simu.process_query(TestModel::activate_requestor, (), addr) { Err(ExecutionError::Deadlock(deadlock_info)) => { @@ -124,7 +126,8 @@ fn deadlock_on_transitive_query_loopback(num_threads: usize) { .add_model(model1, mbox1, MODEL1_NAME) .add_model(model2, mbox2, MODEL2_NAME) .init(t0) - .unwrap(); + .unwrap() + .0; match simu.process_query(TestModel::activate_requestor, (), addr1) { Err(ExecutionError::Deadlock(deadlock_info)) => { @@ -181,7 +184,8 @@ fn deadlock_on_multiple_query_loopback(num_threads: usize) { .add_model(model1, mbox1, MODEL1_NAME) .add_model(model2, mbox2, MODEL2_NAME) .init(t0) - .unwrap(); + .unwrap() + .0; match simu.process_query(TestModel::activate_requestor, (), addr0) { Err(ExecutionError::Deadlock(deadlock_info)) => { diff --git a/asynchronix/tests/integration/simulation_panic.rs b/asynchronix/tests/integration/simulation_panic.rs index 170aaf2..f779d2f 100644 --- a/asynchronix/tests/integration/simulation_panic.rs +++ b/asynchronix/tests/integration/simulation_panic.rs @@ -48,7 +48,7 @@ fn model_panic(num_threads: usize) { // Run the simulation. let t0 = MonotonicTime::EPOCH; - let mut simu = siminit.init(t0).unwrap(); + let mut simu = siminit.init(t0).unwrap().0; match simu.process_event(TestModel::countdown_in, INIT_COUNTDOWN, addr0) { Err(ExecutionError::Panic { model, payload }) => { diff --git a/asynchronix/tests/integration/simulation_scheduling.rs b/asynchronix/tests/integration/simulation_scheduling.rs index 6552d20..0057071 100644 --- a/asynchronix/tests/integration/simulation_scheduling.rs +++ b/asynchronix/tests/integration/simulation_scheduling.rs @@ -6,7 +6,7 @@ use std::time::Duration; use asynchronix::model::Context; use asynchronix::model::Model; use asynchronix::ports::{EventBuffer, Output}; -use asynchronix::simulation::{Address, Mailbox, SimInit, Simulation}; +use asynchronix::simulation::{Address, Mailbox, Scheduler, SimInit, Simulation}; use asynchronix::time::MonotonicTime; const MT_NUM_THREADS: usize = 4; @@ -32,7 +32,12 @@ impl Model for PassThroughModel {} fn passthrough_bench( num_threads: usize, t0: MonotonicTime, -) -> (Simulation, Address>, EventBuffer) { +) -> ( + Simulation, + Scheduler, + Address>, + EventBuffer, +) { // Bench assembly. let mut model = PassThroughModel::new(); let mbox = Mailbox::new(); @@ -41,19 +46,17 @@ fn passthrough_bench( model.output.connect_sink(&out_stream); let addr = mbox.address(); - let simu = SimInit::with_num_threads(num_threads) + let (simu, scheduler) = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .init(t0) .unwrap(); - (simu, addr, out_stream) + (simu, scheduler, addr, out_stream) } fn schedule_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; - let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); - - let scheduler = simu.scheduler(); + let (mut simu, scheduler, addr, mut output) = passthrough_bench(num_threads, t0); // Queue 2 events at t0+3s and t0+2s, in reverse order. scheduler @@ -92,9 +95,7 @@ fn schedule_events(num_threads: usize) { fn schedule_keyed_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; - let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); - - let scheduler = simu.scheduler(); + let (mut simu, scheduler, addr, mut output) = passthrough_bench(num_threads, t0); let event_t1 = scheduler .schedule_keyed_event( @@ -133,9 +134,7 @@ fn schedule_keyed_events(num_threads: usize) { fn schedule_periodic_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; - let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); - - let scheduler = simu.scheduler(); + let (mut simu, scheduler, addr, mut output) = passthrough_bench(num_threads, t0); // Queue 2 periodic events at t0 + 3s + k*2s. scheduler @@ -172,9 +171,7 @@ fn schedule_periodic_events(num_threads: usize) { fn schedule_periodic_keyed_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; - let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); - - let scheduler = simu.scheduler(); + let (mut simu, scheduler, addr, mut output) = passthrough_bench(num_threads, t0); // Queue 2 periodic events at t0 + 3s + k*2s. scheduler @@ -278,7 +275,7 @@ impl TimestampModel { } #[cfg(not(miri))] impl Model for TimestampModel { - async fn init(mut self, _: &Context) -> asynchronix::model::InitializedModel { + async fn init(mut self, _: &mut Context) -> asynchronix::model::InitializedModel { self.stamp.send((Instant::now(), SystemTime::now())).await; self.into() } @@ -292,6 +289,7 @@ fn timestamp_bench( clock: impl Clock + 'static, ) -> ( Simulation, + Scheduler, Address, EventBuffer<(Instant, SystemTime)>, ) { @@ -303,13 +301,13 @@ fn timestamp_bench( model.stamp.connect_sink(&stamp_stream); let addr = mbox.address(); - let simu = SimInit::with_num_threads(num_threads) + let (simu, scheduler) = SimInit::with_num_threads(num_threads) .add_model(model, mbox, "") .set_clock(clock) .init(t0) .unwrap(); - (simu, addr, stamp_stream) + (simu, scheduler, addr, stamp_stream) } #[cfg(not(miri))] @@ -335,9 +333,7 @@ fn system_clock_from_instant(num_threads: usize) { let clock = SystemClock::from_instant(simulation_ref, wall_clock_ref); - let (mut simu, addr, mut stamp) = timestamp_bench(num_threads, t0, clock); - - let scheduler = simu.scheduler(); + let (mut simu, scheduler, addr, mut stamp) = timestamp_bench(num_threads, t0, clock); // Queue a single event at t0 + 0.1s. scheduler @@ -391,9 +387,7 @@ fn system_clock_from_system_time(num_threads: usize) { let clock = SystemClock::from_system_time(simulation_ref, wall_clock_ref); - let (mut simu, addr, mut stamp) = timestamp_bench(num_threads, t0, clock); - - let scheduler = simu.scheduler(); + let (mut simu, scheduler, addr, mut stamp) = timestamp_bench(num_threads, t0, clock); // Queue a single event at t0 + 0.1s. scheduler @@ -435,11 +429,10 @@ fn auto_system_clock(num_threads: usize) { let t0 = MonotonicTime::EPOCH; const TOLERANCE: f64 = 0.005; // [s] - let (mut simu, addr, mut stamp) = timestamp_bench(num_threads, t0, AutoSystemClock::new()); + let (mut simu, scheduler, addr, mut stamp) = + timestamp_bench(num_threads, t0, AutoSystemClock::new()); let instant_t0 = Instant::now(); - let scheduler = simu.scheduler(); - // Queue a periodic event at t0 + 0.2s + k*0.2s. scheduler .schedule_periodic_event( diff --git a/asynchronix/tests/integration/simulation_timeout.rs b/asynchronix/tests/integration/simulation_timeout.rs index 9558681..f5bc825 100644 --- a/asynchronix/tests/integration/simulation_timeout.rs +++ b/asynchronix/tests/integration/simulation_timeout.rs @@ -51,7 +51,8 @@ fn timeout_untriggered(num_threads: usize) { .add_model(model, mbox, "test") .set_timeout(Duration::from_secs(1)) .init(t0) - .unwrap(); + .unwrap() + .0; assert!(simu.process_event(TestModel::input, (), addr).is_ok()); } @@ -69,7 +70,8 @@ fn timeout_triggered(num_threads: usize) { .add_model(model, mbox, "test") .set_timeout(Duration::from_secs(1)) .init(t0) - .unwrap(); + .unwrap() + .0; assert!(matches!( simu.process_event(TestModel::input, (), addr),