From b1896dbde97ad957369ef226a8f5eb531604a9a9 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Fri, 15 Nov 2024 14:39:51 +0100 Subject: [PATCH] Order scheduled messages by their origin Previously, the scheduler key used the target model as subkey to order messages that target the same model. Now this subkey is the origin model rather than the target, or in the case of the global scheduler, 0. This doesn't change anythin in practice for the local scheduler since the origin and target models were the same, but for the global scheduler this provides additional guarranties. For instance, if the global scheduler is used to schedule an event targetting model A and then an event targetting model B where the latter triggers a message to A, it is now guarranteed that the first message will get to A before the second. --- asynchronix/src/model/context.rs | 17 +- asynchronix/src/ports/output/broadcaster.rs | 10 +- asynchronix/src/ports/source/broadcaster.rs | 10 +- asynchronix/src/simulation.rs | 35 +- asynchronix/src/simulation/scheduler.rs | 462 +++------------- asynchronix/src/simulation/scheduler/inner.rs | 508 ++++++++++++++++++ asynchronix/src/simulation/sim_init.rs | 6 +- asynchronix/src/util/sync_cell.rs | 9 + 8 files changed, 629 insertions(+), 428 deletions(-) create mode 100644 asynchronix/src/simulation/scheduler/inner.rs diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs index c45ac15..85f8dcd 100644 --- a/asynchronix/src/model/context.rs +++ b/asynchronix/src/model/context.rs @@ -1,7 +1,7 @@ use std::fmt; use crate::executor::{Executor, Signal}; -use crate::simulation::{self, LocalScheduler, Mailbox}; +use crate::simulation::{self, LocalScheduler, Mailbox, SchedulerInner}; use super::{Model, ProtoModel}; @@ -182,7 +182,8 @@ impl fmt::Debug for Context { pub struct BuildContext<'a, P: ProtoModel> { /// Mailbox of the model. pub mailbox: &'a Mailbox, - context: &'a Context, + name: &'a String, + scheduler: &'a SchedulerInner, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, @@ -192,14 +193,16 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { /// Creates a new local context. pub(crate) fn new( mailbox: &'a Mailbox, - context: &'a Context, + name: &'a String, + scheduler: &'a SchedulerInner, executor: &'a Executor, abort_signal: &'a Signal, model_names: &'a mut Vec, ) -> Self { Self { mailbox, - context, + name, + scheduler, executor, abort_signal, model_names, @@ -211,7 +214,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { /// The fully qualified name is made of the unqualified model name, if /// relevant prepended by the dot-separated names of all parent models. pub fn name(&self) -> &str { - &self.context.name + self.name } /// Adds a sub-model to the simulation bench. @@ -232,13 +235,13 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { if submodel_name.is_empty() { submodel_name = String::from(""); }; - submodel_name = self.context.name().to_string() + "." + &submodel_name; + submodel_name = self.name.to_string() + "." + &submodel_name; simulation::add_model( model, mailbox, submodel_name, - self.context.scheduler.scheduler.clone(), + self.scheduler.clone(), self.executor, self.abort_signal, self.model_names, diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 5592406..980bbe8 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -567,7 +567,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, Scheduler}; + use crate::simulation::{Address, LocalScheduler, SchedulerInner}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -638,7 +638,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -710,7 +710,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -772,7 +772,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -859,7 +859,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 998f693..162f23c 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -468,7 +468,7 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, LocalScheduler, Scheduler}; + use crate::simulation::{Address, LocalScheduler, SchedulerInner}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -539,7 +539,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -611,7 +611,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -673,7 +673,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); @@ -760,7 +760,7 @@ mod tests { let dummy_context = Context::new( String::new(), LocalScheduler::new( - Scheduler::new(dummy_priority_queue, dummy_time), + SchedulerInner::new(dummy_priority_queue, dummy_time), Address(dummy_address), ), ); diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index f4a00b3..023521b 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -121,11 +121,14 @@ mod mailbox; mod scheduler; mod sim_init; +use scheduler::SchedulerQueue; + +pub(crate) use scheduler::{ + KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner, +}; + pub use mailbox::{Address, Mailbox}; pub use scheduler::{Action, ActionKey, AutoActionKey, LocalScheduler, Scheduler, SchedulingError}; -pub(crate) use scheduler::{ - KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue, -}; pub use sim_init::SimInit; use std::any::Any; @@ -461,13 +464,13 @@ impl Simulation { let action = pull_next_action(&mut scheduler_queue); let mut next_key = peek_next_key(&mut scheduler_queue); if next_key != Some(current_key) { - // Since there are no other actions targeting the same mailbox - // and the same time, the action is spawned immediately. + // Since there are no other actions with the same origin and the + // same time, the action is spawned immediately. action.spawn_and_forget(&self.executor); } else { // To ensure that their relative order of execution is - // preserved, all actions targeting the same mailbox are - // executed sequentially within a single compound future. + // preserved, all actions with the same origin are executed + // sequentially within a single compound future. let mut action_sequence = SeqFuture::new(); action_sequence.push(action.into_future()); loop { @@ -717,7 +720,7 @@ pub(crate) fn add_model( model: P, mailbox: Mailbox, name: String, - scheduler: Scheduler, + scheduler: SchedulerInner, executor: &Executor, abort_signal: &Signal, model_names: &mut Vec, @@ -725,18 +728,20 @@ pub(crate) fn add_model( #[cfg(feature = "tracing")] let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); - let context = Context::new( - name.clone(), - LocalScheduler::new(scheduler, mailbox.address()), + let mut build_context = BuildContext::new( + &mailbox, + &name, + &scheduler, + executor, + abort_signal, + model_names, ); - let mut build_context = - BuildContext::new(&mailbox, &context, executor, abort_signal, model_names); - let model = model.build(&mut build_context); + let address = mailbox.address(); let mut receiver = mailbox.0; let abort_signal = abort_signal.clone(); - + let context = Context::new(name.clone(), LocalScheduler::new(scheduler, address)); let fut = async move { let mut model = model.init(&context).await.0; while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {} diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index 1cfc394..c830e48 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -1,4 +1,5 @@ //! Scheduling functions and types. +mod inner; use std::error::Error; use std::future::Future; @@ -6,34 +7,31 @@ use std::hash::{Hash, Hasher}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; use std::time::Duration; use std::{fmt, ptr}; -use pin_project::pin_project; -use recycle_box::{coerce_box, RecycleBox}; - -use crate::channel::Sender; use crate::executor::Executor; use crate::model::Model; use crate::ports::InputFn; use crate::simulation::Address; use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; -use crate::util::priority_queue::PriorityQueue; -/// Scheduler. +use inner::ActionInner; + +pub(crate) use inner::{ + KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerInner, + SchedulerQueue, +}; + +const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; + +/// A global scheduler. #[derive(Clone)] -pub struct Scheduler { - scheduler_queue: Arc>, - time: AtomicTimeReader, -} +pub struct Scheduler(SchedulerInner); impl Scheduler { pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { - Self { - scheduler_queue, - time, - } + Self(SchedulerInner::new(scheduler_queue, time)) } /// Returns the current simulation time. @@ -51,7 +49,7 @@ impl Scheduler { /// } /// ``` pub fn time(&self) -> MonotonicTime { - self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") + self.0.time() } /// Schedules an action at a future time. @@ -63,29 +61,8 @@ impl Scheduler { /// model, these events are guaranteed to be processed according to the /// scheduling order of the actions. pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> { - // The scheduler queue must always be locked when reading the time, - // otherwise the following race could occur: - // 1) this method reads the time and concludes that it is not too late - // to schedule the action, - // 2) the `Simulation` object takes the lock, increments simulation time - // and runs the simulation step, - // 3) this method takes the lock and schedules the now-outdated action. - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - // The channel ID is set to the same value for all actions. This - // ensures that the relative scheduling order of all source events is - // preserved, which is important if some of them target the same models. - // The value 0 was chosen as it prevents collisions with channel IDs as - // the latter are always non-zero. - scheduler_queue.insert((time, 0), action); - - Ok(()) + self.0 + .schedule_from(deadline, action, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules an event at a future time. @@ -110,19 +87,8 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(OnceAction::new(process_event(func, arg, sender))); - - scheduler_queue.insert((time, channel_id), action); - - Ok(()) + self.0 + .schedule_event_from(deadline, func, arg, address, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules a cancellable event at a future time and returns an event key. @@ -147,23 +113,8 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(KeyedOnceAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - event_key.clone(), - )); - - scheduler_queue.insert((time, channel_id), action); - - Ok(event_key) + self.0 + .schedule_keyed_event_from(deadline, func, arg, address, GLOBAL_SCHEDULER_ORIGIN_ID) } /// Schedules a periodically recurring event at a future time. @@ -189,26 +140,14 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = address.into().0; - let channel_id = sender.channel_id(); - - let action = Action::new(PeriodicAction::new( - || process_event(func, arg, sender), + self.0.schedule_periodic_event_from( + deadline, period, - )); - - scheduler_queue.insert((time, channel_id), action); - - Ok(()) + func, + arg, + address, + GLOBAL_SCHEDULER_ORIGIN_ID, + ) } /// Schedules a cancellable, periodically recurring event at a future time @@ -235,26 +174,14 @@ impl Scheduler { T: Send + Clone + 'static, S: Send + 'static, { - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let event_key = ActionKey::new(); - let sender = address.into().0; - let channel_id = sender.channel_id(); - let action = Action::new(KeyedPeriodicAction::new( - |ek| send_keyed_event(ek, func, arg, sender), + self.0.schedule_keyed_periodic_event_from( + deadline, period, - event_key.clone(), - )); - scheduler_queue.insert((time, channel_id), action); - - Ok(event_key) + func, + arg, + address, + GLOBAL_SCHEDULER_ORIGIN_ID, + ) } } @@ -268,13 +195,24 @@ impl fmt::Debug for Scheduler { /// Local scheduler. pub struct LocalScheduler { - pub(crate) scheduler: Scheduler, + scheduler: SchedulerInner, address: Address, + origin_id: usize, } impl LocalScheduler { - pub(crate) fn new(scheduler: Scheduler, address: Address) -> Self { - Self { scheduler, address } + pub(crate) fn new(scheduler: SchedulerInner, address: Address) -> Self { + // The only requirement for the origin ID is that it must be (i) + // specific to each model and (ii) different from 0 (which is reserved + // for the global scheduler). The channel ID of the model mailbox + // fulfills this requirement. + let origin_id = address.0.channel_id(); + + Self { + scheduler, + address, + origin_id, + } } /// Returns the current simulation time. @@ -339,7 +277,7 @@ impl LocalScheduler { S: Send + 'static, { self.scheduler - .schedule_event(deadline, func, arg, &self.address) + .schedule_event_from(deadline, func, arg, &self.address, self.origin_id) } /// Schedules a cancellable event at a future time and returns an action @@ -395,9 +333,13 @@ impl LocalScheduler { T: Send + Clone + 'static, S: Send + 'static, { - let event_key = self - .scheduler - .schedule_keyed_event(deadline, func, arg, &self.address)?; + let event_key = self.scheduler.schedule_keyed_event_from( + deadline, + func, + arg, + &self.address, + self.origin_id, + )?; Ok(event_key) } @@ -451,8 +393,14 @@ impl LocalScheduler { T: Send + Clone + 'static, S: Send + 'static, { - self.scheduler - .schedule_periodic_event(deadline, period, func, arg, &self.address) + self.scheduler.schedule_periodic_event_from( + deadline, + period, + func, + arg, + &self.address, + self.origin_id, + ) } /// Schedules a cancellable, periodically recurring event at a future time @@ -517,12 +465,13 @@ impl LocalScheduler { T: Send + Clone + 'static, S: Send + 'static, { - let event_key = self.scheduler.schedule_keyed_periodic_event( + let event_key = self.scheduler.schedule_keyed_periodic_event_from( deadline, period, func, arg, &self.address, + self.origin_id, )?; Ok(event_key) @@ -534,6 +483,7 @@ impl Clone for LocalScheduler { Self { scheduler: self.scheduler.clone(), address: self.address.clone(), + origin_id: self.origin_id, } } } @@ -543,20 +493,11 @@ impl fmt::Debug for LocalScheduler { f.debug_struct("LocalScheduler") .field("time", &self.time()) .field("address", &self.address) + .field("origin_id", &self.origin_id) .finish_non_exhaustive() } } -/// Shorthand for the scheduler queue type. - -// Why use both time and channel ID as the key? The short answer is that this -// ensures that events targeting the same model are sent in the order they were -// scheduled. More precisely, this ensures that events targeting the same model -// are ordered contiguously in the priority queue, which in turns allows the -// event loop to easily aggregate such events into single futures and thus -// control their relative order of execution. -pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; - /// Managed handle to a scheduled action. /// /// An `AutoActionKey` is a managed handle to a scheduled action that cancels @@ -697,270 +638,3 @@ impl fmt::Debug for Action { f.debug_struct("SchedulableEvent").finish_non_exhaustive() } } - -/// Trait abstracting over the inner type of an action. -pub(crate) trait ActionInner: Send + 'static { - /// Reports whether the action was cancelled. - fn is_cancelled(&self) -> bool; - - /// If this is a periodic action, returns a boxed clone of this action and - /// its repetition period; otherwise returns `None`. - fn next(&self) -> Option<(Box, Duration)>; - - /// Returns a boxed future that performs the action. - fn into_future(self: Box) -> Pin + Send>>; - - /// Spawns the future that performs the action onto the provided executor. - /// - /// This method is typically more efficient that spawning the boxed future - /// from `into_future` since it can directly spawn the unboxed future. - fn spawn_and_forget(self: Box, executor: &Executor); -} - -#[pin_project] -/// An object that can be converted to a future performing a single -/// non-cancellable action. -/// -/// Note that this particular action is in fact already a future: since the -/// future cannot be cancelled and the action does not need to be cloned, -/// there is no need to defer the construction of the future. This makes -/// `into_future` a trivial cast, which saves a boxing operation. -pub(crate) struct OnceAction { - #[pin] - fut: F, -} - -impl OnceAction -where - F: Future + Send + 'static, -{ - /// Constructs a new `OnceAction`. - pub(crate) fn new(fut: F) -> Self { - OnceAction { fut } - } -} - -impl Future for OnceAction -where - F: Future, -{ - type Output = F::Output; - - #[inline(always)] - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} - -impl ActionInner for OnceAction -where - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - false - } - fn next(&self) -> Option<(Box, Duration)> { - None - } - fn into_future(self: Box) -> Pin + Send>> { - // No need for boxing, type coercion is enough here. - Box::into_pin(self) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget(*self); - } -} - -/// An object that can be converted to a future performing a non-cancellable, -/// periodic action. -pub(crate) struct PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// A clonable generator for the associated future. - gen: G, - /// The action repetition period. - period: Duration, -} - -impl PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `PeriodicAction`. - pub(crate) fn new(gen: G, period: Duration) -> Self { - Self { gen, period } - } -} - -impl ActionInner for PeriodicAction -where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - false - } - fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new(self.gen.clone(), self.period)); - - Some((event, self.period)) - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)()) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)()); - } -} - -/// An object that can be converted to a future performing a single, cancellable -/// action. -pub(crate) struct KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - /// A generator for the associated future. - gen: G, - /// The event cancellation key. - event_key: ActionKey, -} - -impl KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `KeyedOnceAction`. - pub(crate) fn new(gen: G, event_key: ActionKey) -> Self { - Self { gen, event_key } - } -} - -impl ActionInner for KeyedOnceAction -where - G: (FnOnce(ActionKey) -> F) + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - self.event_key.is_cancelled() - } - fn next(&self) -> Option<(Box, Duration)> { - None - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); - } -} - -/// An object that can be converted to a future performing a periodic, -/// cancellable action. -pub(crate) struct KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// A clonable generator for associated future. - gen: G, - /// The repetition period. - period: Duration, - /// The event cancellation key. - event_key: ActionKey, -} - -impl KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - /// Constructs a new `KeyedPeriodicAction`. - pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self { - Self { - gen, - period, - event_key, - } - } -} - -impl ActionInner for KeyedPeriodicAction -where - G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, -{ - fn is_cancelled(&self) -> bool { - self.event_key.is_cancelled() - } - fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new( - self.gen.clone(), - self.period, - self.event_key.clone(), - )); - - Some((event, self.period)) - } - fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) - } - fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); - } -} - -/// Asynchronously sends a non-cancellable event to a model input. -pub(crate) async fn process_event(func: F, arg: T, sender: Sender) -where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, -{ - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = func.call(model, arg, scheduler); - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; -} - -/// Asynchronously sends a cancellable event to a model input. -pub(crate) async fn send_keyed_event( - event_key: ActionKey, - func: F, - arg: T, - sender: Sender, -) where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, -{ - let _ = sender - .send( - move |model: &mut M, - scheduler, - recycle_box: RecycleBox<()>| - -> RecycleBox + Send + '_> { - let fut = async move { - // Only perform the call if the event wasn't cancelled. - if !event_key.is_cancelled() { - func.call(model, arg, scheduler).await; - } - }; - - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }, - ) - .await; -} diff --git a/asynchronix/src/simulation/scheduler/inner.rs b/asynchronix/src/simulation/scheduler/inner.rs new file mode 100644 index 0000000..6cb633d --- /dev/null +++ b/asynchronix/src/simulation/scheduler/inner.rs @@ -0,0 +1,508 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use pin_project::pin_project; +use recycle_box::{coerce_box, RecycleBox}; + +use crate::channel::Sender; +use crate::executor::Executor; +use crate::model::Model; +use crate::ports::InputFn; +use crate::simulation::Address; +use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; +use crate::util::priority_queue::PriorityQueue; + +use super::{Action, ActionKey, SchedulingError}; + +/// Alias for the scheduler queue type. +/// +/// Why use both time and origin ID as the key? The short answer is that this +/// allows to preserve the relative ordering of events which have the same +/// origin (where the origin is either a model instance or the global +/// scheduler). The preservation of this ordering is implemented by the event +/// loop, which aggregate events with the same origin into single sequential +/// futures, thus ensuring that they are not executed concurrently. +pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; + +/// Internal scheduler implementation. +#[derive(Clone)] +pub(crate) struct SchedulerInner { + scheduler_queue: Arc>, + time: AtomicTimeReader, +} + +impl SchedulerInner { + pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { + Self { + scheduler_queue, + time, + } + } + + /// Returns the current simulation time. + pub(crate) fn time(&self) -> MonotonicTime { + // We use `read` rather than `try_read` because the scheduler can be + // sent to another thread than the simulator's and could thus + // potentially see a torn read if the simulator increments time + // concurrently. The chances of this happening are very small since + // simulation time is not changed frequently. + self.time.read() + } + + /// Schedules an action identified by its origin at a future time. + pub(crate) fn schedule_from( + &self, + deadline: impl Deadline, + action: Action, + origin_id: usize, + ) -> Result<(), SchedulingError> { + // The scheduler queue must always be locked when reading the time, + // otherwise the following race could occur: + // 1) this method reads the time and concludes that it is not too late + // to schedule the action, + // 2) the `Simulation` object takes the lock, increments simulation time + // and runs the simulation step, + // 3) this method takes the lock and schedules the now-outdated action. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules an event identified by its origin at a future time. + pub(crate) fn schedule_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let sender = address.into().0; + let action = Action::new(OnceAction::new(process_event(func, arg, sender))); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable event identified by its origin at a future time + /// and returns an event key. + pub(crate) fn schedule_keyed_event_from( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedOnceAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } + + /// Schedules a periodically recurring event identified by its origin at a + /// future time. + pub(crate) fn schedule_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = address.into().0; + let action = Action::new(PeriodicAction::new( + || process_event(func, arg, sender), + period, + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event identified by its + /// origin at a future time and returns an event key. + pub(crate) fn schedule_keyed_periodic_event_from( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + origin_id: usize, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let event_key = ActionKey::new(); + let sender = address.into().0; + let action = Action::new(KeyedPeriodicAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + period, + event_key.clone(), + )); + + // The scheduler queue must always be locked when reading the time (see + // `schedule_from`). + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + scheduler_queue.insert((time, origin_id), action); + + Ok(event_key) + } +} + +impl fmt::Debug for SchedulerInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SchedulerInner") + .field("time", &self.time()) + .finish_non_exhaustive() + } +} + +/// Trait abstracting over the inner type of an action. +pub(crate) trait ActionInner: Send + 'static { + /// Reports whether the action was cancelled. + fn is_cancelled(&self) -> bool; + + /// If this is a periodic action, returns a boxed clone of this action and + /// its repetition period; otherwise returns `None`. + fn next(&self) -> Option<(Box, Duration)>; + + /// Returns a boxed future that performs the action. + fn into_future(self: Box) -> Pin + Send>>; + + /// Spawns the future that performs the action onto the provided executor. + /// + /// This method is typically more efficient that spawning the boxed future + /// from `into_future` since it can directly spawn the unboxed future. + fn spawn_and_forget(self: Box, executor: &Executor); +} + +/// An object that can be converted to a future performing a single +/// non-cancellable action. +/// +/// Note that this particular action is in fact already a future: since the +/// future cannot be cancelled and the action does not need to be cloned, +/// there is no need to defer the construction of the future. This makes +/// `into_future` a trivial cast, which saves a boxing operation. +#[pin_project] +pub(crate) struct OnceAction { + #[pin] + fut: F, +} + +impl OnceAction +where + F: Future + Send + 'static, +{ + /// Constructs a new `OnceAction`. + pub(crate) fn new(fut: F) -> Self { + OnceAction { fut } + } +} + +impl Future for OnceAction +where + F: Future, +{ + type Output = F::Output; + + #[inline(always)] + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} + +impl ActionInner for OnceAction +where + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + false + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + // No need for boxing, type coercion is enough here. + Box::into_pin(self) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget(*self); + } +} + +/// An object that can be converted to a future performing a non-cancellable, +/// periodic action. +pub(crate) struct PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// A clonable generator for the associated future. + gen: G, + /// The action repetition period. + period: Duration, +} + +impl PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `PeriodicAction`. + pub(crate) fn new(gen: G, period: Duration) -> Self { + Self { gen, period } + } +} + +impl ActionInner for PeriodicAction +where + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + false + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new(self.gen.clone(), self.period)); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)()) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)()); + } +} + +/// An object that can be converted to a future performing a single, cancellable +/// action. +pub(crate) struct KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + /// A generator for the associated future. + gen: G, + /// The event cancellation key. + event_key: ActionKey, +} + +impl KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `KeyedOnceAction`. + pub(crate) fn new(gen: G, event_key: ActionKey) -> Self { + Self { gen, event_key } + } +} + +impl ActionInner for KeyedOnceAction +where + G: (FnOnce(ActionKey) -> F) + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + None + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)(self.event_key)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)(self.event_key)); + } +} + +/// An object that can be converted to a future performing a periodic, +/// cancellable action. +pub(crate) struct KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// A clonable generator for associated future. + gen: G, + /// The repetition period. + period: Duration, + /// The event cancellation key. + event_key: ActionKey, +} + +impl KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + /// Constructs a new `KeyedPeriodicAction`. + pub(crate) fn new(gen: G, period: Duration, event_key: ActionKey) -> Self { + Self { + gen, + period, + event_key, + } + } +} + +impl ActionInner for KeyedPeriodicAction +where + G: (FnOnce(ActionKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, +{ + fn is_cancelled(&self) -> bool { + self.event_key.is_cancelled() + } + fn next(&self) -> Option<(Box, Duration)> { + let event = Box::new(Self::new( + self.gen.clone(), + self.period, + self.event_key.clone(), + )); + + Some((event, self.period)) + } + fn into_future(self: Box) -> Pin + Send>> { + Box::pin((self.gen)(self.event_key)) + } + fn spawn_and_forget(self: Box, executor: &Executor) { + executor.spawn_and_forget((self.gen)(self.event_key)); + } +} + +/// Asynchronously sends a non-cancellable event to a model input. +pub(crate) async fn process_event(func: F, arg: T, sender: Sender) +where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} + +/// Asynchronously sends a cancellable event to a model input. +pub(crate) async fn send_keyed_event( + event_key: ActionKey, + func: F, + arg: T, + sender: Sender, +) where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, +{ + let _ = sender + .send( + move |model: &mut M, + scheduler, + recycle_box: RecycleBox<()>| + -> RecycleBox + Send + '_> { + let fut = async move { + // Only perform the call if the event wasn't cancelled. + if !event_key.is_cancelled() { + func.call(model, arg, scheduler).await; + } + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }, + ) + .await; +} diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 793b635..fb72641 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -10,7 +10,9 @@ use crate::time::{Clock, NoClock}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Signal, Simulation}; +use super::{ + add_model, ExecutionError, Mailbox, SchedulerInner, SchedulerQueue, Signal, Simulation, +}; /// Builder for a multi-threaded, discrete-event simulation. pub struct SimInit { @@ -88,7 +90,7 @@ impl SimInit { }; self.observers .push((name.clone(), Box::new(mailbox.0.observer()))); - let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); + let scheduler = SchedulerInner::new(self.scheduler_queue.clone(), self.time.reader()); add_model( model, diff --git a/asynchronix/src/util/sync_cell.rs b/asynchronix/src/util/sync_cell.rs index 01bc8ec..075d2b6 100644 --- a/asynchronix/src/util/sync_cell.rs +++ b/asynchronix/src/util/sync_cell.rs @@ -183,6 +183,15 @@ impl SyncCellReader { Err(SyncCellReadError {}) } } + + /// Performs a synchronized read by spinning on `try_read`. + pub(crate) fn read(&self) -> T::Value { + loop { + if let Ok(value) = self.try_read() { + return value; + } + } + } } impl Clone for SyncCellReader {