diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index df3a772..46a0fe0 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -40,6 +40,7 @@ futures-channel = "0.3" futures-task = "0.3" multishot = "0.3.2" num_cpus = "1.13" +parking = "2" pin-project-lite = "0.2" recycle-box = "0.2" slab = "0.4" diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs index 8bb7bd3..69f02c7 100644 --- a/asynchronix/src/dev_hooks.rs +++ b/asynchronix/src/dev_hooks.rs @@ -3,6 +3,7 @@ //! Not for production use! use std::future::Future; +use std::time::Duration; use crate::executor; @@ -25,6 +26,7 @@ impl Executor { Self(executor::Executor::new_multi_threaded( pool_size, dummy_context, + executor::Signal::new(), )) } @@ -43,6 +45,6 @@ impl Executor { /// Let the executor run, blocking until all futures have completed or until /// the executor deadlocks. pub fn run(&mut self) { - self.0.run().unwrap(); + self.0.run(Duration::ZERO).unwrap(); } } diff --git a/asynchronix/src/executor.rs b/asynchronix/src/executor.rs index 630c053..d7bafc0 100644 --- a/asynchronix/src/executor.rs +++ b/asynchronix/src/executor.rs @@ -5,7 +5,11 @@ mod st_executor; mod task; use std::future::Future; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use crossbeam_utils::CachePadded; use crate::macros::scoped_thread_local::scoped_thread_local; #[cfg(feature = "tracing")] @@ -19,11 +23,14 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); pub(crate) enum ExecutorError { /// The simulation has deadlocked. Deadlock, + /// The simulation has timed out. + Timeout, } /// Context common to all executor types. #[derive(Clone)] pub(crate) struct SimulationContext { + /// Read-only handle to the simulation time. #[cfg(feature = "tracing")] pub(crate) time_reader: AtomicTimeReader, } @@ -39,8 +46,11 @@ pub(crate) enum Executor { impl Executor { /// Creates an executor that runs futures on the current thread. - pub(crate) fn new_single_threaded(simulation_context: SimulationContext) -> Self { - Self::StExecutor(st_executor::Executor::new(simulation_context)) + pub(crate) fn new_single_threaded( + simulation_context: SimulationContext, + abort_signal: Signal, + ) -> Self { + Self::StExecutor(st_executor::Executor::new(simulation_context, abort_signal)) } /// Creates an executor that runs futures on a thread pool. @@ -54,8 +64,13 @@ impl Executor { pub(crate) fn new_multi_threaded( num_threads: usize, simulation_context: SimulationContext, + abort_signal: Signal, ) -> Self { - Self::MtExecutor(mt_executor::Executor::new(num_threads, simulation_context)) + Self::MtExecutor(mt_executor::Executor::new( + num_threads, + simulation_context, + abort_signal, + )) } /// Spawns a task which output will never be retrieved. @@ -91,19 +106,32 @@ impl Executor { /// Execute spawned tasks, blocking until all futures have completed or /// until the executor reaches a deadlock. 
-    pub(crate) fn run(&mut self) -> Result<(), ExecutorError> {
-        let msg_count = match self {
-            Self::StExecutor(executor) => executor.run(),
-            Self::MtExecutor(executor) => executor.run(),
-        };
-
-        if msg_count != 0 {
-            assert!(msg_count > 0);
-
-            return Err(ExecutorError::Deadlock);
+    pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
+        match self {
+            Self::StExecutor(executor) => executor.run(timeout),
+            Self::MtExecutor(executor) => executor.run(timeout),
         }
+    }
+}
 
-        Ok(())
+/// A single-use shared boolean signal.
+#[derive(Clone, Debug)]
+pub(crate) struct Signal(Arc<CachePadded<AtomicBool>>);
+
+impl Signal {
+    /// Creates a new, cleared signal.
+    pub(crate) fn new() -> Self {
+        Self(Arc::new(CachePadded::new(AtomicBool::new(false))))
+    }
+
+    /// Sets the signal.
+    pub(crate) fn set(&self) {
+        self.0.store(true, Ordering::Relaxed);
+    }
+
+    /// Returns `true` if the signal was set.
+    pub(crate) fn is_set(&self) -> bool {
+        self.0.load(Ordering::Relaxed)
     }
 }
 
@@ -196,7 +224,7 @@ mod tests {
             }
         });
 
-        executor.run().unwrap();
+        executor.run(Duration::ZERO).unwrap();
 
         // Make sure that all tasks are eventually dropped even though each task
         // wakes the others when dropped.
@@ -206,11 +234,18 @@ mod tests {
 
     #[test]
     fn executor_drop_cycle_st() {
-        executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context()));
+        executor_drop_cycle(Executor::new_single_threaded(
+            dummy_simulation_context(),
+            Signal::new(),
+        ));
     }
 
     #[test]
     fn executor_drop_cycle_mt() {
-        executor_drop_cycle(Executor::new_multi_threaded(3, dummy_simulation_context()));
+        executor_drop_cycle(Executor::new_multi_threaded(
+            3,
+            dummy_simulation_context(),
+            Signal::new(),
+        ));
     }
 }
diff --git a/asynchronix/src/executor/mt_executor.rs b/asynchronix/src/executor/mt_executor.rs
index 45b65cb..6c18d92 100644
--- a/asynchronix/src/executor/mt_executor.rs
+++ b/asynchronix/src/executor/mt_executor.rs
@@ -53,12 +53,16 @@ use std::sync::{Arc, Mutex};
 use std::thread::{self, JoinHandle};
 use std::time::{Duration, Instant};
 
-use crossbeam_utils::sync::{Parker, Unparker};
+// TODO: revert to `crossbeam_utils::sync::Parker` once timeout support lands in
+// v1.0 (see https://github.com/crossbeam-rs/crossbeam/pull/1012).
+use parking::{Parker, Unparker};
 use slab::Slab;
 
 use crate::channel;
 use crate::executor::task::{self, CancelToken, Promise, Runnable};
-use crate::executor::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT};
+use crate::executor::{
+    ExecutorError, Signal, SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT,
+};
 use crate::macros::scoped_thread_local::scoped_thread_local;
 use crate::util::rng::Rng;
 use pool_manager::PoolManager;
@@ -84,6 +88,8 @@ pub(crate) struct Executor {
     parker: Parker,
     /// Handles to the worker threads.
     worker_handles: Vec<JoinHandle<()>>,
+    /// Handle to the forced termination signal.
+    abort_signal: Signal,
 }
 
 impl Executor {
@@ -95,7 +101,11 @@ impl Executor {
     ///
     /// This will panic if the specified number of threads is zero or is more
     /// than `usize::BITS`.
- pub(crate) fn new(num_threads: usize, simulation_context: SimulationContext) -> Self { + pub(crate) fn new( + num_threads: usize, + simulation_context: SimulationContext, + abort_signal: Signal, + ) -> Self { let parker = Parker::new(); let unparker = parker.unparker().clone(); @@ -142,12 +152,14 @@ impl Executor { let context = context.clone(); let active_tasks = active_tasks.clone(); let simulation_context = simulation_context.clone(); + let abort_signal = abort_signal.clone(); + move || { let worker = Worker::new(local_queue, context); SIMULATION_CONTEXT.set(&simulation_context, || { ACTIVE_TASKS.set(&active_tasks, || { LOCAL_WORKER.set(&worker, || { - run_local_worker(&worker, id, worker_parker) + run_local_worker(&worker, id, worker_parker, abort_signal) }) }) }); @@ -166,6 +178,7 @@ impl Executor { active_tasks, parker, worker_handles, + abort_signal, } } @@ -223,23 +236,37 @@ impl Executor { self.context.injector.insert_task(runnable); } - /// Execute spawned tasks, blocking until all futures have completed or - /// until the executor reaches a deadlock. - /// - /// The number of unprocessed messages is returned. It should always be 0 - /// unless a deadlock occurred. - pub(crate) fn run(&mut self) -> isize { + /// Execute spawned tasks, blocking until all futures have completed or an + /// error is encountered. + pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> { self.context.pool_manager.activate_worker(); loop { if let Some(worker_panic) = self.context.pool_manager.take_panic() { panic::resume_unwind(worker_panic); } + if self.context.pool_manager.pool_is_idle() { - return self.context.msg_count.load(Ordering::Relaxed); + let msg_count = self.context.msg_count.load(Ordering::Relaxed); + if msg_count != 0 { + assert!(msg_count > 0); + + return Err(ExecutorError::Deadlock); + } + + return Ok(()); } - self.parker.park(); + if timeout.is_zero() { + self.parker.park(); + } else if !self.parker.park_timeout(timeout) { + // A timeout occurred: request all worker threads to return + // as soon as possible. + self.abort_signal.set(); + self.context.pool_manager.activate_all_workers(); + + return Err(ExecutorError::Timeout); + } } } } @@ -247,7 +274,8 @@ impl Executor { impl Drop for Executor { fn drop(&mut self) { // Force all threads to return. - self.context.pool_manager.trigger_termination(); + self.abort_signal.set(); + self.context.pool_manager.activate_all_workers(); for handle in self.worker_handles.drain(0..) { handle.join().unwrap(); } @@ -459,7 +487,7 @@ fn schedule_task(task: Runnable, executor_id: usize) { /// is received or until it panics. /// /// Panics caught in this thread are relayed to the main executor thread. -fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { +fn run_local_worker(worker: &Worker, id: usize, parker: Parker, abort_signal: Signal) { let pool_manager = &worker.executor_context.pool_manager; let injector = &worker.executor_context.injector; let executor_unparker = &worker.executor_context.executor_unparker; @@ -508,7 +536,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { pool_manager.begin_worker_search(); } - if pool_manager.termination_is_triggered() { + if abort_signal.is_set() { return; } @@ -578,7 +606,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // Pop tasks from the fast slot or the local queue. 
         while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) {
-            if pool_manager.termination_is_triggered() {
+            if abort_signal.is_set() {
                 return;
             }
             task.run();
@@ -594,7 +622,8 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
     // Propagate the panic, if any.
     if let Err(panic) = result {
         pool_manager.register_panic(panic);
-        pool_manager.trigger_termination();
+        abort_signal.set();
+        pool_manager.activate_all_workers();
         executor_unparker.unpark();
     }
 }
diff --git a/asynchronix/src/executor/mt_executor/pool_manager.rs b/asynchronix/src/executor/mt_executor/pool_manager.rs
index 119c335..f2bec7c 100644
--- a/asynchronix/src/executor/mt_executor/pool_manager.rs
+++ b/asynchronix/src/executor/mt_executor/pool_manager.rs
@@ -1,8 +1,8 @@
 use std::any::Any;
-use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{self, AtomicUsize, Ordering};
 use std::sync::Mutex;
 
-use crossbeam_utils::sync::Unparker;
+use parking::Unparker;
 
 use super::Stealer;
 use crate::util::bit;
@@ -22,8 +22,6 @@ pub(super) struct PoolManager {
     active_workers: AtomicUsize,
     /// Count of all workers currently searching for tasks.
     searching_workers: AtomicUsize,
-    /// Flag requesting all workers to return immediately.
-    terminate_signal: AtomicBool,
     /// Panic caught in a worker thread.
     worker_panic: Mutex<Option<Box<dyn Any + Send>>>,
 }
@@ -56,7 +54,6 @@ impl PoolManager {
             worker_unparkers,
             active_workers: AtomicUsize::new(0),
             searching_workers: AtomicUsize::new(0),
-            terminate_signal: AtomicBool::new(false),
             worker_panic: Mutex::new(None),
         }
     }
@@ -211,22 +208,14 @@ impl PoolManager {
         self.searching_workers.load(Ordering::Relaxed)
     }
 
-    /// Triggers the termination signal and unparks all worker threads so they
-    /// can cleanly terminate.
-    pub(super) fn trigger_termination(&self) {
-        self.terminate_signal.store(true, Ordering::Relaxed);
-
+    /// Unparks all workers and marks them as active.
+    pub(super) fn activate_all_workers(&self) {
         self.set_all_workers_active();
 
         for unparker in &*self.worker_unparkers {
             unparker.unpark();
         }
     }
 
-    /// Returns true if the termination signal was triggered.
-    pub(super) fn termination_is_triggered(&self) -> bool {
-        self.terminate_signal.load(Ordering::Relaxed)
-    }
-
     /// Registers a panic associated with the provided worker ID.
     ///
     /// If no panic is currently registered, the panic in argument is
diff --git a/asynchronix/src/executor/st_executor.rs b/asynchronix/src/executor/st_executor.rs
index 49e37d2..be20621 100644
--- a/asynchronix/src/executor/st_executor.rs
+++ b/asynchronix/src/executor/st_executor.rs
@@ -1,15 +1,20 @@
 use std::cell::RefCell;
-use std::fmt;
 use std::future::Future;
+use std::panic::AssertUnwindSafe;
 use std::sync::atomic::Ordering;
+use std::time::Duration;
+use std::{fmt, panic, thread};
 
+// TODO: revert to `crossbeam_utils::sync::Parker` once timeout support lands in
+// v1.0 (see https://github.com/crossbeam-rs/crossbeam/pull/1012).
+use parking::Parker;
 use slab::Slab;
 
 use super::task::{self, CancelToken, Promise, Runnable};
 use super::NEXT_EXECUTOR_ID;
 use crate::channel;
-use crate::executor::{SimulationContext, SIMULATION_CONTEXT};
+use crate::executor::{ExecutorError, Signal, SimulationContext, SIMULATION_CONTEXT};
 use crate::macros::scoped_thread_local::scoped_thread_local;
 
 const QUEUE_MIN_CAPACITY: usize = 32;
@@ -19,17 +24,15 @@ scoped_thread_local!(static ACTIVE_TASKS: RefCell<Slab<CancelToken>>);
 
 /// A single-threaded `async` executor.
 pub(crate) struct Executor {
-    /// Shared executor data.
-    context: ExecutorContext,
-    /// List of tasks that have not completed yet.
-    active_tasks: RefCell<Slab<CancelToken>>,
-    /// Read-only handle to the simulation time.
-    simulation_context: SimulationContext,
+    /// Executor state.
+    inner: Option<Box<ExecutorInner>>,
+    /// Handle to the forced termination signal.
+    abort_signal: Signal,
 }
 
 impl Executor {
     /// Creates an executor that runs futures on the current thread.
-    pub(crate) fn new(simulation_context: SimulationContext) -> Self {
+    pub(crate) fn new(simulation_context: SimulationContext, abort_signal: Signal) -> Self {
         // Each executor instance has a unique ID inherited by tasks to ensure
         // that tasks are scheduled on their parent executor.
         let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
@@ -42,9 +45,13 @@ impl Executor {
         let active_tasks = RefCell::new(Slab::new());
 
         Self {
-            context,
-            active_tasks,
-            simulation_context,
+            inner: Some(Box::new(ExecutorInner {
+                context,
+                active_tasks,
+                simulation_context,
+                abort_signal: abort_signal.clone(),
+            })),
+            abort_signal,
         }
     }
 
@@ -58,8 +65,10 @@
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
+        let inner = self.inner.as_ref().unwrap();
+
         // Book a slot to store the task cancellation token.
-        let mut active_tasks = self.active_tasks.borrow_mut();
+        let mut active_tasks = inner.active_tasks.borrow_mut();
         let task_entry = active_tasks.vacant_entry();
 
         // Wrap the future so that it removes its cancel token from the
@@ -67,10 +76,10 @@
         let future = CancellableFuture::new(future, task_entry.key());
 
         let (promise, runnable, cancel_token) =
-            task::spawn(future, schedule_task, self.context.executor_id);
+            task::spawn(future, schedule_task, inner.context.executor_id);
 
         task_entry.insert(cancel_token);
-        let mut queue = self.context.queue.borrow_mut();
+        let mut queue = inner.context.queue.borrow_mut();
         queue.push(runnable);
 
         promise
@@ -88,8 +97,10 @@
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
+        let inner = self.inner.as_ref().unwrap();
+
         // Book a slot to store the task cancellation token.
-        let mut active_tasks = self.active_tasks.borrow_mut();
+        let mut active_tasks = inner.active_tasks.borrow_mut();
         let task_entry = active_tasks.vacant_entry();
 
         // Wrap the future so that it removes its cancel token from the
@@ -97,19 +108,71 @@
         let future = CancellableFuture::new(future, task_entry.key());
 
         let (runnable, cancel_token) =
-            task::spawn_and_forget(future, schedule_task, self.context.executor_id);
+            task::spawn_and_forget(future, schedule_task, inner.context.executor_id);
 
         task_entry.insert(cancel_token);
-        let mut queue = self.context.queue.borrow_mut();
+        let mut queue = inner.context.queue.borrow_mut();
         queue.push(runnable);
     }
 
-    /// Execute spawned tasks, blocking until all futures have completed or
-    /// until the executor reaches a deadlock.
-    ///
-    /// The number of unprocessed messages is returned. It should always be 0
-    /// unless a deadlock occurred.
-    pub(crate) fn run(&mut self) -> isize {
+    /// Execute spawned tasks, blocking until all futures have completed or an
+    /// error is encountered.
+    pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
+        if timeout.is_zero() {
+            return self.inner.as_mut().unwrap().run();
+        }
+
+        // Temporarily move out the inner state so it can be moved to another
+        // thread.
+        let mut inner = self.inner.take().unwrap();
+
+        let parker = Parker::new();
+        let unparker = parker.unparker();
+        let th = thread::spawn(move || {
+            // It is necessary to catch worker panics, otherwise the main thread
+            // will never be unparked if the worker thread panics.
+            let res = panic::catch_unwind(AssertUnwindSafe(|| inner.run()));
+
+            unparker.unpark();
+
+            match res {
+                Ok(res) => (inner, res),
+                Err(e) => panic::resume_unwind(e),
+            }
+        });
+
+        if !parker.park_timeout(timeout) {
+            // Make a best-effort attempt at stopping the worker thread.
+            self.abort_signal.set();
+
+            return Err(ExecutorError::Timeout);
+        }
+
+        match th.join() {
+            Ok((inner, res)) => {
+                self.inner = Some(inner);
+
+                res
+            }
+            Err(e) => panic::resume_unwind(e),
+        }
+    }
+}
+
+/// Inner state of the executor.
+struct ExecutorInner {
+    /// Shared executor data.
+    context: ExecutorContext,
+    /// List of tasks that have not completed yet.
+    active_tasks: RefCell<Slab<CancelToken>>,
+    /// Read-only handle to the simulation time.
+    simulation_context: SimulationContext,
+    /// Signal requesting the worker thread to return as soon as possible.
+    abort_signal: Signal,
+}
+
+impl ExecutorInner {
+    fn run(&mut self) -> Result<(), ExecutorError> {
         // In case this executor is nested in another one, reset the counter of in-flight messages.
         let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count);
 
@@ -122,17 +185,27 @@
                 };
 
                 task.run();
+
+                if self.abort_signal.is_set() {
+                    return;
+                }
             })
         })
         });
 
         self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
 
-        self.context.msg_count
+        if self.context.msg_count != 0 {
+            assert!(self.context.msg_count > 0);
+
+            return Err(ExecutorError::Deadlock);
+        }
+
+        Ok(())
     }
 }
 
-impl Drop for Executor {
+impl Drop for ExecutorInner {
     fn drop(&mut self) {
         // Drop all tasks that have not completed.
// diff --git a/asynchronix/src/grpc/api/simulation.proto b/asynchronix/src/grpc/api/simulation.proto index 22bbf3c..4a0884f 100644 --- a/asynchronix/src/grpc/api/simulation.proto +++ b/asynchronix/src/grpc/api/simulation.proto @@ -11,19 +11,20 @@ enum ErrorCode { INTERNAL_ERROR = 0; SIMULATION_NOT_STARTED = 1; SIMULATION_TERMINATED = 2; - SIMULATION_DEADLOCK = 3; - SIMULATION_MODEL_ERROR = 4; - SIMULATION_PANIC = 5; - SIMULATION_BAD_QUERY = 6; - SIMULATION_TIME_OUT_OF_RANGE = 7; - MISSING_ARGUMENT = 10; - INVALID_TIME = 11; - INVALID_DURATION = 12; - INVALID_PERIOD = 13; - INVALID_MESSAGE = 14; - INVALID_KEY = 15; - SOURCE_NOT_FOUND = 20; - SINK_NOT_FOUND = 21; + SIMULATION_TIMEOUT = 3; + SIMULATION_DEADLOCK = 4; + SIMULATION_MODEL_ERROR = 5; + SIMULATION_PANIC = 6; + SIMULATION_BAD_QUERY = 7; + SIMULATION_TIME_OUT_OF_RANGE = 8; + MISSING_ARGUMENT = 20; + INVALID_TIME = 30; + INVALID_DURATION = 31; + INVALID_PERIOD = 32; + INVALID_MESSAGE = 33; + INVALID_KEY = 34; + SOURCE_NOT_FOUND = 40; + SINK_NOT_FOUND = 41; } message Error { diff --git a/asynchronix/src/grpc/codegen/simulation.rs b/asynchronix/src/grpc/codegen/simulation.rs index b1281cb..8338f24 100644 --- a/asynchronix/src/grpc/codegen/simulation.rs +++ b/asynchronix/src/grpc/codegen/simulation.rs @@ -339,19 +339,20 @@ pub enum ErrorCode { InternalError = 0, SimulationNotStarted = 1, SimulationTerminated = 2, - SimulationDeadlock = 3, - SimulationModelError = 4, - SimulationPanic = 5, - SimulationBadQuery = 6, - SimulationTimeOutOfRange = 22, - MissingArgument = 7, - InvalidTime = 8, - InvalidDuration = 9, - InvalidPeriod = 10, - InvalidMessage = 11, - InvalidKey = 12, - SourceNotFound = 20, - SinkNotFound = 21, + SimulationTimeout = 3, + SimulationDeadlock = 4, + SimulationModelError = 5, + SimulationPanic = 6, + SimulationBadQuery = 7, + SimulationTimeOutOfRange = 8, + MissingArgument = 20, + InvalidTime = 30, + InvalidDuration = 31, + InvalidPeriod = 32, + InvalidMessage = 33, + InvalidKey = 34, + SourceNotFound = 40, + SinkNotFound = 41, } impl ErrorCode { /// String value of the enum field names used in the ProtoBuf definition. 
@@ -363,6 +364,7 @@ impl ErrorCode {
             ErrorCode::InternalError => "INTERNAL_ERROR",
             ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
             ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
+            ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT",
             ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
             ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR",
             ErrorCode::SimulationPanic => "SIMULATION_PANIC",
@@ -384,6 +386,7 @@ impl ErrorCode {
             "INTERNAL_ERROR" => Some(Self::InternalError),
             "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
             "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
+            "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
             "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
             "SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError),
             "SIMULATION_PANIC" => Some(Self::SimulationPanic),
diff --git a/asynchronix/src/grpc/services.rs b/asynchronix/src/grpc/services.rs
index 0799ded..c9a7de7 100644
--- a/asynchronix/src/grpc/services.rs
+++ b/asynchronix/src/grpc/services.rs
@@ -38,8 +38,10 @@ fn map_execution_error(error: ExecutionError) -> Error {
         ExecutionError::Panic(_) => ErrorCode::SimulationPanic,
         ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
         ExecutionError::Terminated => ErrorCode::SimulationTerminated,
+        ExecutionError::Timeout => ErrorCode::SimulationTimeout,
         ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime,
     };
+
     let error_message = error.to_string();
 
     to_error(error_code, error_message)
diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs
index 640babd..8657ffc 100644
--- a/asynchronix/src/model/context.rs
+++ b/asynchronix/src/model/context.rs
@@ -1,6 +1,6 @@
 use std::fmt;
 
-use crate::executor::Executor;
+use crate::executor::{Executor, Signal};
 use crate::simulation::{self, LocalScheduler, Mailbox};
 
 use super::{Model, ProtoModel};
@@ -184,6 +184,7 @@ pub struct BuildContext<'a, P: ProtoModel> {
     pub mailbox: &'a Mailbox<P::Model>,
     context: &'a Context<P::Model>,
     executor: &'a Executor,
+    abort_signal: &'a Signal,
 }
 
 impl<'a, P: ProtoModel> BuildContext<'a, P> {
@@ -192,11 +193,13 @@
         mailbox: &'a Mailbox<P::Model>,
         context: &'a Context<P::Model>,
         executor: &'a Executor,
+        abort_signal: &'a Signal,
     ) -> Self {
         Self {
             mailbox,
             context,
             executor,
+            abort_signal,
         }
     }
 
@@ -231,6 +234,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
             submodel_name,
             self.context.scheduler.scheduler.clone(),
             self.executor,
+            self.abort_signal,
         );
     }
 }
diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs
index e3c3579..d5f466d 100644
--- a/asynchronix/src/simulation.rs
+++ b/asynchronix/src/simulation.rs
@@ -145,7 +145,7 @@ use std::time::Duration;
 use recycle_box::{coerce_box, RecycleBox};
 
 use crate::channel::ChannelObserver;
-use crate::executor::{Executor, ExecutorError};
+use crate::executor::{Executor, ExecutorError, Signal};
 use crate::model::{BuildContext, Context, Model, ProtoModel};
 use crate::ports::{InputFn, ReplierFn};
 use crate::time::{AtomicTime, Clock, MonotonicTime};
@@ -195,6 +195,7 @@ pub struct Simulation {
     scheduler_queue: Arc<Mutex<SchedulerQueue>>,
     time: AtomicTime,
     clock: Box<dyn Clock>,
+    timeout: Duration,
     observers: Vec<(String, Box<dyn ChannelObserver>)>,
     is_terminated: bool,
 }
@@ -206,6 +207,7 @@ impl Simulation {
         scheduler_queue: Arc<Mutex<SchedulerQueue>>,
         time: AtomicTime,
         clock: Box<dyn Clock>,
+        timeout: Duration,
         observers: Vec<(String, Box<dyn ChannelObserver>)>,
     ) -> Self {
         Self {
@@ -213,11 +215,26 @@
             scheduler_queue,
             time,
             clock,
+            timeout,
             observers,
             is_terminated: false,
         }
     }
 
+    /// Sets a timeout for each simulation
step. + /// + /// The timeout corresponds to the maximum wall clock time allocated for the + /// completion of a single simulation step before an + /// [`ExecutionError::Timeout`] error is raised. + /// + /// A null duration disables the timeout, which is the default behavior. + /// + /// See also [`SimInit::set_timeout`]. + #[cfg(not(target_family = "wasm"))] + pub fn set_timeout(&mut self, timeout: Duration) { + self.timeout = timeout; + } + /// Returns the current simulation time. pub fn time(&self) -> MonotonicTime { self.time.read() @@ -362,7 +379,7 @@ impl Simulation { return Err(ExecutionError::Terminated); } - self.executor.run().map_err(|e| match e { + self.executor.run(self.timeout).map_err(|e| match e { ExecutorError::Deadlock => { self.is_terminated = true; let mut deadlock_info = Vec::new(); @@ -378,6 +395,11 @@ impl Simulation { ExecutionError::Deadlock(deadlock_info) } + ExecutorError::Timeout => { + self.is_terminated = true; + + ExecutionError::Timeout + } }) } @@ -544,8 +566,10 @@ pub enum ExecutionError { /// The query was invalid and did not obtain a response. BadQuery, /// The simulation has been terminated due to an earlier deadlock, model - /// error or model panic. + /// error, model panic or timeout. Terminated, + /// The simulation step has failed to complete within the allocated time. + Timeout, } impl fmt::Display for ExecutionError { @@ -593,6 +617,7 @@ impl fmt::Display for ExecutionError { } Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"), Self::Terminated => f.write_str("the simulation has been terminated"), + Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"), } } } @@ -653,19 +678,22 @@ pub(crate) fn add_model( name: String, scheduler: Scheduler, executor: &Executor, + abort_signal: &Signal, ) { #[cfg(feature = "tracing")] let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address())); - let build_context = BuildContext::new(&mailbox, &context, executor); + let build_context = BuildContext::new(&mailbox, &context, executor, abort_signal); let model = model.build(&build_context); let mut receiver = mailbox.0; + let abort_signal = abort_signal.clone(); + let fut = async move { let mut model = model.init(&context).await.0; - while receiver.recv(&mut model, &context).await.is_ok() {} + while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {} }; #[cfg(feature = "tracing")] diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index c6580b3..7ac88c6 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -1,5 +1,6 @@ use std::fmt; use std::sync::{Arc, Mutex}; +use std::time::Duration; use crate::channel::ChannelObserver; use crate::executor::{Executor, SimulationContext}; @@ -9,7 +10,7 @@ use crate::time::{Clock, NoClock}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Simulation}; +use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Signal, Simulation}; /// Builder for a multi-threaded, discrete-event simulation. 
 pub struct SimInit {
@@ -17,7 +18,9 @@ pub struct SimInit {
     scheduler_queue: Arc<Mutex<SchedulerQueue>>,
     time: AtomicTime,
     clock: Box<dyn Clock>,
+    timeout: Duration,
     observers: Vec<(String, Box<dyn ChannelObserver>)>,
+    abort_signal: Signal,
 }
 
 impl SimInit {
@@ -31,19 +34,25 @@
     /// threads.
     ///
     /// Note that the number of worker threads is automatically constrained to
-    /// be between 1 and `usize::BITS` (inclusive).
+    /// be between 1 and `usize::BITS` (inclusive). It is always set to 1 on
+    /// `wasm` targets.
     pub fn with_num_threads(num_threads: usize) -> Self {
-        let num_threads = num_threads.clamp(1, usize::BITS as usize);
+        let num_threads = if cfg!(target_family = "wasm") {
+            1
+        } else {
+            num_threads.clamp(1, usize::BITS as usize)
+        };
         let time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH));
         let simulation_context = SimulationContext {
             #[cfg(feature = "tracing")]
             time_reader: time.reader(),
         };
+        let abort_signal = Signal::new();
         let executor = if num_threads == 1 {
-            Executor::new_single_threaded(simulation_context)
+            Executor::new_single_threaded(simulation_context, abort_signal.clone())
         } else {
-            Executor::new_multi_threaded(num_threads, simulation_context)
+            Executor::new_multi_threaded(num_threads, simulation_context, abort_signal.clone())
         };
 
         Self {
@@ -51,7 +60,9 @@
             scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
             time,
             clock: Box::new(NoClock::new()),
+            timeout: Duration::ZERO,
             observers: Vec::new(),
+            abort_signal,
         }
     }
 
@@ -70,12 +81,20 @@
         self.observers
             .push((name.clone(), Box::new(mailbox.0.observer())));
         let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
-        add_model(model, mailbox, name, scheduler, &self.executor);
+
+        add_model(
+            model,
+            mailbox,
+            name,
+            scheduler,
+            &self.executor,
+            &self.abort_signal,
+        );
 
         self
     }
 
-    /// Synchronize the simulation with the provided [`Clock`].
+    /// Synchronizes the simulation with the provided [`Clock`].
     ///
     /// If the clock isn't explicitly set then the default [`NoClock`] is used,
     /// resulting in the simulation running as fast as possible.
@@ -85,6 +104,23 @@
         self
     }
 
+    /// Sets a timeout for the call to [`SimInit::init`] and for any subsequent
+    /// simulation step.
+    ///
+    /// The timeout corresponds to the maximum wall clock time allocated for the
+    /// completion of a single simulation step before an
+    /// [`ExecutionError::Timeout`] error is raised.
+    ///
+    /// A null duration disables the timeout, which is the default behavior.
+    ///
+    /// See also [`Simulation::set_timeout`].
+    #[cfg(not(target_family = "wasm"))]
+    pub fn set_timeout(mut self, timeout: Duration) -> Self {
+        self.timeout = timeout;
+
+        self
+    }
+
     /// Builds a simulation initialized at the specified simulation time,
     /// executing the [`Model::init()`](crate::model::Model::init) method on all
     /// model initializers.
@@ -97,6 +133,7 @@ impl SimInit { self.scheduler_queue, self.time, self.clock, + self.timeout, self.observers, ); simulation.run()?; diff --git a/asynchronix/tests/integration/main.rs b/asynchronix/tests/integration/main.rs new file mode 100644 index 0000000..cd29ae2 --- /dev/null +++ b/asynchronix/tests/integration/main.rs @@ -0,0 +1,8 @@ +// Integration tests follow the organization suggested by Matklad: +// https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html + +mod model_scheduling; +mod simulation_deadlock; +mod simulation_scheduling; +#[cfg(not(miri))] +mod simulation_timeout; diff --git a/asynchronix/tests/model_scheduling.rs b/asynchronix/tests/integration/model_scheduling.rs similarity index 100% rename from asynchronix/tests/model_scheduling.rs rename to asynchronix/tests/integration/model_scheduling.rs diff --git a/asynchronix/tests/simulation_deadlock.rs b/asynchronix/tests/integration/simulation_deadlock.rs similarity index 100% rename from asynchronix/tests/simulation_deadlock.rs rename to asynchronix/tests/integration/simulation_deadlock.rs diff --git a/asynchronix/tests/simulation_scheduling.rs b/asynchronix/tests/integration/simulation_scheduling.rs similarity index 88% rename from asynchronix/tests/simulation_scheduling.rs rename to asynchronix/tests/integration/simulation_scheduling.rs index ab249fe..6552d20 100644 --- a/asynchronix/tests/simulation_scheduling.rs +++ b/asynchronix/tests/integration/simulation_scheduling.rs @@ -2,8 +2,6 @@ use std::time::Duration; -const MT_NUM_THREADS: usize = 4; - #[cfg(not(miri))] use asynchronix::model::Context; use asynchronix::model::Model; @@ -11,6 +9,8 @@ use asynchronix::ports::{EventBuffer, Output}; use asynchronix::simulation::{Address, Mailbox, SimInit, Simulation}; use asynchronix::time::MonotonicTime; +const MT_NUM_THREADS: usize = 4; + // Input-to-output pass-through model. 
struct PassThroughModel { pub output: Output, @@ -49,7 +49,7 @@ fn passthrough_bench( (simu, addr, out_stream) } -fn simulation_schedule_events(num_threads: usize) { +fn schedule_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); @@ -90,7 +90,7 @@ fn simulation_schedule_events(num_threads: usize) { assert!(output.next().is_none()); } -fn simulation_schedule_keyed_events(num_threads: usize) { +fn schedule_keyed_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); @@ -131,7 +131,7 @@ fn simulation_schedule_keyed_events(num_threads: usize) { assert!(output.next().is_none()); } -fn simulation_schedule_periodic_events(num_threads: usize) { +fn schedule_periodic_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); @@ -170,7 +170,7 @@ fn simulation_schedule_periodic_events(num_threads: usize) { } } -fn simulation_schedule_periodic_keyed_events(num_threads: usize) { +fn schedule_periodic_keyed_events(num_threads: usize) { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); @@ -219,43 +219,43 @@ fn simulation_schedule_periodic_keyed_events(num_threads: usize) { } #[test] -fn simulation_schedule_events_st() { - simulation_schedule_events(1); +fn schedule_events_st() { + schedule_events(1); } #[test] -fn simulation_schedule_events_mt() { - simulation_schedule_events(MT_NUM_THREADS); +fn schedule_events_mt() { + schedule_events(MT_NUM_THREADS); } #[test] -fn simulation_schedule_keyed_events_st() { - simulation_schedule_keyed_events(1); +fn schedule_keyed_events_st() { + schedule_keyed_events(1); } #[test] -fn simulation_schedule_keyed_events_mt() { - simulation_schedule_keyed_events(MT_NUM_THREADS); +fn schedule_keyed_events_mt() { + schedule_keyed_events(MT_NUM_THREADS); } #[test] -fn simulation_schedule_periodic_events_st() { - simulation_schedule_periodic_events(1); +fn schedule_periodic_events_st() { + schedule_periodic_events(1); } #[test] -fn simulation_schedule_periodic_events_mt() { - simulation_schedule_periodic_events(MT_NUM_THREADS); +fn schedule_periodic_events_mt() { + schedule_periodic_events(MT_NUM_THREADS); } #[test] -fn simulation_schedule_periodic_keyed_events_st() { - simulation_schedule_periodic_keyed_events(1); +fn schedule_periodic_keyed_events_st() { + schedule_periodic_keyed_events(1); } #[test] -fn simulation_schedule_periodic_keyed_events_mt() { - simulation_schedule_periodic_keyed_events(MT_NUM_THREADS); +fn schedule_periodic_keyed_events_mt() { + schedule_periodic_keyed_events(MT_NUM_THREADS); } #[cfg(not(miri))] @@ -313,7 +313,7 @@ fn timestamp_bench( } #[cfg(not(miri))] -fn simulation_system_clock_from_instant(num_threads: usize) { +fn system_clock_from_instant(num_threads: usize) { let t0 = MonotonicTime::EPOCH; const TOLERANCE: f64 = 0.005; // [s] @@ -369,7 +369,7 @@ fn simulation_system_clock_from_instant(num_threads: usize) { } #[cfg(not(miri))] -fn simulation_system_clock_from_system_time(num_threads: usize) { +fn system_clock_from_system_time(num_threads: usize) { let t0 = MonotonicTime::EPOCH; const TOLERANCE: f64 = 0.005; // [s] @@ -431,7 +431,7 @@ fn simulation_system_clock_from_system_time(num_threads: usize) { } #[cfg(not(miri))] -fn simulation_auto_system_clock(num_threads: usize) { +fn auto_system_clock(num_threads: usize) { let t0 = MonotonicTime::EPOCH; const TOLERANCE: f64 = 
 0.005; // [s]
@@ -478,36 +478,36 @@
 
 #[cfg(not(miri))]
 #[test]
-fn simulation_system_clock_from_instant_st() {
-    simulation_system_clock_from_instant(1);
+fn system_clock_from_instant_st() {
+    system_clock_from_instant(1);
 }
 
 #[cfg(not(miri))]
 #[test]
-fn simulation_system_clock_from_instant_mt() {
-    simulation_system_clock_from_instant(MT_NUM_THREADS);
+fn system_clock_from_instant_mt() {
+    system_clock_from_instant(MT_NUM_THREADS);
 }
 
 #[cfg(not(miri))]
 #[test]
-fn simulation_system_clock_from_system_time_st() {
-    simulation_system_clock_from_system_time(1);
+fn system_clock_from_system_time_st() {
+    system_clock_from_system_time(1);
 }
 
 #[cfg(not(miri))]
 #[test]
-fn simulation_system_clock_from_system_time_mt() {
-    simulation_system_clock_from_system_time(MT_NUM_THREADS);
+fn system_clock_from_system_time_mt() {
+    system_clock_from_system_time(MT_NUM_THREADS);
 }
 
 #[cfg(not(miri))]
 #[test]
-fn simulation_auto_system_clock_st() {
-    simulation_auto_system_clock(1);
+fn auto_system_clock_st() {
+    auto_system_clock(1);
 }
 
 #[cfg(not(miri))]
 #[test]
-fn simulation_auto_system_clock_mt() {
-    simulation_auto_system_clock(MT_NUM_THREADS);
+fn auto_system_clock_mt() {
+    auto_system_clock(MT_NUM_THREADS);
 }
diff --git a/asynchronix/tests/integration/simulation_timeout.rs b/asynchronix/tests/integration/simulation_timeout.rs
new file mode 100644
index 0000000..2c3d63a
--- /dev/null
+++ b/asynchronix/tests/integration/simulation_timeout.rs
@@ -0,0 +1,103 @@
+//! Timeout during step execution.
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+use asynchronix::model::Model;
+use asynchronix::ports::Output;
+use asynchronix::simulation::{ExecutionError, Mailbox, SimInit};
+use asynchronix::time::MonotonicTime;
+
+const MT_NUM_THREADS: usize = 4;
+
+#[derive(Default)]
+struct TestModel {
+    output: Output<()>,
+    // A liveness flag that is cleared when the model is dropped.
+    is_alive: Arc<AtomicBool>,
+}
+impl TestModel {
+    fn new() -> (Self, Arc<AtomicBool>) {
+        let is_alive = Arc::new(AtomicBool::new(true));
+
+        (
+            Self {
+                output: Output::default(),
+                is_alive: is_alive.clone(),
+            },
+            is_alive,
+        )
+    }
+
+    async fn input(&mut self) {
+        self.output.send(()).await;
+    }
+}
+impl Drop for TestModel {
+    fn drop(&mut self) {
+        self.is_alive.store(false, Ordering::Relaxed);
+    }
+}
+impl Model for TestModel {}
+
+fn timeout_untriggered(num_threads: usize) {
+    let (model, _model_is_alive) = TestModel::new();
+    let mbox = Mailbox::new();
+    let addr = mbox.address();
+
+    let t0 = MonotonicTime::EPOCH;
+    let mut simu = SimInit::with_num_threads(num_threads)
+        .add_model(model, mbox, "test")
+        .set_timeout(Duration::from_secs(1))
+        .init(t0)
+        .unwrap();
+
+    assert!(simu.process_event(TestModel::input, (), addr).is_ok());
+}
+
+fn timeout_triggered(num_threads: usize) {
+    let (mut model, model_is_alive) = TestModel::new();
+    let mbox = Mailbox::new();
+    let addr = mbox.address();
+
+    // Make a loopback connection.
+    model.output.connect(TestModel::input, addr.clone());
+
+    let t0 = MonotonicTime::EPOCH;
+    let mut simu = SimInit::with_num_threads(num_threads)
+        .add_model(model, mbox, "test")
+        .set_timeout(Duration::from_secs(1))
+        .init(t0)
+        .unwrap();
+
+    assert!(matches!(
+        simu.process_event(TestModel::input, (), addr),
+        Err(ExecutionError::Timeout)
+    ));
+
+    // Make sure the request to stop the simulation has succeeded.
+ thread::sleep(Duration::from_millis(10)); + assert!(!model_is_alive.load(Ordering::Relaxed)); +} + +#[test] +fn timeout_untriggered_st() { + timeout_untriggered(1); +} + +#[test] +fn timeout_untriggered_mt() { + timeout_untriggered(MT_NUM_THREADS); +} + +#[test] +fn timeout_triggered_st() { + timeout_triggered(1); +} + +#[test] +fn timeout_triggered_mt() { + timeout_triggered(MT_NUM_THREADS); +}
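Reviewer note (not part of the patch): the sketch below shows how the timeout added by this change is meant to be used from downstream code. It mirrors the new `simulation_timeout.rs` integration test; `LoopbackModel`, its ports and the time budgets are illustrative assumptions, while `SimInit::set_timeout`, `Simulation::set_timeout` and `ExecutionError::Timeout` are the APIs introduced above.

use std::time::Duration;

use asynchronix::model::Model;
use asynchronix::ports::Output;
use asynchronix::simulation::{ExecutionError, Mailbox, SimInit};
use asynchronix::time::MonotonicTime;

// Hypothetical model whose output is fed back into its own input, so a single
// event keeps the executor busy forever.
#[derive(Default)]
struct LoopbackModel {
    output: Output<()>,
}
impl LoopbackModel {
    async fn input(&mut self) {
        self.output.send(()).await;
    }
}
impl Model for LoopbackModel {}

fn main() {
    let mut model = LoopbackModel::default();
    let mbox = Mailbox::new();
    let addr = mbox.address();

    // Loop the output back into the input.
    model.output.connect(LoopbackModel::input, addr.clone());

    // Allocate at most one second of wall clock time per simulation step
    // (a zero duration keeps the timeout disabled, which is the default).
    let mut simu = SimInit::with_num_threads(4)
        .add_model(model, mbox, "loopback")
        .set_timeout(Duration::from_secs(1))
        .init(MonotonicTime::EPOCH)
        .unwrap();

    // The budget can also be adjusted later, directly on the simulation.
    simu.set_timeout(Duration::from_millis(500));

    // The endless ping-pong cannot complete within the budget: the step is
    // aborted and `ExecutionError::Timeout` is returned. The simulation is
    // then terminated, so subsequent calls fail with `ExecutionError::Terminated`.
    assert!(matches!(
        simu.process_event(LoopbackModel::input, (), addr),
        Err(ExecutionError::Timeout)
    ));
}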