1
0
forked from ROMEO/nexosim

Merge pull request #57 from asynchronics/feature/timeout

Add support for simulation timeouts
This commit is contained in:
Serge Barral 2024-11-09 12:18:41 +01:00 committed by GitHub
commit 0c2f92d4cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 468 additions and 153 deletions

View File

@ -40,6 +40,7 @@ futures-channel = "0.3"
futures-task = "0.3" futures-task = "0.3"
multishot = "0.3.2" multishot = "0.3.2"
num_cpus = "1.13" num_cpus = "1.13"
parking = "2"
pin-project-lite = "0.2" pin-project-lite = "0.2"
recycle-box = "0.2" recycle-box = "0.2"
slab = "0.4" slab = "0.4"

View File

@ -3,6 +3,7 @@
//! Not for production use! //! Not for production use!
use std::future::Future; use std::future::Future;
use std::time::Duration;
use crate::executor; use crate::executor;
@ -25,6 +26,7 @@ impl Executor {
Self(executor::Executor::new_multi_threaded( Self(executor::Executor::new_multi_threaded(
pool_size, pool_size,
dummy_context, dummy_context,
executor::Signal::new(),
)) ))
} }
@ -43,6 +45,6 @@ impl Executor {
/// Let the executor run, blocking until all futures have completed or until /// Let the executor run, blocking until all futures have completed or until
/// the executor deadlocks. /// the executor deadlocks.
pub fn run(&mut self) { pub fn run(&mut self) {
self.0.run().unwrap(); self.0.run(Duration::ZERO).unwrap();
} }
} }

View File

@ -5,7 +5,11 @@ mod st_executor;
mod task; mod task;
use std::future::Future; use std::future::Future;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crossbeam_utils::CachePadded;
use crate::macros::scoped_thread_local::scoped_thread_local; use crate::macros::scoped_thread_local::scoped_thread_local;
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
@ -19,11 +23,14 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ExecutorError { pub(crate) enum ExecutorError {
/// The simulation has deadlocked. /// The simulation has deadlocked.
Deadlock, Deadlock,
/// The simulation has timed out.
Timeout,
} }
/// Context common to all executor types. /// Context common to all executor types.
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct SimulationContext { pub(crate) struct SimulationContext {
/// Read-only handle to the simulation time.
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
pub(crate) time_reader: AtomicTimeReader, pub(crate) time_reader: AtomicTimeReader,
} }
@ -39,8 +46,11 @@ pub(crate) enum Executor {
impl Executor { impl Executor {
/// Creates an executor that runs futures on the current thread. /// Creates an executor that runs futures on the current thread.
pub(crate) fn new_single_threaded(simulation_context: SimulationContext) -> Self { pub(crate) fn new_single_threaded(
Self::StExecutor(st_executor::Executor::new(simulation_context)) simulation_context: SimulationContext,
abort_signal: Signal,
) -> Self {
Self::StExecutor(st_executor::Executor::new(simulation_context, abort_signal))
} }
/// Creates an executor that runs futures on a thread pool. /// Creates an executor that runs futures on a thread pool.
@ -54,8 +64,13 @@ impl Executor {
pub(crate) fn new_multi_threaded( pub(crate) fn new_multi_threaded(
num_threads: usize, num_threads: usize,
simulation_context: SimulationContext, simulation_context: SimulationContext,
abort_signal: Signal,
) -> Self { ) -> Self {
Self::MtExecutor(mt_executor::Executor::new(num_threads, simulation_context)) Self::MtExecutor(mt_executor::Executor::new(
num_threads,
simulation_context,
abort_signal,
))
} }
/// Spawns a task which output will never be retrieved. /// Spawns a task which output will never be retrieved.
@ -91,19 +106,32 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or /// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock. /// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) -> Result<(), ExecutorError> { pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
let msg_count = match self { match self {
Self::StExecutor(executor) => executor.run(), Self::StExecutor(executor) => executor.run(timeout),
Self::MtExecutor(executor) => executor.run(), Self::MtExecutor(executor) => executor.run(timeout),
}; }
}
if msg_count != 0 {
assert!(msg_count > 0);
return Err(ExecutorError::Deadlock);
} }
Ok(()) /// A single-use shared boolean signal.
#[derive(Clone, Debug)]
pub(crate) struct Signal(Arc<CachePadded<AtomicBool>>);
impl Signal {
/// Create a new, cleared signal.
pub(crate) fn new() -> Self {
Self(Arc::new(CachePadded::new(AtomicBool::new(false))))
}
/// Sets the signal.
pub(crate) fn set(&self) {
self.0.store(true, Ordering::Relaxed);
}
/// Returns `true``is the signal was set.
pub(crate) fn is_set(&self) -> bool {
self.0.load(Ordering::Relaxed)
} }
} }
@ -196,7 +224,7 @@ mod tests {
} }
}); });
executor.run().unwrap(); executor.run(Duration::ZERO).unwrap();
// Make sure that all tasks are eventually dropped even though each task // Make sure that all tasks are eventually dropped even though each task
// wakes the others when dropped. // wakes the others when dropped.
@ -206,11 +234,18 @@ mod tests {
#[test] #[test]
fn executor_drop_cycle_st() { fn executor_drop_cycle_st() {
executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context())); executor_drop_cycle(Executor::new_single_threaded(
dummy_simulation_context(),
Signal::new(),
));
} }
#[test] #[test]
fn executor_drop_cycle_mt() { fn executor_drop_cycle_mt() {
executor_drop_cycle(Executor::new_multi_threaded(3, dummy_simulation_context())); executor_drop_cycle(Executor::new_multi_threaded(
3,
dummy_simulation_context(),
Signal::new(),
));
} }
} }

