forked from ROMEO/nexosim

Merge pull request #60 from asynchronics/feature/catch_panics

Report panics as errors + identify panicking model
Jauhien Piatlicki 2024-11-13 23:56:51 +01:00 committed by GitHub
commit a533b3e6c1
21 changed files with 437 additions and 229 deletions


@ -41,7 +41,7 @@ futures-task = "0.3"
multishot = "0.3.2"
num_cpus = "1.13"
parking = "2"
pin-project-lite = "0.2"
pin-project = "1"
recycle-box = "0.2"
slab = "0.4"
spin_sleep = "1"


@ -85,7 +85,7 @@ impl Model for MotorAssembly {}
impl ProtoModel for ProtoMotorAssembly {
type Model = MotorAssembly;
fn build(self, ctx: &BuildContext<Self>) -> MotorAssembly {
fn build(self, ctx: &mut BuildContext<Self>) -> MotorAssembly {
let mut assembly = MotorAssembly::new();
let mut motor = Motor::new(self.init_pos);
let mut driver = Driver::new(1.0);


@ -58,7 +58,7 @@ impl ProtoModel for ProtoListener {
type Model = Listener;
/// Start the UDP Server immediately upon model construction.
fn build(self, _: &BuildContext<Self>) -> Listener {
fn build(self, _: &mut BuildContext<Self>) -> Listener {
let (tx, rx) = channel();
let external_handle = thread::spawn(move || {


@ -4,6 +4,7 @@ mod mt_executor;
mod st_executor;
mod task;
use std::any::Any;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
@ -12,6 +13,7 @@ use std::time::Duration;
use crossbeam_utils::CachePadded;
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::simulation::ModelId;
#[cfg(feature = "tracing")]
use crate::time::AtomicTimeReader;
use task::Promise;
@ -19,12 +21,14 @@ use task::Promise;
/// Unique identifier for executor instances.
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(PartialEq, Eq, Debug)]
#[derive(Debug)]
pub(crate) enum ExecutorError {
/// The simulation has deadlocked.
Deadlock,
/// The simulation has timed out.
Timeout,
/// The simulation has panicked.
Panic(ModelId, Box<dyn Any + Send + 'static>),
}
/// Context common to all executor types.
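A minimal sketch (not part of the commit) of how crate-internal code might render the new `Panic` variant; the `describe` helper is hypothetical, and the payload handling mirrors what the `ExecutionError` `Display` implementation does later in this diff:

```rust
// Hypothetical helper; only the `ExecutorError` and `ModelId` types from
// this diff are assumed.
fn describe(error: &ExecutorError) -> String {
    match error {
        ExecutorError::Deadlock => "the simulation has deadlocked".into(),
        ExecutorError::Timeout => "the simulation has timed out".into(),
        ExecutorError::Panic(model_id, payload) => {
            // Panic payloads are usually `&str` or `String`.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "<non-string payload>".into());
            format!("model {:?} panicked: {}", model_id, msg)
        }
    }
}
```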


@ -64,6 +64,7 @@ use crate::executor::{
ExecutorError, Signal, SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT,
};
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::simulation::CURRENT_MODEL_ID;
use crate::util::rng::Rng;
use pool_manager::PoolManager;
@ -242,8 +243,8 @@ impl Executor {
self.context.pool_manager.activate_worker();
loop {
if let Some(worker_panic) = self.context.pool_manager.take_panic() {
panic::resume_unwind(worker_panic);
if let Some((model_id, payload)) = self.context.pool_manager.take_panic() {
return Err(ExecutorError::Panic(model_id, payload));
}
if self.context.pool_manager.pool_is_idle() {
@ -619,9 +620,10 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker, abort_signal: Si
}
}));
// Propagate the panic, if any.
if let Err(panic) = result {
pool_manager.register_panic(panic);
// Report the panic, if any.
if let Err(payload) = result {
let model_id = CURRENT_MODEL_ID.take();
pool_manager.register_panic(model_id, payload);
abort_signal.set();
pool_manager.activate_all_workers();
executor_unparker.unpark();


@ -5,6 +5,7 @@ use std::sync::Mutex;
use parking::Unparker;
use super::Stealer;
use crate::simulation::ModelId;
use crate::util::bit;
use crate::util::rng;
@ -23,7 +24,7 @@ pub(super) struct PoolManager {
/// Count of all workers currently searching for tasks.
searching_workers: AtomicUsize,
/// Panic caught in a worker thread.
worker_panic: Mutex<Option<Box<dyn Any + Send + 'static>>>,
worker_panic: Mutex<Option<(ModelId, Box<dyn Any + Send + 'static>)>>,
}
impl PoolManager {
@ -216,20 +217,19 @@ impl PoolManager {
}
}
/// Registers a panic associated with the provided worker ID.
/// Registers a worker panic.
///
/// If no panic is currently registered, the panic in argument is
/// registered. If a panic was already registered by a worker and was not
/// yet processed by the executor, then nothing is done.
pub(super) fn register_panic(&self, panic: Box<dyn Any + Send + 'static>) {
/// If a panic was already registered and was not yet processed by the
/// executor, then nothing is done.
pub(super) fn register_panic(&self, model_id: ModelId, payload: Box<dyn Any + Send + 'static>) {
let mut worker_panic = self.worker_panic.lock().unwrap();
if worker_panic.is_none() {
*worker_panic = Some(panic);
*worker_panic = Some((model_id, payload));
}
}
/// Takes a worker panic if any is registered.
pub(super) fn take_panic(&self) -> Option<Box<dyn Any + Send + 'static>> {
pub(super) fn take_panic(&self) -> Option<(ModelId, Box<dyn Any + Send + 'static>)> {
let mut worker_panic = self.worker_panic.lock().unwrap();
worker_panic.take()
}
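As a standalone illustration (not taken from the commit) of the "first panic wins" contract that `register_panic`/`take_panic` implement above, with a plain `usize` standing in for `ModelId`:

```rust
use std::any::Any;
use std::sync::Mutex;

// Simplified stand-in for the pool manager's panic slot.
struct PanicSlot(Mutex<Option<(usize, Box<dyn Any + Send + 'static>)>>);

impl PanicSlot {
    fn register(&self, model_id: usize, payload: Box<dyn Any + Send + 'static>) {
        let mut slot = self.0.lock().unwrap();
        // Keep only the first unprocessed panic; later ones are ignored.
        if slot.is_none() {
            *slot = Some((model_id, payload));
        }
    }
    fn take(&self) -> Option<(usize, Box<dyn Any + Send + 'static>)> {
        self.0.lock().unwrap().take()
    }
}

fn main() {
    let slot = PanicSlot(Mutex::new(None));
    slot.register(3, Box::new("boom"));
    slot.register(7, Box::new("ignored")); // dropped: a panic is already pending
    assert_eq!(slot.take().map(|(id, _)| id), Some(3));
    assert!(slot.take().is_none()); // cleared once the executor has taken it
}
```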


@ -16,6 +16,7 @@ use super::NEXT_EXECUTOR_ID;
use crate::channel;
use crate::executor::{ExecutorError, Signal, SimulationContext, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::simulation::CURRENT_MODEL_ID;
const QUEUE_MIN_CAPACITY: usize = 32;
@ -129,16 +130,10 @@ impl Executor {
let parker = Parker::new();
let unparker = parker.unparker();
let th = thread::spawn(move || {
// It is necessary to catch worker panics, otherwise the main thread
// will never be unparked if the worker thread panics.
let res = panic::catch_unwind(AssertUnwindSafe(|| inner.run()));
let res = inner.run();
unparker.unpark();
match res {
Ok(res) => (inner, res),
Err(e) => panic::resume_unwind(e),
}
(inner, res)
});
if !parker.park_timeout(timeout) {
@ -148,15 +143,11 @@ impl Executor {
return Err(ExecutorError::Timeout);
}
match th.join() {
Ok((inner, res)) => {
let (inner, res) = th.join().unwrap();
self.inner = Some(inner);
res
}
Err(e) => panic::resume_unwind(e),
}
}
}
/// Inner state of the executor.
@ -176,9 +167,10 @@ impl ExecutorInner {
// In case this executor is nested in another one, reset the counter of in-flight messages.
let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count);
SIMULATION_CONTEXT.set(&self.simulation_context, || {
let result = SIMULATION_CONTEXT.set(&self.simulation_context, || {
ACTIVE_TASKS.set(&self.active_tasks, || {
EXECUTOR_CONTEXT.set(&self.context, || loop {
EXECUTOR_CONTEXT.set(&self.context, || {
panic::catch_unwind(AssertUnwindSafe(|| loop {
let task = match self.context.queue.borrow_mut().pop() {
Some(task) => task,
None => break,
@ -189,12 +181,20 @@ impl ExecutorInner {
if self.abort_signal.is_set() {
return;
}
}))
})
})
});
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
// Return the panic payload, if any.
if let Err(payload) = result {
let model_id = CURRENT_MODEL_ID.take();
return Err(ExecutorError::Panic(model_id, payload));
}
// Check for deadlock.
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
if self.context.msg_count != 0 {
assert!(self.context.msg_count > 0);


@ -11,17 +11,16 @@ enum ErrorCode {
INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2;
SIMULATION_TIMEOUT = 3;
SIMULATION_DEADLOCK = 4;
SIMULATION_MODEL_ERROR = 5;
SIMULATION_PANIC = 6;
SIMULATION_DEADLOCK = 3;
SIMULATION_PANIC = 4;
SIMULATION_TIMEOUT = 5;
SIMULATION_OUT_OF_SYNC = 6;
SIMULATION_BAD_QUERY = 7;
SIMULATION_TIME_OUT_OF_RANGE = 8;
SIMULATION_OUT_OF_SYNC = 9;
MISSING_ARGUMENT = 20;
INVALID_TIME = 30;
INVALID_DURATION = 31;
INVALID_PERIOD = 32;
INVALID_PERIOD = 31;
INVALID_DEADLINE = 32;
INVALID_MESSAGE = 33;
INVALID_KEY = 34;
SOURCE_NOT_FOUND = 40;


@ -339,17 +339,16 @@ pub enum ErrorCode {
InternalError = 0,
SimulationNotStarted = 1,
SimulationTerminated = 2,
SimulationTimeout = 3,
SimulationDeadlock = 4,
SimulationModelError = 5,
SimulationPanic = 6,
SimulationDeadlock = 3,
SimulationPanic = 4,
SimulationTimeout = 5,
SimulationOutOfSync = 6,
SimulationBadQuery = 7,
SimulationTimeOutOfRange = 8,
SimulationOutOfSync = 9,
MissingArgument = 20,
InvalidTime = 30,
InvalidDuration = 31,
InvalidPeriod = 32,
InvalidPeriod = 31,
InvalidDeadline = 32,
InvalidMessage = 33,
InvalidKey = 34,
SourceNotFound = 40,
@ -365,17 +364,16 @@ impl ErrorCode {
ErrorCode::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR",
ErrorCode::SimulationPanic => "SIMULATION_PANIC",
ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT",
ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
ErrorCode::MissingArgument => "MISSING_ARGUMENT",
ErrorCode::InvalidTime => "INVALID_TIME",
ErrorCode::InvalidDuration => "INVALID_DURATION",
ErrorCode::InvalidPeriod => "INVALID_PERIOD",
ErrorCode::InvalidDeadline => "INVALID_DEADLINE",
ErrorCode::InvalidMessage => "INVALID_MESSAGE",
ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
@ -388,17 +386,16 @@ impl ErrorCode {
"INTERNAL_ERROR" => Some(Self::InternalError),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError),
"SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
"SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_DURATION" => Some(Self::InvalidDuration),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),


@ -34,13 +34,12 @@ fn simulation_not_started_error() -> Error {
fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::ModelError { .. } => ErrorCode::SimulationModelError,
ExecutionError::Panic(_) => ErrorCode::SimulationPanic,
ExecutionError::Panic { .. } => ErrorCode::SimulationPanic,
ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,
ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
ExecutionError::Terminated => ErrorCode::SimulationTerminated,
ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime,
ExecutionError::InvalidDeadline(_) => ErrorCode::InvalidDeadline,
};
let error_message = error.to_string();


@ -107,14 +107,14 @@ impl ControllerService {
simulation.step_until(time).map_err(|_| {
to_error(
ErrorCode::InvalidTime,
ErrorCode::InvalidDeadline,
"the specified deadline lies in the past",
)
})?;
}
step_until_request::Deadline::Duration(duration) => {
let duration = to_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDuration,
ErrorCode::InvalidDeadline,
"the specified deadline lies in the past",
))?;
@ -156,7 +156,7 @@ impl ControllerService {
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or(to_error(
ErrorCode::InvalidDuration,
ErrorCode::InvalidPeriod,
"the specified event period is not strictly positive",
))
})
@ -175,7 +175,7 @@ impl ControllerService {
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDuration,
ErrorCode::InvalidDeadline,
"the specified scheduling deadline is not in the future",
))?;


@ -9,32 +9,31 @@
//!
//! It is frequently convenient to expose to users a model builder type—called a
//! *model prototype*—rather than the final model. This can be done by
//! implementing the `ProtoModel`, which defines the associated model
//! type and a [`ProtoModel::build` method`] invoked when a model is added the
//! implementing the [`ProtoModel`] trait, which defines the associated model
//! type and a [`ProtoModel::build`] method invoked when a model is added to
//! the simulation, returning the actual model instance.
//!
//! Prototype models can be used whenever the Rust builder pattern is helpful,
//! for instance to set optional parameters. One of the use-cases that may
//! benefit from the use of prototype models, however, is hierarchical model
//! building. When a parent model contains sub-models, these sub-models are
//! often an implementation detail that needs not be exposed to the user. One
//! may then define a prototype model that contains all outputs and requestors
//! ports, which upon invocation of `ProtoModel::build()` are moved to the
//! appropriate sub-models (note that the `build` method also allows adding
//! sub-models to the simulation).
//! benefit from the use of prototype models is hierarchical model building.
//! When a parent model contains submodels, these submodels are often an
//! implementation detail that need not be exposed to the user. One may then
//! define a prototype model that contains all output and requestor ports.
//! Upon invocation of [`ProtoModel::build`], the ports are moved to the
//! appropriate submodels and those submodels are added to the simulation.
//!
//! Note that a trivial `ProtoModel` implementation is generated by default for
//! any object implementing the `Model` trait, where the associated
//! `ProtoModel::Model` type is the model type itself and where
//! `ProtoModel::build` simply returns the model instance. This is what makes it
//! possible to use either an explicitly-defined `ProtoModel` as argument to the
//! [`SimInit::add_model`](crate::simulation::SimInit::add_model) method, or a
//! plain `Model` type.
//! Note that a trivial [`ProtoModel`] implementation is generated by default
//! for any object implementing the [`Model`] trait, where the associated
//! [`ProtoModel::Model`] type is the model type itself and where
//! [`ProtoModel::build`] simply returns the model instance. This is what makes
//! it possible to use either an explicitly-defined [`ProtoModel`] as argument
//! to the [`SimInit::add_model`](crate::simulation::SimInit::add_model) method,
//! or a plain [`Model`] type.
//!
//! #### Examples
//!
//! A model that does not require initialization or building can simply use the
//! default implementation of the `Model` trait:
//! default implementation of the [`Model`] trait:
//!
//! ```
//! use asynchronix::model::Model;
@ -66,7 +65,7 @@
//! }
//! ```
//!
//! Finally, if a model builder is required, the `ProtoModel` trait can be
//! Finally, if a model builder is required, the [`ProtoModel`] trait can be
//! explicitly implemented:
//!
//! ```
@ -80,8 +79,7 @@
//! my_outputs: Vec<Output<usize>>
//! }
//! impl Multiplier {
//! // Private constructor: the final model is only built by the prototype
//! // model.
//! // Private constructor: the final model is built by the prototype model.
//! fn new(
//! value_times_1: Output<usize>,
//! value_times_2: Output<usize>,
@ -92,8 +90,7 @@
//! }
//! }
//!
//! // Public inputs and repliers to be used by the user during bench
//! // construction.
//! // Public input to be used during bench construction.
//! pub async fn my_input(&mut self, my_data: usize) {
//! for (i, output) in self.my_outputs.iter_mut().enumerate() {
//! output.send(my_data*(i + 1)).await;
@ -113,7 +110,7 @@
//!
//! fn build(
//! mut self,
//! _: &BuildContext<Self>
//! _: &mut BuildContext<Self>
//! ) -> Multiplier {
//! Multiplier::new(self.value_times_1, self.value_times_2, self.value_times_3)
//! }
@ -325,14 +322,14 @@ pub trait ProtoModel: Sized {
/// This method is invoked when the
/// [`SimInit::add_model()`](crate::simulation::SimInit::add_model) or
/// [`BuildContext::add_submodel`] method is called.
fn build(self, ctx: &BuildContext<Self>) -> Self::Model;
fn build(self, ctx: &mut BuildContext<Self>) -> Self::Model;
}
// Every model can be used as a prototype for itself.
impl<M: Model> ProtoModel for M {
type Model = Self;
fn build(self, _: &BuildContext<Self>) -> Self::Model {
fn build(self, _: &mut BuildContext<Self>) -> Self::Model {
self
}
}
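For reference, a hedged sketch of what the blanket implementation enables: a plain `Model` can be handed directly to `SimInit::add_model` with no dedicated prototype. `MyModel` is hypothetical; the `asynchronix` paths follow the doc examples above and the `add_model(model, mailbox, name)` signature used elsewhere in this diff.

```rust
use asynchronix::model::Model;
use asynchronix::simulation::{Mailbox, SimInit};

// A hypothetical model with no state and no custom initialization.
#[derive(Default)]
struct MyModel {}
impl Model for MyModel {}

fn main() {
    // The blanket `impl<M: Model> ProtoModel for M` lets `MyModel` act as its
    // own prototype here.
    let _siminit = SimInit::new().add_model(MyModel::default(), Mailbox::new(), "my_model");
}
```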


@ -153,7 +153,7 @@ impl<M: Model> fmt::Debug for Context<M> {
///
/// fn build(
/// self,
/// ctx: &BuildContext<Self>)
/// ctx: &mut BuildContext<Self>)
/// -> MultiplyBy4 {
/// let mut mult = MultiplyBy4 { forward: Output::default() };
/// let mut submult1 = MultiplyBy2::default();
@ -185,6 +185,7 @@ pub struct BuildContext<'a, P: ProtoModel> {
context: &'a Context<P::Model>,
executor: &'a Executor,
abort_signal: &'a Signal,
model_names: &'a mut Vec<String>,
}
impl<'a, P: ProtoModel> BuildContext<'a, P> {
@ -194,30 +195,35 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
context: &'a Context<P::Model>,
executor: &'a Executor,
abort_signal: &'a Signal,
model_names: &'a mut Vec<String>,
) -> Self {
Self {
mailbox,
context,
executor,
abort_signal,
model_names,
}
}
/// Returns the model instance name.
/// Returns the fully qualified model instance name.
///
/// The fully qualified name is made of the unqualified model name, prepended
/// where relevant by the dot-separated names of all parent models.
pub fn name(&self) -> &str {
&self.context.name
}
/// Adds a sub-model to the simulation bench.
///
/// The `name` argument needs not be unique. If an empty string is provided,
/// it is replaced by the string `<unknown>`.
///
/// The provided name is appended to that of the parent model using a dot as
/// a separator (e.g. `parent_name.child_name`) to build an identifier. This
/// identifier is used for logging or error-reporting purposes.
/// The `name` argument need not be unique. It is appended to the parent
/// model's name using a dot separator (e.g.
/// `parent_name.child_name`) to build the fully qualified name. The use of
/// the dot character in the unqualified name is possible but discouraged.
/// If an empty string is provided, it is replaced by the string
/// `<unknown>`.
pub fn add_submodel<S: ProtoModel>(
&self,
&mut self,
model: S,
mailbox: Mailbox<S::Model>,
name: impl Into<String>,
@ -235,6 +241,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
self.context.scheduler.scheduler.clone(),
self.executor,
self.abort_signal,
self.model_names,
);
}
}
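To make the naming rule concrete, here is a hedged sketch with hypothetical `Assembly`/`Motor` models: the submodel registered as "motor" inside a parent added as "assembly" is reported as "assembly.motor" in deadlock and panic errors. Import paths follow the doc examples in this diff.

```rust
use asynchronix::model::{BuildContext, Model, ProtoModel};
use asynchronix::simulation::{Mailbox, SimInit};

#[derive(Default)]
struct Motor {}
impl Model for Motor {}

#[derive(Default)]
struct Assembly {}
impl Model for Assembly {}

#[derive(Default)]
struct ProtoAssembly {}

impl ProtoModel for ProtoAssembly {
    type Model = Assembly;

    fn build(self, ctx: &mut BuildContext<Self>) -> Assembly {
        // "motor" is appended to the parent's name with a dot separator,
        // yielding the fully qualified name "assembly.motor".
        ctx.add_submodel(Motor::default(), Mailbox::new(), "motor");
        Assembly::default()
    }
}

fn main() {
    let _siminit =
        SimInit::new().add_model(ProtoAssembly::default(), Mailbox::new(), "assembly");
}
```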


@ -69,7 +69,7 @@
//! impl ProtoModel for ProtoParentModel {
//! type Model = ParentModel;
//!
//! fn build(self, ctx: &BuildContext<Self>) -> ParentModel {
//! fn build(self, ctx: &mut BuildContext<Self>) -> ParentModel {
//! let mut child = ChildModel::new(self.output.clone());
//!
//! ctx.add_submodel(child, Mailbox::new(), "child");


@ -4,7 +4,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::vec;
use pin_project_lite::pin_project;
use pin_project::pin_project;
use diatomic_waker::WakeSink;
@ -292,15 +292,15 @@ impl<T: Clone, R> Default for QueryBroadcaster<T, R> {
}
}
pin_project! {
/// A future aggregating the outputs of a collection of sender futures.
///
/// The idea is to join all sender futures as efficiently as possible, meaning:
///
/// - the sender futures are polled simultaneously rather than waiting for their
/// completion in a sequential manner,
/// - the happy path (all futures immediately ready) is very fast.
pub(super) struct BroadcastFuture<R> {
#[pin_project]
/// A future aggregating the outputs of a collection of sender futures.
///
/// The idea is to join all sender futures as efficiently as possible, meaning:
///
/// - the sender futures are polled simultaneously rather than waiting for their
/// completion in a sequential manner,
/// - the happy path (all futures immediately ready) is very fast.
pub(super) struct BroadcastFuture<R> {
// Thread-safe waker handle.
wake_sink: WakeSink,
// Tasks associated to the sender futures.
@ -311,7 +311,6 @@ pin_project! {
pending_futures_count: usize,
// State of completion of the future.
state: FutureState,
}
}
impl<R> BroadcastFuture<R> {


@ -52,14 +52,14 @@
//! possible in safe Rust) it is still possible in theory to generate deadlocks.
//! Though rare in practice, these may occur due to one of the below:
//!
//! 1. *query loopback*: if a model sends a query which is further forwarded by
//! other models until it loops back to the initial model, that model would
//! in effect wait for its own response and block,
//! 2. *mailbox saturation*: if several models concurrently send to one another
//! a very large number of messages in succession, these models may end up
//! saturating all mailboxes, at which point they will wait for the other's
//! mailboxes to free space so they can send the next message, eventually
//! preventing all of them to make further progress.
//! 1. *query loopback*: if a model sends a query which loops back to itself
//! (either directly or transitively via other models), that model
//! would in effect wait for its own response and block,
//! 2. *mailbox saturation loopback*: if an asynchronous model method sends in
//! the same call many events that end up saturating its own mailbox (either
//! directly or transitively via other models), then any attempt to send
//! another event would block forever waiting for its own mailbox to free
//! some space.
//!
//! The first scenario is usually very easy to avoid and is typically the result
//! of an improper assembly of models. Because requestor ports are only used
@ -67,21 +67,15 @@
//! exceptional.
//!
//! The second scenario is rare in well-behaving models and if it occurs, it is
//! most typically at the very beginning of a simulation when all models
//! simultaneously send events during the call to
//! most typically at the very beginning of a simulation when models
//! simultaneously and mutually send events during the call to
//! [`Model::init()`](crate::model::Model::init). If such a large amount of
//! concurrent messages is deemed normal behavior, the issue can be readily
//! remedied by increasing the capacity of the saturated mailboxes.
//! events is deemed normal behavior, the issue can be remedied by increasing
//! the capacity of the saturated mailboxes.
//!
//! At the moment, Asynchronix is unfortunately not able to discriminate between
//! such pathological deadlocks and the "expected" deadlock that occurs when all
//! events in a given time slice have completed and all models are starved on an
//! empty mailbox. Consequently, blocking method such as [`SimInit::init()`],
//! [`Simulation::step()`], [`Simulation::process_event()`], etc., will return
//! without error after a pathological deadlock, leaving the user responsible
//! for inferring the deadlock from the behavior of the simulation in the next
//! steps. This is obviously not ideal, but is hopefully only a temporary state
//! of things until a more precise deadlock detection algorithm is implemented.
//! Any deadlocks will be reported as an [`ExecutionError::Deadlock`] error,
//! which identifies all involved models and the number of unprocessed messages
//! (events or requests) in their mailboxes.
//!
//! ## Modifying connections during simulation
//!
@ -136,12 +130,18 @@ pub(crate) use scheduler::{
};
pub use sim_init::SimInit;
use std::any::Any;
use std::cell::Cell;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll;
use std::time::Duration;
use std::{panic, task};
use pin_project::pin_project;
use recycle_box::{coerce_box, RecycleBox};
use crate::channel::ChannelObserver;
@ -152,6 +152,8 @@ use crate::time::{AtomicTime, Clock, MonotonicTime, SyncStatus};
use crate::util::seq_futures::SeqFuture;
use crate::util::slot;
thread_local! { pub(crate) static CURRENT_MODEL_ID: Cell<ModelId> = const { Cell::new(ModelId::none()) }; }
/// Simulation environment.
///
/// A `Simulation` is created by calling
@ -198,11 +200,13 @@ pub struct Simulation {
clock_tolerance: Option<Duration>,
timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
model_names: Vec<String>,
is_terminated: bool,
}
impl Simulation {
/// Creates a new `Simulation` with the specified clock.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
@ -211,6 +215,7 @@ impl Simulation {
clock_tolerance: Option<Duration>,
timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
model_names: Vec<String>,
) -> Self {
Self {
executor,
@ -220,6 +225,7 @@ impl Simulation {
clock_tolerance,
timeout,
observers,
model_names,
is_terminated: false,
}
}
@ -276,7 +282,7 @@ impl Simulation {
/// time.
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> {
if self.time.read() >= target_time {
return Err(ExecutionError::InvalidTargetTime(target_time));
return Err(ExecutionError::InvalidDeadline(target_time));
}
self.step_until_unchecked(target_time)
}
@ -332,8 +338,9 @@ impl Simulation {
/// Processes a query immediately, blocking until completion.
///
/// Simulation time remains unchanged. If the targeted model was not added
/// to the simulation, an `ExecutionError::InvalidQuery` is returned.
/// Simulation time remains unchanged. If the mailbox targeted by the query
/// was not found in the simulation, an [`ExecutionError::BadQuery`] is
/// returned.
pub fn process_query<M, F, T, R, S>(
&mut self,
func: F,
@ -386,11 +393,11 @@ impl Simulation {
ExecutorError::Deadlock => {
self.is_terminated = true;
let mut deadlock_info = Vec::new();
for (name, observer) in &self.observers {
for (model, observer) in &self.observers {
let mailbox_size = observer.len();
if mailbox_size != 0 {
deadlock_info.push(DeadlockInfo {
model_name: name.clone(),
model: model.clone(),
mailbox_size,
});
}
@ -403,6 +410,18 @@ impl Simulation {
ExecutionError::Timeout
}
ExecutorError::Panic(model_id, payload) => {
self.is_terminated = true;
let model = match model_id.get() {
// The panic was emitted by a model.
Some(id) => self.model_names.get(id).unwrap().clone(),
// The panic is due to an internal issue.
None => panic::resume_unwind(payload),
};
ExecutionError::Panic { model, payload }
}
})
}
@ -491,6 +510,8 @@ impl Simulation {
if let SyncStatus::OutOfSync(lag) = self.clock.synchronize(current_time) {
if let Some(tolerance) = &self.clock_tolerance {
if &lag > tolerance {
self.is_terminated = true;
return Err(ExecutionError::OutOfSync(lag));
}
}
@ -543,50 +564,75 @@ impl fmt::Debug for Simulation {
/// Information regarding a deadlocked model.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DeadlockInfo {
/// Name of the deadlocked model.
pub model_name: String,
/// The fully qualified name of a deadlocked model.
///
/// This is the name of the model, prepended where relevant by the
/// dot-separated names of all parent models.
pub model: String,
/// Number of messages in the mailbox.
pub mailbox_size: usize,
}
/// An error returned upon simulation execution failure.
///
/// Note that if a `Deadlock`, `ModelError` or `ModelPanic` is returned, any
/// subsequent attempt to run the simulation will return `Terminated`.
#[derive(Debug)]
pub enum ExecutionError {
/// The simulation has deadlocked.
/// The simulation has been terminated due to an earlier deadlock, model
/// panic, timeout or synchronization loss.
Terminated,
/// The simulation has deadlocked due to the enlisted models.
///
/// Enlists all models with non-empty mailboxes.
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
Deadlock(Vec<DeadlockInfo>),
/// A model has aborted the simulation.
ModelError {
/// Name of the model.
model_name: String,
/// Error registered by the model.
error: Box<dyn Error>,
/// A panic was caught during execution.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
Panic {
/// The fully qualified name of the panicking model.
///
/// The fully qualified name is made of the unqualified model name, prepended
/// where relevant by the dot-separated names of all parent models.
model: String,
/// The payload associated with the panic.
///
/// The payload can usually be downcast to a `String` or `&str`. This is
/// always the case if the panic was triggered by the `panic!` macro,
/// but panics can in principle emit arbitrary payloads with e.g.
/// [`panic_any`](std::panic::panic_any).
payload: Box<dyn Any + Send + 'static>,
},
/// A panic was caught during execution with the message contained in the
/// payload.
Panic(String),
/// The simulation step has failed to complete within the allocated time.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
///
/// See also [`SimInit::set_timeout`] and [`Simulation::set_timeout`].
Timeout,
/// The simulation has lost synchronization with the clock and lags behind
/// by the duration given in the payload.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
///
/// See also [`SimInit::set_clock_tolerance`].
OutOfSync(Duration),
/// The specified target simulation time is in the past of the current
/// simulation time.
InvalidTargetTime(MonotonicTime),
/// The query was invalid and did not obtain a response.
/// The query did not obtain a response because the mailbox targeted by the
/// query was not found in the simulation.
///
/// This is a non-fatal error.
BadQuery,
/// The simulation has been terminated due to an earlier deadlock, model
/// error, model panic or timeout.
Terminated,
/// The specified simulation deadline is in the past of the current
/// simulation time.
///
/// This is a non-fatal error.
InvalidDeadline(MonotonicTime),
}
impl fmt::Display for ExecutionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Terminated => f.write_str("the simulation has been terminated"),
Self::Deadlock(list) => {
f.write_str(
"a simulation deadlock has been detected that involves the following models: ",
@ -601,7 +647,7 @@ impl fmt::Display for ExecutionError {
write!(
f,
"'{}' ({} item{} in mailbox)",
info.model_name,
info.model,
info.mailbox_size,
if info.mailbox_size == 1 { "" } else { "s" }
)?;
@ -609,16 +655,15 @@ impl fmt::Display for ExecutionError {
Ok(())
}
Self::ModelError { model_name, error } => {
write!(
f,
"the simulation has been aborted by model '{}' with the following error: {}",
model_name, error
)
}
Self::Panic(msg) => {
f.write_str("a panic has been caught during simulation:\n")?;
f.write_str(msg)
Self::Panic{model, payload} => {
let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
s
} else if let Some(s) = payload.downcast_ref::<String>() {
s
} else {
return write!(f, "model '{}' has panicked", model);
};
write!(f, "model '{}' has panicked with the message: '{}'", model, msg)
}
Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"),
Self::OutOfSync(lag) => {
@ -628,28 +673,19 @@ impl fmt::Display for ExecutionError {
lag
)
}
Self::InvalidTargetTime(time) => {
Self::BadQuery => f.write_str("the query did not return any response; was the target mailbox added to the simulation?"),
Self::InvalidDeadline(time) => {
write!(
f,
"target simulation stamp {} lies in the past of the current simulation time",
"the specified deadline ({}) lies in the past of the current simulation time",
time
)
}
Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"),
Self::Terminated => f.write_str("the simulation has been terminated"),
}
}
}
impl Error for ExecutionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
if let Self::ModelError { error, .. } = &self {
Some(error.as_ref())
} else {
None
}
}
}
impl Error for ExecutionError {}
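As documented for the `Panic` variant and used by the `Display` implementation above, the payload can usually be read back as a string. A standalone illustration, independent of the crate:

```rust
use std::any::Any;
use std::panic;

// Recover a human-readable message from a panic payload, if possible.
fn payload_message(payload: &(dyn Any + Send)) -> Option<&str> {
    payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
}

fn main() {
    panic::set_hook(Box::new(|_| {})); // silence the default panic printout
    let payload = panic::catch_unwind(|| {
        panic!("test message");
    })
    .unwrap_err();
    assert_eq!(payload_message(payload.as_ref()), Some("test message"));
}
```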
/// An error returned upon simulation execution or scheduling failure.
#[derive(Debug)]
@ -698,14 +734,19 @@ pub(crate) fn add_model<P: ProtoModel>(
scheduler: Scheduler,
executor: &Executor,
abort_signal: &Signal,
model_names: &mut Vec<String>,
) {
#[cfg(feature = "tracing")]
let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name);
let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address()));
let build_context = BuildContext::new(&mailbox, &context, executor, abort_signal);
let context = Context::new(
name.clone(),
LocalScheduler::new(scheduler, mailbox.address()),
);
let mut build_context =
BuildContext::new(&mailbox, &context, executor, abort_signal, model_names);
let model = model.build(&build_context);
let model = model.build(&mut build_context);
let mut receiver = mailbox.0;
let abort_signal = abort_signal.clone();
@ -715,8 +756,90 @@ pub(crate) fn add_model<P: ProtoModel>(
while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {}
};
let model_id = ModelId::new(model_names.len());
model_names.push(name);
#[cfg(not(feature = "tracing"))]
let fut = ModelFuture::new(fut, model_id);
#[cfg(feature = "tracing")]
let fut = tracing::Instrument::instrument(fut, span);
let fut = ModelFuture::new(fut, model_id, span);
executor.spawn_and_forget(fut);
}
/// A unique index assigned to a model instance.
///
/// This is a thin wrapper over a `usize` which encodes a lack of value as
/// `usize::MAX`.
#[derive(Copy, Clone, Debug)]
pub(crate) struct ModelId(usize);
impl ModelId {
const fn none() -> Self {
Self(usize::MAX)
}
fn new(id: usize) -> Self {
assert_ne!(id, usize::MAX);
Self(id)
}
fn get(&self) -> Option<usize> {
if self.0 != usize::MAX {
Some(self.0)
} else {
None
}
}
}
impl Default for ModelId {
fn default() -> Self {
Self(usize::MAX)
}
}
#[pin_project]
struct ModelFuture<F> {
#[pin]
fut: F,
id: ModelId,
#[cfg(feature = "tracing")]
span: tracing::Span,
}
impl<F> ModelFuture<F> {
#[cfg(not(feature = "tracing"))]
fn new(fut: F, id: ModelId) -> Self {
Self { fut, id }
}
#[cfg(feature = "tracing")]
fn new(fut: F, id: ModelId, span: tracing::Span) -> Self {
Self { fut, id, span }
}
}
impl<F: Future> Future for ModelFuture<F> {
type Output = F::Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
#[cfg(feature = "tracing")]
let _enter = this.span.enter();
// The current model ID is not set/unset through a guard or scoped TLS
// because it must survive panics to identify the last model that was
// polled.
CURRENT_MODEL_ID.set(*this.id);
let poll = this.fut.poll(cx);
// The model ID is unset right after polling so we can distinguish
// between panics generated by models and panics generated by the
// executor itself, as in the latter case `CURRENT_MODEL_ID.get()` will
// return `None`.
CURRENT_MODEL_ID.set(ModelId::none());
poll
}
}
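The comment above explains why `CURRENT_MODEL_ID` is a plain thread-local rather than scoped or guarded state; a standalone illustration (with `usize` in place of `ModelId`) of the value surviving an unwinding panic:

```rust
use std::cell::Cell;
use std::panic::{self, AssertUnwindSafe};

thread_local! {
    // Stand-in for CURRENT_MODEL_ID: `usize::MAX` encodes "no model".
    static CURRENT: Cell<usize> = const { Cell::new(usize::MAX) };
}

fn main() {
    panic::set_hook(Box::new(|_| {})); // silence the default panic printout
    let result = panic::catch_unwind(AssertUnwindSafe(|| {
        CURRENT.set(3); // set just before polling the model future
        panic!("boom"); // the model panics while being polled
        // the reset to `usize::MAX` that normally follows polling never runs
    }));
    assert!(result.is_err());
    // The ID survives the unwinding, so the panic can be attributed to model 3.
    assert_eq!(CURRENT.get(), 3);
}
```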


@ -10,7 +10,7 @@ use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, ptr};
use pin_project_lite::pin_project;
use pin_project::pin_project;
use recycle_box::{coerce_box, RecycleBox};
use crate::channel::Sender;
@ -741,18 +741,17 @@ pub(crate) trait ActionInner: Send + 'static {
fn spawn_and_forget(self: Box<Self>, executor: &Executor);
}
pin_project! {
/// An object that can be converted to a future performing a single
/// non-cancellable action.
///
/// Note that this particular action is in fact already a future: since the
/// future cannot be cancelled and the action does not need to be cloned,
/// there is no need to defer the construction of the future. This makes
/// `into_future` a trivial cast, which saves a boxing operation.
pub(crate) struct OnceAction<F> {
#[pin_project]
/// An object that can be converted to a future performing a single
/// non-cancellable action.
///
/// Note that this particular action is in fact already a future: since the
/// future cannot be cancelled and the action does not need to be cloned,
/// there is no need to defer the construction of the future. This makes
/// `into_future` a trivial cast, which saves a boxing operation.
pub(crate) struct OnceAction<F> {
#[pin]
fut: F,
}
}
impl<F> OnceAction<F>


@ -22,6 +22,7 @@ pub struct SimInit {
timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
abort_signal: Signal,
model_names: Vec<String>,
}
impl SimInit {
@ -65,21 +66,26 @@ impl SimInit {
timeout: Duration::ZERO,
observers: Vec::new(),
abort_signal,
model_names: Vec::new(),
}
}
/// Adds a model and its mailbox to the simulation bench.
///
/// The `name` argument needs not be unique. If an empty string is provided,
/// it is replaced by the string `<unknown>`. This name serves an identifier
/// for logging or error-reporting purposes.
/// The `name` argument need not be unique. The use of the dot character in
/// the name is possible but discouraged as it can cause confusion with the
/// fully qualified name of a submodel. If an empty string is provided, it
/// is replaced by the string `<unknown>`.
pub fn add_model<P: ProtoModel>(
mut self,
model: P,
mailbox: Mailbox<P::Model>,
name: impl Into<String>,
) -> Self {
let name = name.into();
let mut name = name.into();
if name.is_empty() {
name = String::from("<unknown>");
};
self.observers
.push((name.clone(), Box::new(mailbox.0.observer())));
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
@ -91,6 +97,7 @@ impl SimInit {
scheduler,
&self.executor,
&self.abort_signal,
&mut self.model_names,
);
self
@ -109,8 +116,8 @@ impl SimInit {
/// Specifies a tolerance for clock synchronization.
///
/// When a clock synchronization tolerance is set, then any report of
/// synchronization loss by `Clock::synchronize` that exceeds the specified
/// tolerance will trigger an `ExecutionError::OutOfSync` error.
/// synchronization loss by [`Clock::synchronize`] that exceeds the
/// specified tolerance will trigger an [`ExecutionError::OutOfSync`] error.
pub fn set_clock_tolerance(mut self, tolerance: Duration) -> Self {
self.clock_tolerance = Some(tolerance);
@ -149,6 +156,7 @@ impl SimInit {
self.clock_tolerance,
self.timeout,
self.observers,
self.model_names,
);
simulation.run()?;


@ -5,6 +5,7 @@ mod model_scheduling;
#[cfg(not(miri))]
mod simulation_clock_sync;
mod simulation_deadlock;
mod simulation_panic;
mod simulation_scheduling;
#[cfg(not(miri))]
mod simulation_timeout;


@ -55,7 +55,7 @@ fn deadlock_on_mailbox_overflow(num_threads: usize) {
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL_NAME.into(),
model: MODEL_NAME.into(),
mailbox_size: MAILBOX_SIZE
}
)
@ -90,7 +90,7 @@ fn deadlock_on_query_loopback(num_threads: usize) {
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL_NAME.into(),
model: MODEL_NAME.into(),
mailbox_size: 1,
}
);
@ -134,7 +134,7 @@ fn deadlock_on_transitive_query_loopback(num_threads: usize) {
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL1_NAME.into(),
model: MODEL1_NAME.into(),
mailbox_size: 1,
}
);
@ -192,14 +192,14 @@ fn deadlock_on_multiple_query_loopback(num_threads: usize) {
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL1_NAME.into(),
model: MODEL1_NAME.into(),
mailbox_size: 1,
}
);
assert_eq!(
deadlock_info[1],
DeadlockInfo {
model_name: MODEL2_NAME.into(),
model: MODEL2_NAME.into(),
mailbox_size: 1,
}
);


@ -0,0 +1,73 @@
//! Model panic reporting.
use asynchronix::model::Model;
use asynchronix::ports::Output;
use asynchronix::simulation::{ExecutionError, Mailbox, SimInit};
use asynchronix::time::MonotonicTime;
const MT_NUM_THREADS: usize = 4;
#[derive(Default)]
struct TestModel {
countdown_out: Output<usize>,
}
impl TestModel {
async fn countdown_in(&mut self, count: usize) {
if count == 0 {
panic!("test message");
}
self.countdown_out.send(count - 1).await;
}
}
impl Model for TestModel {}
/// Pass a counter around several models and decrement it each time, panicking
/// when it becomes zero.
fn model_panic(num_threads: usize) {
const MODEL_COUNT: usize = 5;
const INIT_COUNTDOWN: usize = 9;
// Connect all models in a cycle graph.
let mut model0 = TestModel::default();
let mbox0 = Mailbox::new();
let addr0 = mbox0.address();
let mut siminit = SimInit::with_num_threads(num_threads);
let mut addr = mbox0.address();
for model_id in (1..MODEL_COUNT).rev() {
let mut model = TestModel::default();
let mbox = Mailbox::new();
model.countdown_out.connect(TestModel::countdown_in, addr);
addr = mbox.address();
siminit = siminit.add_model(model, mbox, model_id.to_string());
}
model0.countdown_out.connect(TestModel::countdown_in, addr);
siminit = siminit.add_model(model0, mbox0, 0.to_string());
// Run the simulation.
let t0 = MonotonicTime::EPOCH;
let mut simu = siminit.init(t0).unwrap();
match simu.process_event(TestModel::countdown_in, INIT_COUNTDOWN, addr0) {
Err(ExecutionError::Panic { model, payload }) => {
let msg = payload.downcast_ref::<&str>().unwrap();
let panicking_model_id = INIT_COUNTDOWN % MODEL_COUNT;
assert_eq!(model, panicking_model_id.to_string());
assert_eq!(*msg, "test message");
}
_ => panic!("panic not detected"),
}
}
#[test]
fn model_panic_st() {
model_panic(1);
}
#[test]
fn model_panic_mt() {
model_panic(MT_NUM_THREADS);
}