diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs index c45ac15..85f8dcd 100644 --- a/asynchronix/src/model/context.rs +++ b/asynchronix/src/model/context.rs @@ -1,7 +1,7 @@ use std::fmt; use crate::executor::{Executor, Signal}; -use crate::simulation::{self, LocalScheduler, Mailbox}; +use crate::simulation::{self, LocalScheduler, Mailbox, SchedulerInner}; use super::{Model, ProtoModel}; @@ -182,7 +182,8 @@ impl fmt::Debug for Context { pub struct BuildContext<'a, P: ProtoModel> { /// Mailbox of the model. pub mailbox: &'a Mailbox, - context: &'a Context, + name: &'a String, + scheduler: &'a SchedulerInner, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, @@ -192,14 +193,16 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { /// Creates a new local context. pub(crate) fn new( mailbox: &'a Mailbox, - context: &'a Context, + name: &'a String, + scheduler: &'a SchedulerInner, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, ) -> Self { Self { mailbox, - context, + name, + scheduler, executor, abort_signal, model_names, @@ -211,7 +214,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { /// The fully qualified name is made of the unqualified model name, if /// relevant prepended by the dot-separated names of all parent models. pub fn name(&self) -> &str { - &self.context.name + self.name } /// Adds a sub-model to the simulation bench. @@ -232,13 +235,13 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { if submodel_name.is_empty() { submodel_name = String::from(""); }; - submodel_name = self.context.name().to_string() + "." + &submodel_name; + submodel_name = self.name.to_string() + "." + &submodel_name; simulation::add_model( model, mailbox, submodel_name, - self.context.scheduler.scheduler.clone(), + self.scheduler.clone(), self.executor, self.abort_signal, self.model_names, diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 5592406..980bbe8 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -567,7 +567,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, Scheduler}; + use crate::simulation::{Address, LocalScheduler, SchedulerInner}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -638,7 +638,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -710,7 +710,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -772,7 +772,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -859,7 +859,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 998f693..162f23c 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -468,7 +468,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, Scheduler}; + use crate::simulation::{Address, LocalScheduler, SchedulerInner}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -539,7 +539,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -611,7 +611,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -673,7 +673,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -760,7 +760,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index f4a00b3..023521b 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -121,11 +121,14 @@ mod mailbox; mod scheduler; mod sim_init; +use scheduler::SchedulerQueue; + +pub(crate) use scheduler::{ + KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner, +}; + pub use mailbox::{Address, Mailbox}; pub use scheduler::{Action, ActionKey, AutoActionKey, LocalScheduler, Scheduler, SchedulingError}; -pub(crate) use scheduler::{ - KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue, -}; pub use sim_init::SimInit; use std::any::Any; @@ -461,13 +464,13 @@ impl Simulation { let action = pull_next_action(&mut scheduler_queue); let mut next_key = peek_next_key(&mut scheduler_queue); if next_key != Some(current_key) { - // Since there are no other actions targeting the same mailbox - // and the same time, the action is spawned immediately. + // Since there are no other actions with the same origin and the + // same time, the action is spawned immediately. action.spawn_and_forget(&self.executor); } else { // To ensure that their relative order of execution is - // preserved, all actions targeting the same mailbox are - // executed sequentially within a single compound future. + // preserved, all actions with the same origin are executed + // sequentially within a single compound future. let mut action_sequence = SeqFuture::new(); action_sequence.push(action.into_future()); loop { @@ -717,7 +720,7 @@ pub(crate) fn add_model( model: P, mailbox: Mailbox, name: String, - scheduler: Scheduler, + scheduler: SchedulerInner, executor: &Executor, abort_signal: &Signal, model_names: &mut Vec, @@ -725,18 +728,20 @@ pub(crate) fn add_model( #[cfg(feature = "tracing")] let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); - let context = Context::new( - name.clone(), - LocalScheduler::new(scheduler, mailbox.address()), + let mut build_context = BuildContext::new( + &mailbox, + &name, + &scheduler, + executor, + abort_signal, + model_names, ); - let mut build_context = - BuildContext::new(&mailbox, &context, executor, abort_signal, model_names); - let model = model.build(&mut build_context); + let address = mailbox.address(); let mut receiver = mailbox.0; let abort_signal = abort_signal.clone(); - + let context = Context::new(name.clone(), LocalScheduler::new(scheduler, address)); let fut = async move { let mut model = model.init(&context).await.0; while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {} diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index 1cfc394..c830e48 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -1,4 +1,5 @@ //! Scheduling functions and types. +mod inner; use std::error::Error; use std::future::Future; @@ -6,34 +7,31 @@ use std::hash::{Hash, Hasher}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; use std::time::Duration; use std::{fmt, ptr}; -use pin_project::pin_project; -use recycle_box::{coerce_box, RecycleBox}; - -use crate::channel::Sender; use crate::executor::Executor; use crate::model::Model; use crate::ports::InputFn; use crate::simulation::Address; use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; -use crate::util::priority_queue::PriorityQueue; -/// Scheduler. +use inner::ActionInner; + +pub(crate) use inner::{ + KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner, + SchedulerQueue, +}; + +const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; + +/// A global scheduler. #[derive(Clone)] -pub struct Scheduler { - scheduler_queue: Arc>, - time: AtomicTimeReader, -} +pub struct Scheduler(SchedulerInner); impl Scheduler { pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { - Self { - scheduler_queue, - time, - } + Self(SchedulerInner::new(scheduler_queue, time)) } /// Returns the current simulation time. @@ -51,7 +49,7 @@ impl Scheduler { /// } /// ``` pub fn time(&self) -> MonotonicTime { - self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") + self.0.time() } /// Schedules an action at a future time. @@ -63,29 +61,8 @@ impl Scheduler { /// model, these events are guaranteed to be processed according to the /// scheduling order of the actions. pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> { - // The scheduler queue must always be locked when reading the time, - // otherwise the following race could occur: - // 1) this method reads the time and concludes that it is not too late - // to schedule the action, - // 2) the `Simulation` object takes the lock, increments simulation time - // and runs the simulation step, - // 3) this method takes the lock and schedules the now-outdated action. - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - // The channel ID is set to the same value for all actions. This - // ensures that the relative scheduling order of all source events is - // preserved, which is important if some of them target the same models. - // The value 0 was chosen as it prevents collisions with channel IDs as - // the latter are always non-zero. - scheduler_queue.insert((time, 0), action); - - Ok(()) + self.0 + .schedule_from(deadline, action, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules an event at a future time. @@ -110,19 +87,8 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(OnceAction::new(process_event(func, arg, sender))); - - scheduler_queue.insert((time, channel_id), action); - - Ok(()) + self.0 + .schedule_event_from(deadline, func, arg, address, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules a cancellable event at a future time and returns an event key. @@ -147,23 +113,8 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(KeyedOnceAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - event_key.clone(), - )); - - scheduler_queue.insert((time, channel_id), action); - - Ok(event_key) + self.0 + .schedule_keyed_event_from(deadline, func, arg, address, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules a periodically recurring event at a future time. @@ -189,26 +140,14 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = address.into().0; - let channel_id = sender.channel_id(); - - let action = Action::new(PeriodicAction::new( - || process_event(func, arg, sender), + self.0.schedule_periodic_event_from( + deadline, period, - )); - - scheduler_queue.insert((time, channel_id), action); - - Ok(()) + func, + arg, + address, + GLOBAL_SCHEDULER_ORIGIN_ID, + ) } /// Schedules a cancellable, periodically recurring event at a future time @@ -235,26 +174,14 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(KeyedPeriodicAction::new( - |ek| send_keyed_event(ek, func, arg, sender), + self.0.schedule_keyed_periodic_event_from( + deadline, period, - event_key.clone(), - )); - scheduler_queue.insert((time, channel_id), action); - - Ok(event_key) + func, + arg, + address, + GLOBAL_SCHEDULER_ORIGIN_ID, + ) } } @@ -268,13 +195,24 @@ impl fmt::Debug for Scheduler { /// Local scheduler. pub struct LocalScheduler { - pub(crate) scheduler: Scheduler, + scheduler: SchedulerInner, address: Address, + origin_id: usize, } impl LocalScheduler { - pub(crate) fn new(scheduler: Scheduler, address: Address) -> Self { - Self { scheduler, address } + pub(crate) fn new(scheduler: SchedulerInner, address: Address) -> Self { + // The only requirement for the origin ID is that it must be (i) + // specific to each model and (ii) different from 0 (which is reserved + // for the global scheduler). The channel ID of the model mailbox + // fulfills this requirement. + let origin_id = address.0.channel_id(); + + Self { + scheduler, + address, + origin_id, + } } /// Returns the current simulation time. @@ -339,7 +277,7 @@ impl LocalScheduler { S: Send + 'static, { self.scheduler - .schedule_event(deadline, func, arg, &self.address) + .schedule_event_from(deadline, func, arg, &self.address, self.origin_id) } /// Schedules a cancellable event at a future time and returns an action @@ -395,9 +333,13 @@ impl LocalScheduler { T: Send + Clone + 'static, S: Send + 'static, { - let event_key = self - .scheduler - .schedule_keyed_event(deadline, func, arg, &self.address)?; + let event_key = self.scheduler.schedule_keyed_event_from( + deadline, + func, + arg, + &self.address, + self.origin_id, + )?; Ok(event_key) } @@ -451,8 +393,14 @@ impl LocalScheduler { T: Send + Clone + 'static, S: Send + 'static, { - self.scheduler - .schedule_periodic_event(deadline, period, func, arg, &self.address) + self.scheduler.schedule_periodic_event_from( + deadline, + period, + func, + arg, + &self.address, + self.origin_id, + ) } /// Schedules a cancellable, periodically recurring event at a future time @@ -517,12 +465,13 @@ impl LocalScheduler { T: Send + Clone + 'static, S: Send + 'static, { - let event_key = self.scheduler.schedule_keyed_periodic_event( + let event_key = self.scheduler.schedule_keyed_periodic_event_from( deadline, period, func, arg, &self.address, + self.origin_id, )?; Ok(event_key) @@ -534,6 +483,7 @@ impl Clone for LocalScheduler { Self { scheduler: self.scheduler.clone(), address: self.address.clone(), + origin_id: self.origin_id, } } } @@ -543,20 +493,11 @@ impl fmt::Debug for LocalScheduler { f.debug_struct("LocalScheduler") .field("time", &self.time()) .field("address", &self.address) + .field("origin_id", &self.origin_id) .finish_non_exhaustive() } } -/// Shorthand for the scheduler queue type. - -// Why use both time and channel ID as the key? The short answer is that this -// ensures that events targeting the same model are sent in the order they were -// scheduled. More precisely, this ensures that events targeting the same model -// are ordered contiguously in the priority queue, which in turns allows the -// event loop to easily aggregate such events into single futures and thus -// control their relative order of execution. -pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; - /// Managed handle to a scheduled action. /// /// An `AutoActionKey` is a managed handle to a scheduled action that cancels @@ -697,270 +638,3 @@ impl fmt::Debug for Action { f.debug_struct("SchedulableEvent").finish_non_exhaustive() } } - -/// Trait abstracting over the inner type of an action. -pub(crate) trait ActionInner: Send + 'static { - /// Reports whether the action was cancelled. - fn is_cancelled(&self) -> bool; - - /// If this is a periodic action, returns a boxed clone of this action and - /// its repetition period; otherwise returns `None`. - fn next(&self) -> Option<(Box, Duration)>; - - /// Returns a boxed future that performs the action. - fn into_future(self: Box) -> Pin + Send>>; - - /// Spawns the future that performs the action onto the provided executor. - /// - /// This method is typically more efficient that spawning the boxed future - /// from `into_future` since it can directly spawn the unboxed future. - fn spawn_and_forget(self: Box, executor: &Executor); -} - -#[pin_project] -/// An object that can be converted to a future performing a single -/// non-cancellable action. -/// -/// Note that this particular action is in fact already a future: since the -/// future cannot be cancelled and the action does not need to be cloned, -/// there is no need to defer the construction of the future. This makes -/// `into_future` a trivial cast, which saves a boxing operation. -pub(crate) struct OnceAction { - #[pin] - fut: F, -} - -impl OnceAction -where - F: Future + Send + 'static, -{ - /// Constructs a new `OnceAction`. - pub(crate) fn new(fut: F) -> Self { - OnceAction { fut } - } -} - -impl Future for OnceAction -where - F: Future, -{ - type Output = F::Output; - - #[inline(always)] - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} - -impl ActionInner for OnceAction -where - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - false - } - fn next(&self) -> Option<(Box, Duration)> { - None - } - fn into_future(self: Box) -> Pin + Send>> { - // No need for boxing, type coercion is enough here. - Box::into_pin(self) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget(*self); - } -} - -/// An object that can be converted to a future performing a non-cancellable, -/// periodic action. -pub(crate) struct PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// A clonable generator for the associated future. - gen: G, - /// The action repetition period. - period: Duration, -} - -impl PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `PeriodicAction`. - pub(crate) fn new(gen: G, period: Duration) -> Self { - Self { gen, period } - } -} - -impl ActionInner for PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - false - } - fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new(self.gen.clone(), self.period)); - - Some((event, self.period)) - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)()) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)()); - } -} - -/// An object that can be converted to a future performing a single, cancellable -/// action. -pub(crate) struct KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - /// A generator for the associated future. - gen: G, - /// The event cancellation key. - event_key: ActionKey, -} - -impl KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `KeyedOnceAction`. - pub(crate) fn new(gen: G, event_key: ActionKey) -> Self { - Self { gen, event_key } - } -} - -impl ActionInner for KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - self.event_key.is_cancelled() - } - fn next(&self) -> Option<(Box, Duration)> { - None - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); - } -} - -/// An object that can be converted to a future performing a periodic, -/// cancellable action. -pub(crate) struct KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// A clonable generator for associated future. - gen: G, - /// The repetition period. - period: Duration, - /// The event cancellation key. - event_key: ActionKey, -} - -impl KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `KeyedPeriodicAction`. - pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self { - Self { - gen, - period, - event_key, - } - } -} - -impl ActionInner for KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - self.event_key.is_cancelled() - } - fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new( - self.gen.clone(), - self.period, - self.event_key.clone(), - )); - - Some((event, self.period)) - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); - } -} - -/// Asynchronously sends a non-cancellable event to a model input. -pub(crate) async fn process_event(func: F, arg: T, sender: Sender) -where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, -{ - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = func.call(model, arg, scheduler); - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; -} - -/// Asynchronously sends a cancellable event to a model input. -pub(crate) async fn send_keyed_event( - event_key: ActionKey, - func: F, - arg: T, - sender: Sender, -) where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, -{ - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = async move { - // Only perform the call if the event wasn't cancelled. - if !event_key.is_cancelled() { - func.call(model, arg, scheduler).await; - } - }; - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; -} diff --git a/asynchronix/src/simulation/scheduler/inner.rs b/asynchronix/src/simulation/scheduler/inner.rs new file mode 100644 index 0000000..6cb633d --- /dev/null +++ b/asynchronix/src/simulation/scheduler/inner.rs @@ -0,0 +1,508 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use pin_project::pin_project; +use recycle_box::{coerce_box, RecycleBox}; + +use crate::channel::Sender; +use crate::executor::Executor; +use crate::model::Model; +use crate::ports::InputFn; +use crate::simulation::Address; +use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; +use crate::util::priority_queue::PriorityQueue; + +use super::{Action, ActionKey, SchedulingError}; + +/// Alias for the scheduler queue type. +/// +/// Why use both time and origin ID as the key? The short answer is that this +/// allows to preserve the relative ordering of events which have the same +/// origin (where the origin is either a model instance or the global +/// scheduler). The preservation of this ordering is implemented by the event +/// loop, which aggregate events with the same origin into single sequential +/// futures, thus ensuring that they are not executed concurrently. +pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; + +/// Internal scheduler implementation. +#[derive(Clone)] +pub(crate) struct SchedulerInner { + scheduler_queue: Arc>, + time: AtomicTimeReader, +} + +impl SchedulerInner { + pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { + Self { + scheduler_queue, + time, + } + } + + /// Returns the current simulation time. + pub(crate) fn time(&self) -> MonotonicTime { + // We use `read` rather than `try_read` because the scheduler can be + // sent to another thread than the simulator's and could thus + // potentially see a torn read if the simulator increments time + // concurrently. The chances of this happening are very small since + // simulation time is not changed frequently. + self.time.read() + } + + /// Schedules an action identified by its origin at a future time. + pub(crate) fn schedule_from( + &self, + deadline: impl Deadline, + action: Action, + origin_id: usize, + ) -> Result<(), SchedulingError> { + // The scheduler queue must always be locked when reading the time, + // otherwise the following race could occur: + // 1) this method reads the time and concludes that it is not too late + // to schedule the action, + // 2) the `Simulation` object takes the lock, increments simulation time + // and runs the simulation step, + // 3) this method takes the lock and schedules the now-outdated action. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules an event identified by its origin at a future time. + pub(crate) fn schedule_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let sender = address.into().0; + let action = Action::new(OnceAction::new(process_event(func, arg, sender))); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable event identified by its origin at a future time + /// and returns an event key. + pub(crate) fn schedule_keyed_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedOnceAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } + + /// Schedules a periodically recurring event identified by its origin at a + /// future time. + pub(crate) fn schedule_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = address.into().0; + let action = Action::new(PeriodicAction::new( + || process_event(func, arg, sender), + period, + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event identified by its + /// origin at a future time and returns an event key. + pub(crate) fn schedule_keyed_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedPeriodicAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + period, + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } +} + +impl fmt::Debug for SchedulerInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SchedulerInner") + .field("time", &self.time()) + .finish_non_exhaustive() + } +} + +/// Trait abstracting over the inner type of an action. +pub(crate) trait ActionInner: Send + 'static { + /// Reports whether the action was cancelled. + fn is_cancelled(&self) -> bool; + + /// If this is a periodic action, returns a boxed clone of this action and + /// its repetition period; otherwise returns `None`. + fn next(&self) -> Option<(Box, Duration)>; + + /// Returns a boxed future that performs the action. + fn into_future(self: Box) -> Pin + Send>>; + + /// Spawns the future that performs the action onto the provided executor. + /// + /// This method is typically more efficient that spawning the boxed future + /// from `into_future` since it can directly spawn the unboxed future. + fn spawn_and_forget(self: Box, executor: &Executor); +} + +/// An object that can be converted to a future performing a single +/// non-cancellable action. +/// +/// Note that this particular action is in fact already a future: since the +/// future cannot be cancelled and the action does not need to be cloned, +/// there is no need to defer the construction of the future. This makes +/// `into_future` a trivial cast, which saves a boxing operation. +#[pin_project] +pub(crate) struct OnceAction { + #[pin] + fut: F, +} + +impl OnceAction +where + F: Future + Send + 'static, +{ + /// Constructs a new `OnceAction`. + pub(crate) fn new(fut: F) -> Self { + OnceAction { fut } + } +} + +impl Future for OnceAction +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl ActionInner for OnceAction +where + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + false + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + // No need for boxing, type coercion is enough here. + Box::into_pin(self) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget(*self); + } +} + +/// An object that can be converted to a future performing a non-cancellable, +/// periodic action. +pub(crate) struct PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// A clonable generator for the associated future. + gen: G, + /// The action repetition period. + period: Duration, +} + +impl PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `PeriodicAction`. + pub(crate) fn new(gen: G, period: Duration) -> Self { + Self { gen, period } + } +} + +impl ActionInner for PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + false + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new(self.gen.clone(), self.period)); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)()) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)()); + } +} + +/// An object that can be converted to a future performing a single, cancellable +/// action. +pub(crate) struct KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + /// A generator for the associated future. + gen: G, + /// The event cancellation key. + event_key: ActionKey, +} + +impl KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `KeyedOnceAction`. + pub(crate) fn new(gen: G, event_key: ActionKey) -> Self { + Self { gen, event_key } + } +} + +impl ActionInner for KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)(self.event_key)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)(self.event_key)); + } +} + +/// An object that can be converted to a future performing a periodic, +/// cancellable action. +pub(crate) struct KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// A clonable generator for associated future. + gen: G, + /// The repetition period. + period: Duration, + /// The event cancellation key. + event_key: ActionKey, +} + +impl KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `KeyedPeriodicAction`. + pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self { + Self { + gen, + period, + event_key, + } + } +} + +impl ActionInner for KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new( + self.gen.clone(), + self.period, + self.event_key.clone(), + )); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)(self.event_key)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)(self.event_key)); + } +} + +/// Asynchronously sends a non-cancellable event to a model input. +pub(crate) async fn process_event(func: F, arg: T, sender: Sender) +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} + +/// Asynchronously sends a cancellable event to a model input. +pub(crate) async fn send_keyed_event( + event_key: ActionKey, + func: F, + arg: T, + sender: Sender, +) where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = async move { + // Only perform the call if the event wasn't cancelled. + if !event_key.is_cancelled() { + func.call(model, arg, scheduler).await; + } + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 793b635..fb72641 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -10,7 +10,9 @@ use crate::time::{Clock, NoClock}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Signal, Simulation}; +use super::{ + add_model, ExecutionError, Mailbox, SchedulerInner, SchedulerQueue, Signal, Simulation, +}; /// Builder for a multi-threaded, discrete-event simulation. pub struct SimInit { @@ -88,7 +90,7 @@ impl SimInit { }; self.observers .push((name.clone(), Box::new(mailbox.0.observer()))); - let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); + let scheduler = SchedulerInner::new(self.scheduler_queue.clone(), self.time.reader()); add_model( model, diff --git a/asynchronix/src/util/sync_cell.rs b/asynchronix/src/util/sync_cell.rs index 01bc8ec..075d2b6 100644 --- a/asynchronix/src/util/sync_cell.rs +++ b/asynchronix/src/util/sync_cell.rs @@ -183,6 +183,15 @@ impl SyncCellReader { Err(SyncCellReadError {}) } } + + /// Performs a synchronized read by spinning on `try_read`. + pub(crate) fn read(&self) -> T::Value { + loop { + if let Ok(value) = self.try_read() { + return value; + } + } + } } impl Clone for SyncCellReader {