View File

@ -53,12 +53,16 @@ use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crossbeam_utils::sync::{Parker, Unparker}; // TODO: revert to `crossbeam_utils::sync::Parker` once timeout support lands in
// v1.0 (see https://github.com/crossbeam-rs/crossbeam/pull/1012).
use parking::{Parker, Unparker};
use slab::Slab; use slab::Slab;
use crate::channel; use crate::channel;
use crate::executor::task::{self, CancelToken, Promise, Runnable}; use crate::executor::task::{self, CancelToken, Promise, Runnable};
use crate::executor::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT}; use crate::executor::{
ExecutorError, Signal, SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT,
};
use crate::macros::scoped_thread_local::scoped_thread_local; use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::util::rng::Rng; use crate::util::rng::Rng;
use pool_manager::PoolManager; use pool_manager::PoolManager;
@ -84,6 +88,8 @@ pub(crate) struct Executor {
parker: Parker, parker: Parker,
/// Handles to the worker threads. /// Handles to the worker threads.
worker_handles: Vec<JoinHandle<()>>, worker_handles: Vec<JoinHandle<()>>,
/// Handle to the forced termination signal.
abort_signal: Signal,
} }
impl Executor { impl Executor {
@ -95,7 +101,11 @@ impl Executor {
/// ///
/// This will panic if the specified number of threads is zero or is more /// This will panic if the specified number of threads is zero or is more
/// than `usize::BITS`. /// than `usize::BITS`.
pub(crate) fn new(num_threads: usize, simulation_context: SimulationContext) -> Self { pub(crate) fn new(
num_threads: usize,
simulation_context: SimulationContext,
abort_signal: Signal,
) -> Self {
let parker = Parker::new(); let parker = Parker::new();
let unparker = parker.unparker().clone(); let unparker = parker.unparker().clone();
@ -142,12 +152,14 @@ impl Executor {
let context = context.clone(); let context = context.clone();
let active_tasks = active_tasks.clone(); let active_tasks = active_tasks.clone();
let simulation_context = simulation_context.clone(); let simulation_context = simulation_context.clone();
let abort_signal = abort_signal.clone();
move || { move || {
let worker = Worker::new(local_queue, context); let worker = Worker::new(local_queue, context);
SIMULATION_CONTEXT.set(&simulation_context, || { SIMULATION_CONTEXT.set(&simulation_context, || {
ACTIVE_TASKS.set(&active_tasks, || { ACTIVE_TASKS.set(&active_tasks, || {
LOCAL_WORKER.set(&worker, || { LOCAL_WORKER.set(&worker, || {
run_local_worker(&worker, id, worker_parker) run_local_worker(&worker, id, worker_parker, abort_signal)
}) })
}) })
}); });
@ -166,6 +178,7 @@ impl Executor {
active_tasks, active_tasks,
parker, parker,
worker_handles, worker_handles,
abort_signal,
} }
} }
@ -223,23 +236,37 @@ impl Executor {
self.context.injector.insert_task(runnable); self.context.injector.insert_task(runnable);
} }
/// Execute spawned tasks, blocking until all futures have completed or /// Execute spawned tasks, blocking until all futures have completed or an
/// until the executor reaches a deadlock. /// error is encountered.
/// pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
/// The number of unprocessed messages is returned. It should always be 0
/// unless a deadlock occurred.
pub(crate) fn run(&mut self) -> isize {
self.context.pool_manager.activate_worker(); self.context.pool_manager.activate_worker();
loop { loop {
if let Some(worker_panic) = self.context.pool_manager.take_panic() { if let Some(worker_panic) = self.context.pool_manager.take_panic() {
panic::resume_unwind(worker_panic); panic::resume_unwind(worker_panic);
} }
if self.context.pool_manager.pool_is_idle() { if self.context.pool_manager.pool_is_idle() {
return self.context.msg_count.load(Ordering::Relaxed); let msg_count = self.context.msg_count.load(Ordering::Relaxed);
if msg_count != 0 {
assert!(msg_count > 0);
return Err(ExecutorError::Deadlock);
} }
return Ok(());
}
if timeout.is_zero() {
self.parker.park(); self.parker.park();
} else if !self.parker.park_timeout(timeout) {
// A timeout occurred: request all worker threads to return
// as soon as possible.
self.abort_signal.set();
self.context.pool_manager.activate_all_workers();
return Err(ExecutorError::Timeout);
}
} }
} }
} }
@ -247,7 +274,8 @@ impl Executor {
impl Drop for Executor { impl Drop for Executor {
fn drop(&mut self) { fn drop(&mut self) {
// Force all threads to return. // Force all threads to return.
self.context.pool_manager.trigger_termination(); self.abort_signal.set();
self.context.pool_manager.activate_all_workers();
for handle in self.worker_handles.drain(0..) { for handle in self.worker_handles.drain(0..) {
handle.join().unwrap(); handle.join().unwrap();
} }
@ -459,7 +487,7 @@ fn schedule_task(task: Runnable, executor_id: usize) {
/// is received or until it panics. /// is received or until it panics.
/// ///
/// Panics caught in this thread are relayed to the main executor thread. /// Panics caught in this thread are relayed to the main executor thread.
fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { fn run_local_worker(worker: &Worker, id: usize, parker: Parker, abort_signal: Signal) {
let pool_manager = &worker.executor_context.pool_manager; let pool_manager = &worker.executor_context.pool_manager;
let injector = &worker.executor_context.injector; let injector = &worker.executor_context.injector;
let executor_unparker = &worker.executor_context.executor_unparker; let executor_unparker = &worker.executor_context.executor_unparker;
@ -508,7 +536,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
pool_manager.begin_worker_search(); pool_manager.begin_worker_search();
} }
if pool_manager.termination_is_triggered() { if abort_signal.is_set() {
return; return;
} }
@ -578,7 +606,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Pop tasks from the fast slot or the local queue. // Pop tasks from the fast slot or the local queue.
while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) { while let Some(task) = fast_slot.take().or_else(|| local_queue.pop()) {
if pool_manager.termination_is_triggered() { if abort_signal.is_set() {
return; return;
} }
task.run(); task.run();
@ -594,7 +622,8 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Propagate the panic, if any. // Propagate the panic, if any.
if let Err(panic) = result { if let Err(panic) = result {
pool_manager.register_panic(panic); pool_manager.register_panic(panic);
pool_manager.trigger_termination(); abort_signal.set();
pool_manager.activate_all_workers();
executor_unparker.unpark(); executor_unparker.unpark();
} }
} }

View File

@ -1,8 +1,8 @@
use std::any::Any; use std::any::Any;
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::Mutex; use std::sync::Mutex;
use crossbeam_utils::sync::Unparker; use parking::Unparker;
use super::Stealer; use super::Stealer;
use crate::util::bit; use crate::util::bit;
@ -22,8 +22,6 @@ pub(super) struct PoolManager {
active_workers: AtomicUsize, active_workers: AtomicUsize,
/// Count of all workers currently searching for tasks. /// Count of all workers currently searching for tasks.
searching_workers: AtomicUsize, searching_workers: AtomicUsize,
/// Flag requesting all workers to return immediately.
terminate_signal: AtomicBool,
/// Panic caught in a worker thread. /// Panic caught in a worker thread.
worker_panic: Mutex<Option<Box<dyn Any + Send + 'static>>>, worker_panic: Mutex<Option<Box<dyn Any + Send + 'static>>>,
} }
@ -56,7 +54,6 @@ impl PoolManager {
worker_unparkers, worker_unparkers,
active_workers: AtomicUsize::new(0), active_workers: AtomicUsize::new(0),
searching_workers: AtomicUsize::new(0), searching_workers: AtomicUsize::new(0),
terminate_signal: AtomicBool::new(false),
worker_panic: Mutex::new(None), worker_panic: Mutex::new(None),
} }
} }
@ -211,22 +208,14 @@ impl PoolManager {
self.searching_workers.load(Ordering::Relaxed) self.searching_workers.load(Ordering::Relaxed)
} }
/// Triggers the termination signal and unparks all worker threads so they /// Unparks all workers and mark them as active.
/// can cleanly terminate. pub(super) fn activate_all_workers(&self) {
pub(super) fn trigger_termination(&self) {
self.terminate_signal.store(true, Ordering::Relaxed);
self.set_all_workers_active(); self.set_all_workers_active();
for unparker in &*self.worker_unparkers { for unparker in &*self.worker_unparkers {
unparker.unpark(); unparker.unpark();
} }
} }
/// Returns true if the termination signal was triggered.
pub(super) fn termination_is_triggered(&self) -> bool {
self.terminate_signal.load(Ordering::Relaxed)
}
/// Registers a panic associated with the provided worker ID. /// Registers a panic associated with the provided worker ID.
/// ///
/// If no panic is currently registered, the panic in argument is /// If no panic is currently registered, the panic in argument is

View File

@ -1,15 +1,20 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt;
use std::future::Future; use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{fmt, panic, thread};
// TODO: revert to `crossbeam_utils::sync::Parker` once timeout support lands in
// v1.0 (see https://github.com/crossbeam-rs/crossbeam/pull/1012).
use parking::Parker;
use slab::Slab; use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable}; use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID; use super::NEXT_EXECUTOR_ID;
use crate::channel; use crate::channel;
use crate::executor::{SimulationContext, SIMULATION_CONTEXT}; use crate::executor::{ExecutorError, Signal, SimulationContext, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local; use crate::macros::scoped_thread_local::scoped_thread_local;
const QUEUE_MIN_CAPACITY: usize = 32; const QUEUE_MIN_CAPACITY: usize = 32;
@ -19,17 +24,15 @@ scoped_thread_local!(static ACTIVE_TASKS: RefCell<Slab<CancelToken>>);
/// A single-threaded `async` executor. /// A single-threaded `async` executor.
pub(crate) struct Executor { pub(crate) struct Executor {
/// Shared executor data. /// Executor state.
context: ExecutorContext, inner: Option<Box<ExecutorInner>>,
/// List of tasks that have not completed yet. /// Handle to the forced termination signal.
active_tasks: RefCell<Slab<CancelToken>>, abort_signal: Signal,
/// Read-only handle to the simulation time.
simulation_context: SimulationContext,
} }
impl Executor { impl Executor {
/// Creates an executor that runs futures on the current thread. /// Creates an executor that runs futures on the current thread.
pub(crate) fn new(simulation_context: SimulationContext) -> Self { pub(crate) fn new(simulation_context: SimulationContext, abort_signal: Signal) -> Self {
// Each executor instance has a unique ID inherited by tasks to ensure // Each executor instance has a unique ID inherited by tasks to ensure
// that tasks are scheduled on their parent executor. // that tasks are scheduled on their parent executor.
let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed); let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
@ -42,9 +45,13 @@ impl Executor {
let active_tasks = RefCell::new(Slab::new()); let active_tasks = RefCell::new(Slab::new());
Self { Self {
inner: Some(Box::new(ExecutorInner {
context, context,
active_tasks, active_tasks,
simulation_context, simulation_context,
abort_signal: abort_signal.clone(),
})),
abort_signal,
} }
} }
@ -58,8 +65,10 @@ impl Executor {
T: Future + Send + 'static, T: Future + Send + 'static,
T::Output: Send + 'static, T::Output: Send + 'static,
{ {
let inner = self.inner.as_ref().unwrap();
// Book a slot to store the task cancellation token. // Book a slot to store the task cancellation token.
let mut active_tasks = self.active_tasks.borrow_mut(); let mut active_tasks = inner.active_tasks.borrow_mut();
let task_entry = active_tasks.vacant_entry(); let task_entry = active_tasks.vacant_entry();
// Wrap the future so that it removes its cancel token from the // Wrap the future so that it removes its cancel token from the
@ -67,10 +76,10 @@ impl Executor {
let future = CancellableFuture::new(future, task_entry.key()); let future = CancellableFuture::new(future, task_entry.key());
let (promise, runnable, cancel_token) = let (promise, runnable, cancel_token) =
task::spawn(future, schedule_task, self.context.executor_id); task::spawn(future, schedule_task, inner.context.executor_id);
task_entry.insert(cancel_token); task_entry.insert(cancel_token);
let mut queue = self.context.queue.borrow_mut(); let mut queue = inner.context.queue.borrow_mut();
queue.push(runnable); queue.push(runnable);
promise promise
@ -88,8 +97,10 @@ impl Executor {
T: Future + Send + 'static, T: Future + Send + 'static,
T::Output: Send + 'static, T::Output: Send + 'static,
{ {
let inner = self.inner.as_ref().unwrap();
// Book a slot to store the task cancellation token. // Book a slot to store the task cancellation token.
let mut active_tasks = self.active_tasks.borrow_mut(); let mut active_tasks = inner.active_tasks.borrow_mut();
let task_entry = active_tasks.vacant_entry(); let task_entry = active_tasks.vacant_entry();
// Wrap the future so that it removes its cancel token from the // Wrap the future so that it removes its cancel token from the
@ -97,19 +108,71 @@ impl Executor {
let future = CancellableFuture::new(future, task_entry.key()); let future = CancellableFuture::new(future, task_entry.key());
let (runnable, cancel_token) = let (runnable, cancel_token) =
task::spawn_and_forget(future, schedule_task, self.context.executor_id); task::spawn_and_forget(future, schedule_task, inner.context.executor_id);
task_entry.insert(cancel_token); task_entry.insert(cancel_token);
let mut queue = self.context.queue.borrow_mut(); let mut queue = inner.context.queue.borrow_mut();
queue.push(runnable); queue.push(runnable);
} }
/// Execute spawned tasks, blocking until all futures have completed or /// Execute spawned tasks, blocking until all futures have completed or an
/// until the executor reaches a deadlock. /// error is encountered.
/// pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
/// The number of unprocessed messages is returned. It should always be 0 if timeout.is_zero() {
/// unless a deadlock occurred. return self.inner.as_mut().unwrap().run();
pub(crate) fn run(&mut self) -> isize { }
// Temporarily move out the inner state so it can be moved to another
// thread.
let mut inner = self.inner.take().unwrap();
let parker = Parker::new();
let unparker = parker.unparker();
let th = thread::spawn(move || {
// It is necessary to catch worker panics, otherwise the main thread
// will never be unparked if the worker thread panics.
let res = panic::catch_unwind(AssertUnwindSafe(|| inner.run()));
unparker.unpark();
match res {
Ok(res) => (inner, res),
Err(e) => panic::resume_unwind(e),
}
});
if !parker.park_timeout(timeout) {
// Make a best-effort attempt at stopping the worker thread.
self.abort_signal.set();
return Err(ExecutorError::Timeout);
}
match th.join() {
Ok((inner, res)) => {
self.inner = Some(inner);
res
}
Err(e) => panic::resume_unwind(e),
}
}
}
/// Inner state of the executor.
struct ExecutorInner {
/// Shared executor data.
context: ExecutorContext,
/// List of tasks that have not completed yet.
active_tasks: RefCell<Slab<CancelToken>>,
/// Read-only handle to the simulation time.
simulation_context: SimulationContext,
/// Signal requesting the worker thread to return as soon as possible.
abort_signal: Signal,
}
impl ExecutorInner {
fn run(&mut self) -> Result<(), ExecutorError> {
// In case this executor is nested in another one, reset the counter of in-flight messages. // In case this executor is nested in another one, reset the counter of in-flight messages.
let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count); let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count);
@ -122,17 +185,27 @@ impl Executor {
}; };
task.run(); task.run();
if self.abort_signal.is_set() {
return;
}
}) })
}) })
}); });
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash); self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
self.context.msg_count if self.context.msg_count != 0 {
assert!(self.context.msg_count > 0);
return Err(ExecutorError::Deadlock);
}
Ok(())
} }
} }
impl Drop for Executor { impl Drop for ExecutorInner {
fn drop(&mut self) { fn drop(&mut self) {
// Drop all tasks that have not completed. // Drop all tasks that have not completed.
// //

View File

@ -11,19 +11,20 @@ enum ErrorCode {
INTERNAL_ERROR = 0; INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1; SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2; SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3; SIMULATION_TIMEOUT = 3;
SIMULATION_MODEL_ERROR = 4; SIMULATION_DEADLOCK = 4;
SIMULATION_PANIC = 5; SIMULATION_MODEL_ERROR = 5;
SIMULATION_BAD_QUERY = 6; SIMULATION_PANIC = 6;
SIMULATION_TIME_OUT_OF_RANGE = 7; SIMULATION_BAD_QUERY = 7;
MISSING_ARGUMENT = 10; SIMULATION_TIME_OUT_OF_RANGE = 8;
INVALID_TIME = 11; MISSING_ARGUMENT = 20;
INVALID_DURATION = 12; INVALID_TIME = 30;
INVALID_PERIOD = 13; INVALID_DURATION = 31;
INVALID_MESSAGE = 14; INVALID_PERIOD = 32;
INVALID_KEY = 15; INVALID_MESSAGE = 33;
SOURCE_NOT_FOUND = 20; INVALID_KEY = 34;
SINK_NOT_FOUND = 21; SOURCE_NOT_FOUND = 40;
SINK_NOT_FOUND = 41;
} }
message Error { message Error {

View File

@ -339,19 +339,20 @@ pub enum ErrorCode {
InternalError = 0, InternalError = 0,
SimulationNotStarted = 1, SimulationNotStarted = 1,
SimulationTerminated = 2, SimulationTerminated = 2,
SimulationDeadlock = 3, SimulationTimeout = 3,
SimulationModelError = 4, SimulationDeadlock = 4,
SimulationPanic = 5, SimulationModelError = 5,
SimulationBadQuery = 6, SimulationPanic = 6,
SimulationTimeOutOfRange = 22, SimulationBadQuery = 7,
MissingArgument = 7, SimulationTimeOutOfRange = 8,
InvalidTime = 8, MissingArgument = 20,
InvalidDuration = 9, InvalidTime = 30,
InvalidPeriod = 10, InvalidDuration = 31,
InvalidMessage = 11, InvalidPeriod = 32,
InvalidKey = 12, InvalidMessage = 33,
SourceNotFound = 20, InvalidKey = 34,
SinkNotFound = 21, SourceNotFound = 40,
SinkNotFound = 41,
} }
impl ErrorCode { impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition. /// String value of the enum field names used in the ProtoBuf definition.
@ -363,6 +364,7 @@ impl ErrorCode {
ErrorCode::InternalError => "INTERNAL_ERROR", ErrorCode::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED", ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED", ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK", ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR", ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR",
ErrorCode::SimulationPanic => "SIMULATION_PANIC", ErrorCode::SimulationPanic => "SIMULATION_PANIC",
@ -384,6 +386,7 @@ impl ErrorCode {
"INTERNAL_ERROR" => Some(Self::InternalError), "INTERNAL_ERROR" => Some(Self::InternalError),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError), "SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError),
"SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_PANIC" => Some(Self::SimulationPanic),

View File

@ -38,8 +38,10 @@ fn map_execution_error(error: ExecutionError) -> Error {
ExecutionError::Panic(_) => ErrorCode::SimulationPanic, ExecutionError::Panic(_) => ErrorCode::SimulationPanic,
ExecutionError::BadQuery => ErrorCode::SimulationBadQuery, ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
ExecutionError::Terminated => ErrorCode::SimulationTerminated, ExecutionError::Terminated => ErrorCode::SimulationTerminated,
ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime, ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime,
}; };
let error_message = error.to_string(); let error_message = error.to_string();
to_error(error_code, error_message) to_error(error_code, error_message)

View File

@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use crate::executor::Executor; use crate::executor::{Executor, Signal};
use crate::simulation::{self, LocalScheduler, Mailbox}; use crate::simulation::{self, LocalScheduler, Mailbox};
use super::{Model, ProtoModel}; use super::{Model, ProtoModel};
@ -184,6 +184,7 @@ pub struct BuildContext<'a, P: ProtoModel> {
pub mailbox: &'a Mailbox<P::Model>, pub mailbox: &'a Mailbox<P::Model>,
context: &'a Context<P::Model>, context: &'a Context<P::Model>,
executor: &'a Executor, executor: &'a Executor,
abort_signal: &'a Signal,
} }
impl<'a, P: ProtoModel> BuildContext<'a, P> { impl<'a, P: ProtoModel> BuildContext<'a, P> {
@ -192,11 +193,13 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
mailbox: &'a Mailbox<P::Model>, mailbox: &'a Mailbox<P::Model>,
context: &'a Context<P::Model>, context: &'a Context<P::Model>,
executor: &'a Executor, executor: &'a Executor,
abort_signal: &'a Signal,
) -> Self { ) -> Self {
Self { Self {
mailbox, mailbox,
context, context,
executor, executor,
abort_signal,
} }
} }
@ -231,6 +234,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
submodel_name, submodel_name,
self.context.scheduler.scheduler.clone(), self.context.scheduler.scheduler.clone(),
self.executor, self.executor,
self.abort_signal,
); );
} }
} }

View File

@ -145,7 +145,7 @@ use std::time::Duration;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel::ChannelObserver; use crate::channel::ChannelObserver;
use crate::executor::{Executor, ExecutorError}; use crate::executor::{Executor, ExecutorError, Signal};
use crate::model::{BuildContext, Context, Model, ProtoModel}; use crate::model::{BuildContext, Context, Model, ProtoModel};
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
use crate::time::{AtomicTime, Clock, MonotonicTime}; use crate::time::{AtomicTime, Clock, MonotonicTime};
@ -195,6 +195,7 @@ pub struct Simulation {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock>, clock: Box<dyn Clock>,
timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>, observers: Vec<(String, Box<dyn ChannelObserver>)>,
is_terminated: bool, is_terminated: bool,
} }
@ -206,6 +207,7 @@ impl Simulation {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>, observers: Vec<(String, Box<dyn ChannelObserver>)>,
) -> Self { ) -> Self {
Self { Self {
@ -213,11 +215,26 @@ impl Simulation {
scheduler_queue, scheduler_queue,
time, time,
clock, clock,
timeout,
observers, observers,
is_terminated: false, is_terminated: false,
} }
} }
/// Sets a timeout for each simulation step.
///
/// The timeout corresponds to the maximum wall clock time allocated for the
/// completion of a single simulation step before an
/// [`ExecutionError::Timeout`] error is raised.
///
/// A null duration disables the timeout, which is the default behavior.
///
/// See also [`SimInit::set_timeout`].
#[cfg(not(target_family = "wasm"))]
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout;
}
/// Returns the current simulation time. /// Returns the current simulation time.
pub fn time(&self) -> MonotonicTime { pub fn time(&self) -> MonotonicTime {
self.time.read() self.time.read()
@ -362,7 +379,7 @@ impl Simulation {
return Err(ExecutionError::Terminated); return Err(ExecutionError::Terminated);
} }
self.executor.run().map_err(|e| match e { self.executor.run(self.timeout).map_err(|e| match e {
ExecutorError::Deadlock => { ExecutorError::Deadlock => {
self.is_terminated = true; self.is_terminated = true;
let mut deadlock_info = Vec::new(); let mut deadlock_info = Vec::new();
@ -378,6 +395,11 @@ impl Simulation {
ExecutionError::Deadlock(deadlock_info) ExecutionError::Deadlock(deadlock_info)
} }
ExecutorError::Timeout => {
self.is_terminated = true;
ExecutionError::Timeout
}
}) })
} }
@ -544,8 +566,10 @@ pub enum ExecutionError {
/// The query was invalid and did not obtain a response. /// The query was invalid and did not obtain a response.
BadQuery, BadQuery,
/// The simulation has been terminated due to an earlier deadlock, model /// The simulation has been terminated due to an earlier deadlock, model
/// error or model panic. /// error, model panic or timeout.
Terminated, Terminated,
/// The simulation step has failed to complete within the allocated time.
Timeout,
} }
impl fmt::Display for ExecutionError { impl fmt::Display for ExecutionError {
@ -593,6 +617,7 @@ impl fmt::Display for ExecutionError {
} }
Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"), Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"),
Self::Terminated => f.write_str("the simulation has been terminated"), Self::Terminated => f.write_str("the simulation has been terminated"),
Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"),
} }
} }
} }
@ -653,19 +678,22 @@ pub(crate) fn add_model<P: ProtoModel>(
name: String, name: String,
scheduler: Scheduler, scheduler: Scheduler,
executor: &Executor, executor: &Executor,
abort_signal: &Signal,
) { ) {
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name);
let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address())); let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address()));
let build_context = BuildContext::new(&mailbox, &context, executor); let build_context = BuildContext::new(&mailbox, &context, executor, abort_signal);
let model = model.build(&build_context); let model = model.build(&build_context);
let mut receiver = mailbox.0; let mut receiver = mailbox.0;
let abort_signal = abort_signal.clone();
let fut = async move { let fut = async move {
let mut model = model.init(&context).await.0; let mut model = model.init(&context).await.0;
while receiver.recv(&mut model, &context).await.is_ok() {} while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {}
}; };
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]

View File

@ -1,5 +1,6 @@
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::channel::ChannelObserver; use crate::channel::ChannelObserver;
use crate::executor::{Executor, SimulationContext}; use crate::executor::{Executor, SimulationContext};
@ -9,7 +10,7 @@ use crate::time::{Clock, NoClock};
use crate::util::priority_queue::PriorityQueue; use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell; use crate::util::sync_cell::SyncCell;
use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Simulation}; use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Signal, Simulation};
/// Builder for a multi-threaded, discrete-event simulation. /// Builder for a multi-threaded, discrete-event simulation.
pub struct SimInit { pub struct SimInit {
@ -17,7 +18,9 @@ pub struct SimInit {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>, observers: Vec<(String, Box<dyn ChannelObserver>)>,
abort_signal: Signal,
} }
impl SimInit { impl SimInit {
@ -31,19 +34,25 @@ impl SimInit {
/// threads. /// threads.
/// ///
/// Note that the number of worker threads is automatically constrained to /// Note that the number of worker threads is automatically constrained to
/// be between 1 and `usize::BITS` (inclusive). /// be between 1 and `usize::BITS` (inclusive). It is always set to 1 on
/// `wasm` targets.
pub fn with_num_threads(num_threads: usize) -> Self { pub fn with_num_threads(num_threads: usize) -> Self {
let num_threads = num_threads.clamp(1, usize::BITS as usize); let num_threads = if cfg!(target_family = "wasm") {
1
} else {
num_threads.clamp(1, usize::BITS as usize)
};
let time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)); let time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH));
let simulation_context = SimulationContext { let simulation_context = SimulationContext {
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
time_reader: time.reader(), time_reader: time.reader(),
}; };
let abort_signal = Signal::new();
let executor = if num_threads == 1 { let executor = if num_threads == 1 {
Executor::new_single_threaded(simulation_context) Executor::new_single_threaded(simulation_context, abort_signal.clone())
} else { } else {
Executor::new_multi_threaded(num_threads, simulation_context) Executor::new_multi_threaded(num_threads, simulation_context, abort_signal.clone())
}; };
Self { Self {
@ -51,7 +60,9 @@ impl SimInit {
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
time, time,
clock: Box::new(NoClock::new()), clock: Box::new(NoClock::new()),
timeout: Duration::ZERO,
observers: Vec::new(), observers: Vec::new(),
abort_signal,
} }
} }
@ -70,12 +81,20 @@ impl SimInit {
self.observers self.observers
.push((name.clone(), Box::new(mailbox.0.observer()))); .push((name.clone(), Box::new(mailbox.0.observer())));
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
add_model(model, mailbox, name, scheduler, &self.executor);
add_model(
model,
mailbox,
name,
scheduler,
&self.executor,
&self.abort_signal,
);
self self
} }
/// Synchronize the simulation with the provided [`Clock`]. /// Synchronizes the simulation with the provided [`Clock`].
/// ///
/// If the clock isn't explicitly set then the default [`NoClock`] is used, /// If the clock isn't explicitly set then the default [`NoClock`] is used,
/// resulting in the simulation running as fast as possible. /// resulting in the simulation running as fast as possible.
@ -85,6 +104,23 @@ impl SimInit {
self self
} }
/// Sets a timeout for the call to [`SimInit::init`] and for any subsequent
/// simulation step.
///
/// The timeout corresponds to the maximum wall clock time allocated for the
/// completion of a single simulation step before an
/// [`ExecutionError::Timeout`] error is raised.
///
/// A null duration disables the timeout, which is the default behavior.
///
/// See also [`Simulation::set_timeout`].
#[cfg(not(target_family = "wasm"))]
pub fn set_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
/// Builds a simulation initialized at the specified simulation time, /// Builds a simulation initialized at the specified simulation time,
/// executing the [`Model::init()`](crate::model::Model::init) method on all /// executing the [`Model::init()`](crate::model::Model::init) method on all
/// model initializers. /// model initializers.
@ -97,6 +133,7 @@ impl SimInit {
self.scheduler_queue, self.scheduler_queue,
self.time, self.time,
self.clock, self.clock,
self.timeout,
self.observers, self.observers,
); );
simulation.run()?; simulation.run()?;

View File

@ -0,0 +1,8 @@
// Integration tests follow the organization suggested by Matklad:
// https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html
mod model_scheduling;
mod simulation_deadlock;
mod simulation_scheduling;
#[cfg(not(miri))]
mod simulation_timeout;

View File

@ -2,8 +2,6 @@
use std::time::Duration; use std::time::Duration;
const MT_NUM_THREADS: usize = 4;
#[cfg(not(miri))] #[cfg(not(miri))]
use asynchronix::model::Context; use asynchronix::model::Context;
use asynchronix::model::Model; use asynchronix::model::Model;
@ -11,6 +9,8 @@ use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Address, Mailbox, SimInit, Simulation}; use asynchronix::simulation::{Address, Mailbox, SimInit, Simulation};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
const MT_NUM_THREADS: usize = 4;
// Input-to-output pass-through model. // Input-to-output pass-through model.
struct PassThroughModel<T: Clone + Send + 'static> { struct PassThroughModel<T: Clone + Send + 'static> {
pub output: Output<T>, pub output: Output<T>,
@ -49,7 +49,7 @@ fn passthrough_bench<T: Clone + Send + 'static>(
(simu, addr, out_stream) (simu, addr, out_stream)
} }
fn simulation_schedule_events(num_threads: usize) { fn schedule_events(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0);
@ -90,7 +90,7 @@ fn simulation_schedule_events(num_threads: usize) {
assert!(output.next().is_none()); assert!(output.next().is_none());
} }
fn simulation_schedule_keyed_events(num_threads: usize) { fn schedule_keyed_events(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0);
@ -131,7 +131,7 @@ fn simulation_schedule_keyed_events(num_threads: usize) {
assert!(output.next().is_none()); assert!(output.next().is_none());
} }
fn simulation_schedule_periodic_events(num_threads: usize) { fn schedule_periodic_events(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0);
@ -170,7 +170,7 @@ fn simulation_schedule_periodic_events(num_threads: usize) {
} }
} }
fn simulation_schedule_periodic_keyed_events(num_threads: usize) { fn schedule_periodic_keyed_events(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0); let (mut simu, addr, mut output) = passthrough_bench(num_threads, t0);
@ -219,43 +219,43 @@ fn simulation_schedule_periodic_keyed_events(num_threads: usize) {
} }
#[test] #[test]
fn simulation_schedule_events_st() { fn schedule_events_st() {
simulation_schedule_events(1); schedule_events(1);
} }
#[test] #[test]
fn simulation_schedule_events_mt() { fn schedule_events_mt() {
simulation_schedule_events(MT_NUM_THREADS); schedule_events(MT_NUM_THREADS);
} }
#[test] #[test]
fn simulation_schedule_keyed_events_st() { fn schedule_keyed_events_st() {
simulation_schedule_keyed_events(1); schedule_keyed_events(1);
} }
#[test] #[test]
fn simulation_schedule_keyed_events_mt() { fn schedule_keyed_events_mt() {
simulation_schedule_keyed_events(MT_NUM_THREADS); schedule_keyed_events(MT_NUM_THREADS);
} }
#[test] #[test]
fn simulation_schedule_periodic_events_st() { fn schedule_periodic_events_st() {
simulation_schedule_periodic_events(1); schedule_periodic_events(1);
} }
#[test] #[test]
fn simulation_schedule_periodic_events_mt() { fn schedule_periodic_events_mt() {
simulation_schedule_periodic_events(MT_NUM_THREADS); schedule_periodic_events(MT_NUM_THREADS);
} }
#[test] #[test]
fn simulation_schedule_periodic_keyed_events_st() { fn schedule_periodic_keyed_events_st() {
simulation_schedule_periodic_keyed_events(1); schedule_periodic_keyed_events(1);
} }
#[test] #[test]
fn simulation_schedule_periodic_keyed_events_mt() { fn schedule_periodic_keyed_events_mt() {
simulation_schedule_periodic_keyed_events(MT_NUM_THREADS); schedule_periodic_keyed_events(MT_NUM_THREADS);
} }
#[cfg(not(miri))] #[cfg(not(miri))]
@ -313,7 +313,7 @@ fn timestamp_bench(
} }
#[cfg(not(miri))] #[cfg(not(miri))]
fn simulation_system_clock_from_instant(num_threads: usize) { fn system_clock_from_instant(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
const TOLERANCE: f64 = 0.005; // [s] const TOLERANCE: f64 = 0.005; // [s]
@ -369,7 +369,7 @@ fn simulation_system_clock_from_instant(num_threads: usize) {
} }
#[cfg(not(miri))] #[cfg(not(miri))]
fn simulation_system_clock_from_system_time(num_threads: usize) { fn system_clock_from_system_time(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
const TOLERANCE: f64 = 0.005; // [s] const TOLERANCE: f64 = 0.005; // [s]
@ -431,7 +431,7 @@ fn simulation_system_clock_from_system_time(num_threads: usize) {
} }
#[cfg(not(miri))] #[cfg(not(miri))]
fn simulation_auto_system_clock(num_threads: usize) { fn auto_system_clock(num_threads: usize) {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
const TOLERANCE: f64 = 0.005; // [s] const TOLERANCE: f64 = 0.005; // [s]
@ -478,36 +478,36 @@ fn simulation_auto_system_clock(num_threads: usize) {
#[cfg(not(miri))] #[cfg(not(miri))]
#[test] #[test]
fn simulation_system_clock_from_instant_st() { fn system_clock_from_instant_st() {
simulation_system_clock_from_instant(1); system_clock_from_instant(1);
} }
#[cfg(not(miri))] #[cfg(not(miri))]
#[test] #[test]
fn simulation_system_clock_from_instant_mt() { fn system_clock_from_instant_mt() {
simulation_system_clock_from_instant(MT_NUM_THREADS); system_clock_from_instant(MT_NUM_THREADS);
} }
#[cfg(not(miri))] #[cfg(not(miri))]
#[test] #[test]
fn simulation_system_clock_from_system_time_st() { fn system_clock_from_system_time_st() {
simulation_system_clock_from_system_time(1); system_clock_from_system_time(1);
} }
#[cfg(not(miri))] #[cfg(not(miri))]
#[test] #[test]
fn simulation_system_clock_from_system_time_mt() { fn system_clock_from_system_time_mt() {
simulation_system_clock_from_system_time(MT_NUM_THREADS); system_clock_from_system_time(MT_NUM_THREADS);
} }
#[cfg(not(miri))] #[cfg(not(miri))]
#[test] #[test]
fn simulation_auto_system_clock_st() { fn auto_system_clock_st() {
simulation_auto_system_clock(1); auto_system_clock(1);
} }
#[cfg(not(miri))] #[cfg(not(miri))]
#[test] #[test]
fn simulation_auto_system_clock_mt() { fn auto_system_clock_mt() {
simulation_auto_system_clock(MT_NUM_THREADS); auto_system_clock(MT_NUM_THREADS);
} }

View File

@ -0,0 +1,103 @@
//! Timeout during step execution.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use asynchronix::model::Model;
use asynchronix::ports::Output;
use asynchronix::simulation::{ExecutionError, Mailbox, SimInit};
use asynchronix::time::MonotonicTime;
const MT_NUM_THREADS: usize = 4;
#[derive(Default)]
struct TestModel {
output: Output<()>,
// A liveliness flag that is cleared when the model is dropped.
is_alive: Arc<AtomicBool>,
}
impl TestModel {
fn new() -> (Self, Arc<AtomicBool>) {
let is_alive = Arc::new(AtomicBool::new(true));
(
Self {
output: Output::default(),
is_alive: is_alive.clone(),
},
is_alive,
)
}
async fn input(&mut self) {
self.output.send(()).await;
}
}
impl Drop for TestModel {
fn drop(&mut self) {
self.is_alive.store(false, Ordering::Relaxed);
}
}
impl Model for TestModel {}
fn timeout_untriggered(num_threads: usize) {
let (model, _model_is_alive) = TestModel::new();
let mbox = Mailbox::new();
let addr = mbox.address();
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, "test")
.set_timeout(Duration::from_secs(1))
.init(t0)
.unwrap();
assert!(simu.process_event(TestModel::input, (), addr).is_ok());
}
fn timeout_triggered(num_threads: usize) {
let (mut model, model_is_alive) = TestModel::new();
let mbox = Mailbox::new();
let addr = mbox.address();
// Make a loopback connection.
model.output.connect(TestModel::input, addr.clone());
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, "test")
.set_timeout(Duration::from_secs(1))
.init(t0)
.unwrap();
assert!(matches!(
simu.process_event(TestModel::input, (), addr),
Err(ExecutionError::Timeout)
));
// Make sure the request to stop the simulation has succeeded.
thread::sleep(Duration::from_millis(10));
assert!(!model_is_alive.load(Ordering::Relaxed));
}
#[test]
fn timeout_untriggered_st() {
timeout_untriggered(1);
}
#[test]
fn timeout_untriggered_mt() {
timeout_untriggered(MT_NUM_THREADS);
}
#[test]
fn timeout_triggered_st() {
timeout_triggered(1);
}
#[test]
fn timeout_triggered_mt() {
timeout_triggered(MT_NUM_THREADS);
}