From ba1e668447743f894c15005025dcc04076cc0a02 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 13 Nov 2024 19:21:54 +0100 Subject: [PATCH] Report panics as errors + identify panicking model The build context is now passed as a mutable reference due to the need to mutate data when adding a model. Contains small unrelated cleanups and documentation improvements too. --- asynchronix/Cargo.toml | 2 +- asynchronix/examples/assembly.rs | 2 +- asynchronix/examples/external_input.rs | 2 +- asynchronix/src/executor.rs | 6 +- asynchronix/src/executor/mt_executor.rs | 12 +- .../src/executor/mt_executor/pool_manager.rs | 16 +- asynchronix/src/executor/st_executor.rs | 52 ++-- asynchronix/src/grpc/api/simulation.proto | 13 +- asynchronix/src/grpc/codegen/simulation.rs | 27 +- asynchronix/src/grpc/services.rs | 5 +- .../src/grpc/services/controller_service.rs | 8 +- asynchronix/src/model.rs | 47 ++- asynchronix/src/model/context.rs | 25 +- asynchronix/src/ports.rs | 2 +- asynchronix/src/ports/source/broadcaster.rs | 41 ++- asynchronix/src/simulation.rs | 277 +++++++++++++----- asynchronix/src/simulation/scheduler.rs | 25 +- asynchronix/src/simulation/sim_init.rs | 20 +- asynchronix/tests/integration/main.rs | 1 + .../tests/integration/simulation_deadlock.rs | 10 +- .../tests/integration/simulation_panic.rs | 73 +++++ 21 files changed, 437 insertions(+), 229 deletions(-) create mode 100644 asynchronix/tests/integration/simulation_panic.rs diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 46a0fe0..3b8ade2 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -41,7 +41,7 @@ futures-task = "0.3" multishot = "0.3.2" num_cpus = "1.13" parking = "2" -pin-project-lite = "0.2" +pin-project = "1" recycle-box = "0.2" slab = "0.4" spin_sleep = "1" diff --git a/asynchronix/examples/assembly.rs b/asynchronix/examples/assembly.rs index c7888f8..b3c42a0 100644 --- a/asynchronix/examples/assembly.rs +++ b/asynchronix/examples/assembly.rs @@ -85,7 +85,7 @@ impl Model for MotorAssembly {} impl ProtoModel for ProtoMotorAssembly { type Model = MotorAssembly; - fn build(self, ctx: &BuildContext) -> MotorAssembly { + fn build(self, ctx: &mut BuildContext) -> MotorAssembly { let mut assembly = MotorAssembly::new(); let mut motor = Motor::new(self.init_pos); let mut driver = Driver::new(1.0); diff --git a/asynchronix/examples/external_input.rs b/asynchronix/examples/external_input.rs index 33167aa..e5b48f2 100644 --- a/asynchronix/examples/external_input.rs +++ b/asynchronix/examples/external_input.rs @@ -58,7 +58,7 @@ impl ProtoModel for ProtoListener { type Model = Listener; /// Start the UDP Server immediately upon model construction. - fn build(self, _: &BuildContext) -> Listener { + fn build(self, _: &mut BuildContext) -> Listener { let (tx, rx) = channel(); let external_handle = thread::spawn(move || { diff --git a/asynchronix/src/executor.rs b/asynchronix/src/executor.rs index d7bafc0..1aa500e 100644 --- a/asynchronix/src/executor.rs +++ b/asynchronix/src/executor.rs @@ -4,6 +4,7 @@ mod mt_executor; mod st_executor; mod task; +use std::any::Any; use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -12,6 +13,7 @@ use std::time::Duration; use crossbeam_utils::CachePadded; use crate::macros::scoped_thread_local::scoped_thread_local; +use crate::simulation::ModelId; #[cfg(feature = "tracing")] use crate::time::AtomicTimeReader; use task::Promise; @@ -19,12 +21,14 @@ use task::Promise; /// Unique identifier for executor instances. static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); -#[derive(PartialEq, Eq, Debug)] +#[derive(Debug)] pub(crate) enum ExecutorError { /// The simulation has deadlocked. Deadlock, /// The simulation has timed out. Timeout, + /// The simulation has panicked. + Panic(ModelId, Box), } /// Context common to all executor types. diff --git a/asynchronix/src/executor/mt_executor.rs b/asynchronix/src/executor/mt_executor.rs index 6c18d92..bb9517c 100644 --- a/asynchronix/src/executor/mt_executor.rs +++ b/asynchronix/src/executor/mt_executor.rs @@ -64,6 +64,7 @@ use crate::executor::{ ExecutorError, Signal, SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT, }; use crate::macros::scoped_thread_local::scoped_thread_local; +use crate::simulation::CURRENT_MODEL_ID; use crate::util::rng::Rng; use pool_manager::PoolManager; @@ -242,8 +243,8 @@ impl Executor { self.context.pool_manager.activate_worker(); loop { - if let Some(worker_panic) = self.context.pool_manager.take_panic() { - panic::resume_unwind(worker_panic); + if let Some((model_id, payload)) = self.context.pool_manager.take_panic() { + return Err(ExecutorError::Panic(model_id, payload)); } if self.context.pool_manager.pool_is_idle() { @@ -619,9 +620,10 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker, abort_signal: Si } })); - // Propagate the panic, if any. - if let Err(panic) = result { - pool_manager.register_panic(panic); + // Report the panic, if any. + if let Err(payload) = result { + let model_id = CURRENT_MODEL_ID.take(); + pool_manager.register_panic(model_id, payload); abort_signal.set(); pool_manager.activate_all_workers(); executor_unparker.unpark(); diff --git a/asynchronix/src/executor/mt_executor/pool_manager.rs b/asynchronix/src/executor/mt_executor/pool_manager.rs index f2bec7c..409157a 100644 --- a/asynchronix/src/executor/mt_executor/pool_manager.rs +++ b/asynchronix/src/executor/mt_executor/pool_manager.rs @@ -5,6 +5,7 @@ use std::sync::Mutex; use parking::Unparker; use super::Stealer; +use crate::simulation::ModelId; use crate::util::bit; use crate::util::rng; @@ -23,7 +24,7 @@ pub(super) struct PoolManager { /// Count of all workers currently searching for tasks. searching_workers: AtomicUsize, /// Panic caught in a worker thread. - worker_panic: Mutex>>, + worker_panic: Mutex)>>, } impl PoolManager { @@ -216,20 +217,19 @@ impl PoolManager { } } - /// Registers a panic associated with the provided worker ID. + /// Registers a worker panic. /// - /// If no panic is currently registered, the panic in argument is - /// registered. If a panic was already registered by a worker and was not - /// yet processed by the executor, then nothing is done. - pub(super) fn register_panic(&self, panic: Box) { + /// If a panic was already registered and was not yet processed by the + /// executor, then nothing is done. + pub(super) fn register_panic(&self, model_id: ModelId, payload: Box) { let mut worker_panic = self.worker_panic.lock().unwrap(); if worker_panic.is_none() { - *worker_panic = Some(panic); + *worker_panic = Some((model_id, payload)); } } /// Takes a worker panic if any is registered. - pub(super) fn take_panic(&self) -> Option> { + pub(super) fn take_panic(&self) -> Option<(ModelId, Box)> { let mut worker_panic = self.worker_panic.lock().unwrap(); worker_panic.take() } diff --git a/asynchronix/src/executor/st_executor.rs b/asynchronix/src/executor/st_executor.rs index be20621..9c1008e 100644 --- a/asynchronix/src/executor/st_executor.rs +++ b/asynchronix/src/executor/st_executor.rs @@ -16,6 +16,7 @@ use super::NEXT_EXECUTOR_ID; use crate::channel; use crate::executor::{ExecutorError, Signal, SimulationContext, SIMULATION_CONTEXT}; use crate::macros::scoped_thread_local::scoped_thread_local; +use crate::simulation::CURRENT_MODEL_ID; const QUEUE_MIN_CAPACITY: usize = 32; @@ -129,16 +130,10 @@ impl Executor { let parker = Parker::new(); let unparker = parker.unparker(); let th = thread::spawn(move || { - // It is necessary to catch worker panics, otherwise the main thread - // will never be unparked if the worker thread panics. - let res = panic::catch_unwind(AssertUnwindSafe(|| inner.run())); - + let res = inner.run(); unparker.unpark(); - match res { - Ok(res) => (inner, res), - Err(e) => panic::resume_unwind(e), - } + (inner, res) }); if !parker.park_timeout(timeout) { @@ -148,14 +143,10 @@ impl Executor { return Err(ExecutorError::Timeout); } - match th.join() { - Ok((inner, res)) => { - self.inner = Some(inner); + let (inner, res) = th.join().unwrap(); + self.inner = Some(inner); - res - } - Err(e) => panic::resume_unwind(e), - } + res } } @@ -176,25 +167,34 @@ impl ExecutorInner { // In case this executor is nested in another one, reset the counter of in-flight messages. let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count); - SIMULATION_CONTEXT.set(&self.simulation_context, || { + let result = SIMULATION_CONTEXT.set(&self.simulation_context, || { ACTIVE_TASKS.set(&self.active_tasks, || { - EXECUTOR_CONTEXT.set(&self.context, || loop { - let task = match self.context.queue.borrow_mut().pop() { - Some(task) => task, - None => break, - }; + EXECUTOR_CONTEXT.set(&self.context, || { + panic::catch_unwind(AssertUnwindSafe(|| loop { + let task = match self.context.queue.borrow_mut().pop() { + Some(task) => task, + None => break, + }; - task.run(); + task.run(); - if self.abort_signal.is_set() { - return; - } + if self.abort_signal.is_set() { + return; + } + })) }) }) }); - self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash); + // Return the panic payload, if any. + if let Err(payload) = result { + let model_id = CURRENT_MODEL_ID.take(); + return Err(ExecutorError::Panic(model_id, payload)); + } + + // Check for deadlock. + self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash); if self.context.msg_count != 0 { assert!(self.context.msg_count > 0); diff --git a/asynchronix/src/grpc/api/simulation.proto b/asynchronix/src/grpc/api/simulation.proto index 8a9150a..25ee269 100644 --- a/asynchronix/src/grpc/api/simulation.proto +++ b/asynchronix/src/grpc/api/simulation.proto @@ -11,17 +11,16 @@ enum ErrorCode { INTERNAL_ERROR = 0; SIMULATION_NOT_STARTED = 1; SIMULATION_TERMINATED = 2; - SIMULATION_TIMEOUT = 3; - SIMULATION_DEADLOCK = 4; - SIMULATION_MODEL_ERROR = 5; - SIMULATION_PANIC = 6; + SIMULATION_DEADLOCK = 3; + SIMULATION_PANIC = 4; + SIMULATION_TIMEOUT = 5; + SIMULATION_OUT_OF_SYNC = 6; SIMULATION_BAD_QUERY = 7; SIMULATION_TIME_OUT_OF_RANGE = 8; - SIMULATION_OUT_OF_SYNC = 9; MISSING_ARGUMENT = 20; INVALID_TIME = 30; - INVALID_DURATION = 31; - INVALID_PERIOD = 32; + INVALID_PERIOD = 31; + INVALID_DEADLINE = 32; INVALID_MESSAGE = 33; INVALID_KEY = 34; SOURCE_NOT_FOUND = 40; diff --git a/asynchronix/src/grpc/codegen/simulation.rs b/asynchronix/src/grpc/codegen/simulation.rs index ea18da6..3b2a133 100644 --- a/asynchronix/src/grpc/codegen/simulation.rs +++ b/asynchronix/src/grpc/codegen/simulation.rs @@ -339,17 +339,16 @@ pub enum ErrorCode { InternalError = 0, SimulationNotStarted = 1, SimulationTerminated = 2, - SimulationTimeout = 3, - SimulationDeadlock = 4, - SimulationModelError = 5, - SimulationPanic = 6, + SimulationDeadlock = 3, + SimulationPanic = 4, + SimulationTimeout = 5, + SimulationOutOfSync = 6, SimulationBadQuery = 7, SimulationTimeOutOfRange = 8, - SimulationOutOfSync = 9, MissingArgument = 20, InvalidTime = 30, - InvalidDuration = 31, - InvalidPeriod = 32, + InvalidPeriod = 31, + InvalidDeadline = 32, InvalidMessage = 33, InvalidKey = 34, SourceNotFound = 40, @@ -365,17 +364,16 @@ impl ErrorCode { ErrorCode::InternalError => "INTERNAL_ERROR", ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED", ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED", - ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT", ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK", - ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR", ErrorCode::SimulationPanic => "SIMULATION_PANIC", + ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT", + ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY", ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", - ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", ErrorCode::MissingArgument => "MISSING_ARGUMENT", ErrorCode::InvalidTime => "INVALID_TIME", - ErrorCode::InvalidDuration => "INVALID_DURATION", ErrorCode::InvalidPeriod => "INVALID_PERIOD", + ErrorCode::InvalidDeadline => "INVALID_DEADLINE", ErrorCode::InvalidMessage => "INVALID_MESSAGE", ErrorCode::InvalidKey => "INVALID_KEY", ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", @@ -388,17 +386,16 @@ impl ErrorCode { "INTERNAL_ERROR" => Some(Self::InternalError), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), - "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), - "SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError), "SIMULATION_PANIC" => Some(Self::SimulationPanic), + "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout), + "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery), "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), - "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "MISSING_ARGUMENT" => Some(Self::MissingArgument), "INVALID_TIME" => Some(Self::InvalidTime), - "INVALID_DURATION" => Some(Self::InvalidDuration), "INVALID_PERIOD" => Some(Self::InvalidPeriod), + "INVALID_DEADLINE" => Some(Self::InvalidDeadline), "INVALID_MESSAGE" => Some(Self::InvalidMessage), "INVALID_KEY" => Some(Self::InvalidKey), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), diff --git a/asynchronix/src/grpc/services.rs b/asynchronix/src/grpc/services.rs index 65bfb65..e00613a 100644 --- a/asynchronix/src/grpc/services.rs +++ b/asynchronix/src/grpc/services.rs @@ -34,13 +34,12 @@ fn simulation_not_started_error() -> Error { fn map_execution_error(error: ExecutionError) -> Error { let error_code = match error { ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, - ExecutionError::ModelError { .. } => ErrorCode::SimulationModelError, - ExecutionError::Panic(_) => ErrorCode::SimulationPanic, + ExecutionError::Panic { .. } => ErrorCode::SimulationPanic, ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, ExecutionError::BadQuery => ErrorCode::SimulationBadQuery, ExecutionError::Terminated => ErrorCode::SimulationTerminated, - ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime, + ExecutionError::InvalidDeadline(_) => ErrorCode::InvalidDeadline, }; let error_message = error.to_string(); diff --git a/asynchronix/src/grpc/services/controller_service.rs b/asynchronix/src/grpc/services/controller_service.rs index 97bd433..c8dd1d4 100644 --- a/asynchronix/src/grpc/services/controller_service.rs +++ b/asynchronix/src/grpc/services/controller_service.rs @@ -107,14 +107,14 @@ impl ControllerService { simulation.step_until(time).map_err(|_| { to_error( - ErrorCode::InvalidTime, + ErrorCode::InvalidDeadline, "the specified deadline lies in the past", ) })?; } step_until_request::Deadline::Duration(duration) => { let duration = to_positive_duration(duration).ok_or(to_error( - ErrorCode::InvalidDuration, + ErrorCode::InvalidDeadline, "the specified deadline lies in the past", ))?; @@ -156,7 +156,7 @@ impl ControllerService { .period .map(|period| { to_strictly_positive_duration(period).ok_or(to_error( - ErrorCode::InvalidDuration, + ErrorCode::InvalidPeriod, "the specified event period is not strictly positive", )) }) @@ -175,7 +175,7 @@ impl ControllerService { ))?, schedule_event_request::Deadline::Duration(duration) => { let duration = to_strictly_positive_duration(duration).ok_or(to_error( - ErrorCode::InvalidDuration, + ErrorCode::InvalidDeadline, "the specified scheduling deadline is not in the future", ))?; diff --git a/asynchronix/src/model.rs b/asynchronix/src/model.rs index 3e69508..1976472 100644 --- a/asynchronix/src/model.rs +++ b/asynchronix/src/model.rs @@ -9,32 +9,31 @@ //! //! It is frequently convenient to expose to users a model builder type—called a //! *model prototype*—rather than the final model. This can be done by -//! implementing the `ProtoModel`, which defines the associated model -//! type and a [`ProtoModel::build` method`] invoked when a model is added the +//! implementing the [`ProtoModel`] trait, which defines the associated model +//! type and a [`ProtoModel::build`] method invoked when a model is added the //! the simulation and returning the actual model instance. //! //! Prototype models can be used whenever the Rust builder pattern is helpful, //! for instance to set optional parameters. One of the use-cases that may -//! benefit from the use of prototype models, however, is hierarchical model -//! building. When a parent model contains sub-models, these sub-models are -//! often an implementation detail that needs not be exposed to the user. One -//! may then define a prototype model that contains all outputs and requestors -//! ports, which upon invocation of `ProtoModel::build()` are moved to the -//! appropriate sub-models (note that the `build` method also allows adding -//! sub-models to the simulation). +//! benefit from the use of prototype models is hierarchical model building. +//! When a parent model contains submodels, these submodels are often an +//! implementation detail that needs not be exposed to the user. One may then +//! define a prototype model that contains all outputs and requestors ports. +//! Upon invocation of [`ProtoModel::build`], the ports are moved to the +//! appropriate submodels and those submodels are added to the simulation. //! -//! Note that a trivial `ProtoModel` implementation is generated by default for -//! any object implementing the `Model` trait, where the associated -//! `ProtoModel::Model` type is the model type itself and where -//! `ProtoModel::build` simply returns the model instance. This is what makes it -//! possible to use either an explicitly-defined `ProtoModel` as argument to the -//! [`SimInit::add_model`](crate::simulation::SimInit::add_model) method, or a -//! plain `Model` type. +//! Note that a trivial [`ProtoModel`] implementation is generated by default +//! for any object implementing the [`Model`] trait, where the associated +//! [`ProtoModel::Model`] type is the model type itself and where +//! [`ProtoModel::build`] simply returns the model instance. This is what makes +//! it possible to use either an explicitly-defined [`ProtoModel`] as argument +//! to the [`SimInit::add_model`](crate::simulation::SimInit::add_model) method, +//! or a plain [`Model`] type. //! //! #### Examples //! //! A model that does not require initialization or building can simply use the -//! default implementation of the `Model` trait: +//! default implementation of the [`Model`] trait: //! //! ``` //! use asynchronix::model::Model; @@ -66,7 +65,7 @@ //! } //! ``` //! -//! Finally, if a model builder is required, the `ProtoModel` trait can be +//! Finally, if a model builder is required, the [`ProtoModel`] trait can be //! explicitly implemented: //! //! ``` @@ -80,8 +79,7 @@ //! my_outputs: Vec> //! } //! impl Multiplier { -//! // Private constructor: the final model is only built by the prototype -//! // model. +//! // Private constructor: the final model is built by the prototype model. //! fn new( //! value_times_1: Output, //! value_times_2: Output, @@ -92,8 +90,7 @@ //! } //! } //! -//! // Public inputs and repliers to be used by the user during bench -//! // construction. +//! // Public input to be used during bench construction. //! pub async fn my_input(&mut self, my_data: usize) { //! for (i, output) in self.my_outputs.iter_mut().enumerate() { //! output.send(my_data*(i + 1)).await; @@ -113,7 +110,7 @@ //! //! fn build( //! mut self, -//! _: &BuildContext +//! _: &mut BuildContext //! ) -> Multiplier { //! Multiplier::new(self.value_times_1, self.value_times_2, self.value_times_3) //! } @@ -325,14 +322,14 @@ pub trait ProtoModel: Sized { /// This method is invoked when the /// [`SimInit::add_model()`](crate::simulation::SimInit::add_model) or /// [`BuildContext::add_submodel`] method is called. - fn build(self, ctx: &BuildContext) -> Self::Model; + fn build(self, ctx: &mut BuildContext) -> Self::Model; } // Every model can be used as a prototype for itself. impl ProtoModel for M { type Model = Self; - fn build(self, _: &BuildContext) -> Self::Model { + fn build(self, _: &mut BuildContext) -> Self::Model { self } } diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs index 8657ffc..c45ac15 100644 --- a/asynchronix/src/model/context.rs +++ b/asynchronix/src/model/context.rs @@ -153,7 +153,7 @@ impl fmt::Debug for Context { /// /// fn build( /// self, -/// ctx: &BuildContext) +/// ctx: &mut BuildContext) /// -> MultiplyBy4 { /// let mut mult = MultiplyBy4 { forward: Output::default() }; /// let mut submult1 = MultiplyBy2::default(); @@ -185,6 +185,7 @@ pub struct BuildContext<'a, P: ProtoModel> { context: &'a Context, executor: &'a Executor, abort_signal: &'a Signal, + model_names: &'a mut Vec, } impl<'a, P: ProtoModel> BuildContext<'a, P> { @@ -194,30 +195,35 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { context: &'a Context, executor: &'a Executor, abort_signal: &'a Signal, + model_names: &'a mut Vec, ) -> Self { Self { mailbox, context, executor, abort_signal, + model_names, } } - /// Returns the model instance name. + /// Returns the fully qualified model instance name. + /// + /// The fully qualified name is made of the unqualified model name, if + /// relevant prepended by the dot-separated names of all parent models. pub fn name(&self) -> &str { &self.context.name } /// Adds a sub-model to the simulation bench. /// - /// The `name` argument needs not be unique. If an empty string is provided, - /// it is replaced by the string ``. - /// - /// The provided name is appended to that of the parent model using a dot as - /// a separator (e.g. `parent_name.child_name`) to build an identifier. This - /// identifier is used for logging or error-reporting purposes. + /// The `name` argument needs not be unique. It is appended to that of the + /// parent models' names using a dot separator (e.g. + /// `parent_name.child_name`) to build the fully qualified name. The use of + /// the dot character in the unqualified name is possible but discouraged. + /// If an empty string is provided, it is replaced by the string + /// ``. pub fn add_submodel( - &self, + &mut self, model: S, mailbox: Mailbox, name: impl Into, @@ -235,6 +241,7 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { self.context.scheduler.scheduler.clone(), self.executor, self.abort_signal, + self.model_names, ); } } diff --git a/asynchronix/src/ports.rs b/asynchronix/src/ports.rs index d90bdc7..76b691a 100644 --- a/asynchronix/src/ports.rs +++ b/asynchronix/src/ports.rs @@ -69,7 +69,7 @@ //! impl ProtoModel for ProtoParentModel { //! type Model = ParentModel; //! -//! fn build(self, ctx: &BuildContext) -> ParentModel { +//! fn build(self, ctx: &mut BuildContext) -> ParentModel { //! let mut child = ChildModel::new(self.output.clone()); //! //! ctx.add_submodel(child, Mailbox::new(), "child"); diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 63576d8..998f693 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::vec; -use pin_project_lite::pin_project; +use pin_project::pin_project; use diatomic_waker::WakeSink; @@ -292,26 +292,25 @@ impl Default for QueryBroadcaster { } } -pin_project! { - /// A future aggregating the outputs of a collection of sender futures. - /// - /// The idea is to join all sender futures as efficiently as possible, meaning: - /// - /// - the sender futures are polled simultaneously rather than waiting for their - /// completion in a sequential manner, - /// - the happy path (all futures immediately ready) is very fast. - pub(super) struct BroadcastFuture { - // Thread-safe waker handle. - wake_sink: WakeSink, - // Tasks associated to the sender futures. - task_set: TaskSet, - // List of all sender futures or their outputs. - future_states: Vec>, - // The total count of futures that have not yet been polled to completion. - pending_futures_count: usize, - // State of completion of the future. - state: FutureState, - } +#[pin_project] +/// A future aggregating the outputs of a collection of sender futures. +/// +/// The idea is to join all sender futures as efficiently as possible, meaning: +/// +/// - the sender futures are polled simultaneously rather than waiting for their +/// completion in a sequential manner, +/// - the happy path (all futures immediately ready) is very fast. +pub(super) struct BroadcastFuture { + // Thread-safe waker handle. + wake_sink: WakeSink, + // Tasks associated to the sender futures. + task_set: TaskSet, + // List of all sender futures or their outputs. + future_states: Vec>, + // The total count of futures that have not yet been polled to completion. + pending_futures_count: usize, + // State of completion of the future. + state: FutureState, } impl BroadcastFuture { diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 6478fbe..60ef222 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -52,14 +52,14 @@ //! possible in safe Rust) it is still possible in theory to generate deadlocks. //! Though rare in practice, these may occur due to one of the below: //! -//! 1. *query loopback*: if a model sends a query which is further forwarded by -//! other models until it loops back to the initial model, that model would -//! in effect wait for its own response and block, -//! 2. *mailbox saturation*: if several models concurrently send to one another -//! a very large number of messages in succession, these models may end up -//! saturating all mailboxes, at which point they will wait for the other's -//! mailboxes to free space so they can send the next message, eventually -//! preventing all of them to make further progress. +//! 1. *query loopback*: if a model sends a query which loops back to itself +//! (either directly or transitively via other models), that model +//! would in effect wait for its own response and block, +//! 2. *mailbox saturation loopback*: if an asynchronous model method sends in +//! the same call many events that end up saturating its own mailbox (either +//! directly or transitively via other models), then any attempt to send +//! another event would block forever waiting for its own mailbox to free +//! some space. //! //! The first scenario is usually very easy to avoid and is typically the result //! of an improper assembly of models. Because requestor ports are only used @@ -67,21 +67,15 @@ //! exceptional. //! //! The second scenario is rare in well-behaving models and if it occurs, it is -//! most typically at the very beginning of a simulation when all models -//! simultaneously send events during the call to +//! most typically at the very beginning of a simulation when models +//! simultaneously and mutually send events during the call to //! [`Model::init()`](crate::model::Model::init). If such a large amount of -//! concurrent messages is deemed normal behavior, the issue can be readily -//! remedied by increasing the capacity of the saturated mailboxes. +//! events is deemed normal behavior, the issue can be remedied by increasing +//! the capacity of the saturated mailboxes. //! -//! At the moment, Asynchronix is unfortunately not able to discriminate between -//! such pathological deadlocks and the "expected" deadlock that occurs when all -//! events in a given time slice have completed and all models are starved on an -//! empty mailbox. Consequently, blocking method such as [`SimInit::init()`], -//! [`Simulation::step()`], [`Simulation::process_event()`], etc., will return -//! without error after a pathological deadlock, leaving the user responsible -//! for inferring the deadlock from the behavior of the simulation in the next -//! steps. This is obviously not ideal, but is hopefully only a temporary state -//! of things until a more precise deadlock detection algorithm is implemented. +//! Any deadlocks will be reported as an [`ExecutionError::Deadlock`] error, +//! which identifies all involved models and the amount of unprocessed messages +//! (events or requests) in their mailboxes. //! //! ## Modifying connections during simulation //! @@ -136,12 +130,18 @@ pub(crate) use scheduler::{ }; pub use sim_init::SimInit; +use std::any::Any; +use std::cell::Cell; use std::error::Error; use std::fmt; use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex, MutexGuard}; +use std::task::Poll; use std::time::Duration; +use std::{panic, task}; +use pin_project::pin_project; use recycle_box::{coerce_box, RecycleBox}; use crate::channel::ChannelObserver; @@ -152,6 +152,8 @@ use crate::time::{AtomicTime, Clock, MonotonicTime, SyncStatus}; use crate::util::seq_futures::SeqFuture; use crate::util::slot; +thread_local! { pub(crate) static CURRENT_MODEL_ID: Cell = const { Cell::new(ModelId::none()) }; } + /// Simulation environment. /// /// A `Simulation` is created by calling @@ -198,11 +200,13 @@ pub struct Simulation { clock_tolerance: Option, timeout: Duration, observers: Vec<(String, Box)>, + model_names: Vec, is_terminated: bool, } impl Simulation { /// Creates a new `Simulation` with the specified clock. + #[allow(clippy::too_many_arguments)] pub(crate) fn new( executor: Executor, scheduler_queue: Arc>, @@ -211,6 +215,7 @@ impl Simulation { clock_tolerance: Option, timeout: Duration, observers: Vec<(String, Box)>, + model_names: Vec, ) -> Self { Self { executor, @@ -220,6 +225,7 @@ impl Simulation { clock_tolerance, timeout, observers, + model_names, is_terminated: false, } } @@ -276,7 +282,7 @@ impl Simulation { /// time. pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { if self.time.read() >= target_time { - return Err(ExecutionError::InvalidTargetTime(target_time)); + return Err(ExecutionError::InvalidDeadline(target_time)); } self.step_until_unchecked(target_time) } @@ -332,8 +338,9 @@ impl Simulation { /// Processes a query immediately, blocking until completion. /// - /// Simulation time remains unchanged. If the targeted model was not added - /// to the simulation, an `ExecutionError::InvalidQuery` is returned. + /// Simulation time remains unchanged. If the mailbox targeted by the query + /// was not found in the simulation, an [`ExecutionError::BadQuery`] is + /// returned. pub fn process_query( &mut self, func: F, @@ -386,11 +393,11 @@ impl Simulation { ExecutorError::Deadlock => { self.is_terminated = true; let mut deadlock_info = Vec::new(); - for (name, observer) in &self.observers { + for (model, observer) in &self.observers { let mailbox_size = observer.len(); if mailbox_size != 0 { deadlock_info.push(DeadlockInfo { - model_name: name.clone(), + model: model.clone(), mailbox_size, }); } @@ -403,6 +410,18 @@ impl Simulation { ExecutionError::Timeout } + ExecutorError::Panic(model_id, payload) => { + self.is_terminated = true; + + let model = match model_id.get() { + // The panic was emitted by a model. + Some(id) => self.model_names.get(id).unwrap().clone(), + // The panic is due to an internal issue. + None => panic::resume_unwind(payload), + }; + + ExecutionError::Panic { model, payload } + } }) } @@ -491,6 +510,8 @@ impl Simulation { if let SyncStatus::OutOfSync(lag) = self.clock.synchronize(current_time) { if let Some(tolerance) = &self.clock_tolerance { if &lag > tolerance { + self.is_terminated = true; + return Err(ExecutionError::OutOfSync(lag)); } } @@ -543,50 +564,75 @@ impl fmt::Debug for Simulation { /// Information regarding a deadlocked model. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct DeadlockInfo { - /// Name of the deadlocked model. - pub model_name: String, + /// The fully qualified name of a deadlocked model. + /// + /// This is the name of the model, if relevant prepended by the + /// dot-separated names of all parent models. + pub model: String, /// Number of messages in the mailbox. pub mailbox_size: usize, } /// An error returned upon simulation execution failure. -/// -/// Note that if a `Deadlock`, `ModelError` or `ModelPanic` is returned, any -/// subsequent attempt to run the simulation will return `Terminated`. #[derive(Debug)] pub enum ExecutionError { - /// The simulation has deadlocked. + /// The simulation has been terminated due to an earlier deadlock, model + /// panic, timeout or synchronization loss. + Terminated, + /// The simulation has deadlocked due to the enlisted models. /// - /// Enlists all models with non-empty mailboxes. + /// This is a fatal error: any subsequent attempt to run the simulation will + /// return an [`ExecutionError::Terminated`] error. Deadlock(Vec), - /// A model has aborted the simulation. - ModelError { - /// Name of the model. - model_name: String, - /// Error registered by the model. - error: Box, + /// A panic was caught during execution. + /// + /// This is a fatal error: any subsequent attempt to run the simulation will + /// return an [`ExecutionError::Terminated`] error. + Panic { + /// The fully qualified name of the panicking model. + /// + /// The fully qualified name is made of the unqualified model name, if + /// relevant prepended by the dot-separated names of all parent models. + model: String, + /// The payload associated with the panic. + /// + /// The payload can be usually downcast to a `String` or `&str`. This is + /// always the case if the panic was triggered by the `panic!` macro, + /// but panics can in principle emit arbitrary payloads with e.g. + /// [`panic_any`](std::panic::panic_any). + payload: Box, }, - /// A panic was caught during execution with the message contained in the - /// payload. - Panic(String), /// The simulation step has failed to complete within the allocated time. + /// + /// This is a fatal error: any subsequent attempt to run the simulation will + /// return an [`ExecutionError::Terminated`] error. + /// + /// See also [`SimInit::set_timeout`] and [`Simulation::set_timeout`]. Timeout, /// The simulation has lost synchronization with the clock and lags behind /// by the duration given in the payload. + /// + /// This is a fatal error: any subsequent attempt to run the simulation will + /// return an [`ExecutionError::Terminated`] error. + /// + /// See also [`SimInit::set_clock_tolerance`]. OutOfSync(Duration), - /// The specified target simulation time is in the past of the current - /// simulation time. - InvalidTargetTime(MonotonicTime), - /// The query was invalid and did not obtain a response. + /// The query did not obtain a response because the mailbox targeted by the + /// query was not found in the simulation. + /// + /// This is a non-fatal error. BadQuery, - /// The simulation has been terminated due to an earlier deadlock, model - /// error, model panic or timeout. - Terminated, + /// The specified simulation deadline is in the past of the current + /// simulation time. + /// + /// This is a non-fatal error. + InvalidDeadline(MonotonicTime), } impl fmt::Display for ExecutionError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + Self::Terminated => f.write_str("the simulation has been terminated"), Self::Deadlock(list) => { f.write_str( "a simulation deadlock has been detected that involves the following models: ", @@ -601,7 +647,7 @@ impl fmt::Display for ExecutionError { write!( f, "'{}' ({} item{} in mailbox)", - info.model_name, + info.model, info.mailbox_size, if info.mailbox_size == 1 { "" } else { "s" } )?; @@ -609,16 +655,15 @@ impl fmt::Display for ExecutionError { Ok(()) } - Self::ModelError { model_name, error } => { - write!( - f, - "the simulation has been aborted by model '{}' with the following error: {}", - model_name, error - ) - } - Self::Panic(msg) => { - f.write_str("a panic has been caught during simulation:\n")?; - f.write_str(msg) + Self::Panic{model, payload} => { + let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() { + s + } else if let Some(s) = payload.downcast_ref::() { + s + } else { + return write!(f, "model '{}' has panicked", model); + }; + write!(f, "model '{}' has panicked with the message: '{}'", model, msg) } Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"), Self::OutOfSync(lag) => { @@ -628,28 +673,19 @@ impl fmt::Display for ExecutionError { lag ) } - Self::InvalidTargetTime(time) => { + Self::BadQuery => f.write_str("the query did not return any response; was the target mailbox added to the simulation?"), + Self::InvalidDeadline(time) => { write!( f, - "target simulation stamp {} lies in the past of the current simulation time", + "the specified deadline ({}) lies in the past of the current simulation time", time ) } - Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"), - Self::Terminated => f.write_str("the simulation has been terminated"), } } } -impl Error for ExecutionError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - if let Self::ModelError { error, .. } = &self { - Some(error.as_ref()) - } else { - None - } - } -} +impl Error for ExecutionError {} /// An error returned upon simulation execution or scheduling failure. #[derive(Debug)] @@ -698,14 +734,19 @@ pub(crate) fn add_model( scheduler: Scheduler, executor: &Executor, abort_signal: &Signal, + model_names: &mut Vec, ) { #[cfg(feature = "tracing")] let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", name); - let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address())); - let build_context = BuildContext::new(&mailbox, &context, executor, abort_signal); + let context = Context::new( + name.clone(), + LocalScheduler::new(scheduler, mailbox.address()), + ); + let mut build_context = + BuildContext::new(&mailbox, &context, executor, abort_signal, model_names); - let model = model.build(&build_context); + let model = model.build(&mut build_context); let mut receiver = mailbox.0; let abort_signal = abort_signal.clone(); @@ -715,8 +756,90 @@ pub(crate) fn add_model( while !abort_signal.is_set() && receiver.recv(&mut model, &context).await.is_ok() {} }; + let model_id = ModelId::new(model_names.len()); + model_names.push(name); + + #[cfg(not(feature = "tracing"))] + let fut = ModelFuture::new(fut, model_id); #[cfg(feature = "tracing")] - let fut = tracing::Instrument::instrument(fut, span); + let fut = ModelFuture::new(fut, model_id, span); executor.spawn_and_forget(fut); } + +/// A unique index assigned to a model instance. +/// +/// This is a thin wrapper over a `usize` which encodes a lack of value as +/// `usize::MAX`. +#[derive(Copy, Clone, Debug)] +pub(crate) struct ModelId(usize); + +impl ModelId { + const fn none() -> Self { + Self(usize::MAX) + } + fn new(id: usize) -> Self { + assert_ne!(id, usize::MAX); + + Self(id) + } + fn get(&self) -> Option { + if self.0 != usize::MAX { + Some(self.0) + } else { + None + } + } +} + +impl Default for ModelId { + fn default() -> Self { + Self(usize::MAX) + } +} + +#[pin_project] +struct ModelFuture { + #[pin] + fut: F, + id: ModelId, + #[cfg(feature = "tracing")] + span: tracing::Span, +} + +impl ModelFuture { + #[cfg(not(feature = "tracing"))] + fn new(fut: F, id: ModelId) -> Self { + Self { fut, id } + } + #[cfg(feature = "tracing")] + fn new(fut: F, id: ModelId, span: tracing::Span) -> Self { + Self { fut, id, span } + } +} + +impl Future for ModelFuture { + type Output = F::Output; + + // Required method + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let this = self.project(); + + #[cfg(feature = "tracing")] + let _enter = this.span.enter(); + + // The current model ID is not set/unset through a guard or scoped TLS + // because it must survive panics to identify the last model that was + // polled. + CURRENT_MODEL_ID.set(*this.id); + let poll = this.fut.poll(cx); + + // The model ID is unset right after polling so we can distinguish + // between panics generated by models and panics generated by the + // executor itself, as in the later case `CURRENT_MODEL_ID.get()` will + // return `None`. + CURRENT_MODEL_ID.set(ModelId::none()); + + poll + } +} diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index 2accfdf..79aeead 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -10,7 +10,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use std::{fmt, ptr}; -use pin_project_lite::pin_project; +use pin_project::pin_project; use recycle_box::{coerce_box, RecycleBox}; use crate::channel::Sender; @@ -741,18 +741,17 @@ pub(crate) trait ActionInner: Send + 'static { fn spawn_and_forget(self: Box, executor: &Executor); } -pin_project! { - /// An object that can be converted to a future performing a single - /// non-cancellable action. - /// - /// Note that this particular action is in fact already a future: since the - /// future cannot be cancelled and the action does not need to be cloned, - /// there is no need to defer the construction of the future. This makes - /// `into_future` a trivial cast, which saves a boxing operation. - pub(crate) struct OnceAction { - #[pin] - fut: F, - } +#[pin_project] +/// An object that can be converted to a future performing a single +/// non-cancellable action. +/// +/// Note that this particular action is in fact already a future: since the +/// future cannot be cancelled and the action does not need to be cloned, +/// there is no need to defer the construction of the future. This makes +/// `into_future` a trivial cast, which saves a boxing operation. +pub(crate) struct OnceAction { + #[pin] + fut: F, } impl OnceAction diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index b5aa31c..793b635 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -22,6 +22,7 @@ pub struct SimInit { timeout: Duration, observers: Vec<(String, Box)>, abort_signal: Signal, + model_names: Vec, } impl SimInit { @@ -65,21 +66,26 @@ impl SimInit { timeout: Duration::ZERO, observers: Vec::new(), abort_signal, + model_names: Vec::new(), } } /// Adds a model and its mailbox to the simulation bench. /// - /// The `name` argument needs not be unique. If an empty string is provided, - /// it is replaced by the string ``. This name serves an identifier - /// for logging or error-reporting purposes. + /// The `name` argument needs not be unique. The use of the dot character in + /// the name is possible but discouraged as it can cause confusion with the + /// fully qualified name of a submodel. If an empty string is provided, it + /// is replaced by the string ``. pub fn add_model( mut self, model: P, mailbox: Mailbox, name: impl Into, ) -> Self { - let name = name.into(); + let mut name = name.into(); + if name.is_empty() { + name = String::from(""); + }; self.observers .push((name.clone(), Box::new(mailbox.0.observer()))); let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); @@ -91,6 +97,7 @@ impl SimInit { scheduler, &self.executor, &self.abort_signal, + &mut self.model_names, ); self @@ -109,8 +116,8 @@ impl SimInit { /// Specifies a tolerance for clock synchronization. /// /// When a clock synchronization tolerance is set, then any report of - /// synchronization loss by `Clock::synchronize` that exceeds the specified - /// tolerance will trigger an `ExecutionError::OutOfSync` error. + /// synchronization loss by [`Clock::synchronize`] that exceeds the + /// specified tolerance will trigger an [`ExecutionError::OutOfSync`] error. pub fn set_clock_tolerance(mut self, tolerance: Duration) -> Self { self.clock_tolerance = Some(tolerance); @@ -149,6 +156,7 @@ impl SimInit { self.clock_tolerance, self.timeout, self.observers, + self.model_names, ); simulation.run()?; diff --git a/asynchronix/tests/integration/main.rs b/asynchronix/tests/integration/main.rs index 7a93ac1..99d1658 100644 --- a/asynchronix/tests/integration/main.rs +++ b/asynchronix/tests/integration/main.rs @@ -5,6 +5,7 @@ mod model_scheduling; #[cfg(not(miri))] mod simulation_clock_sync; mod simulation_deadlock; +mod simulation_panic; mod simulation_scheduling; #[cfg(not(miri))] mod simulation_timeout; diff --git a/asynchronix/tests/integration/simulation_deadlock.rs b/asynchronix/tests/integration/simulation_deadlock.rs index 2507baa..b4e933d 100644 --- a/asynchronix/tests/integration/simulation_deadlock.rs +++ b/asynchronix/tests/integration/simulation_deadlock.rs @@ -55,7 +55,7 @@ fn deadlock_on_mailbox_overflow(num_threads: usize) { assert_eq!( deadlock_info[0], DeadlockInfo { - model_name: MODEL_NAME.into(), + model: MODEL_NAME.into(), mailbox_size: MAILBOX_SIZE } ) @@ -90,7 +90,7 @@ fn deadlock_on_query_loopback(num_threads: usize) { assert_eq!( deadlock_info[0], DeadlockInfo { - model_name: MODEL_NAME.into(), + model: MODEL_NAME.into(), mailbox_size: 1, } ); @@ -134,7 +134,7 @@ fn deadlock_on_transitive_query_loopback(num_threads: usize) { assert_eq!( deadlock_info[0], DeadlockInfo { - model_name: MODEL1_NAME.into(), + model: MODEL1_NAME.into(), mailbox_size: 1, } ); @@ -192,14 +192,14 @@ fn deadlock_on_multiple_query_loopback(num_threads: usize) { assert_eq!( deadlock_info[0], DeadlockInfo { - model_name: MODEL1_NAME.into(), + model: MODEL1_NAME.into(), mailbox_size: 1, } ); assert_eq!( deadlock_info[1], DeadlockInfo { - model_name: MODEL2_NAME.into(), + model: MODEL2_NAME.into(), mailbox_size: 1, } ); diff --git a/asynchronix/tests/integration/simulation_panic.rs b/asynchronix/tests/integration/simulation_panic.rs new file mode 100644 index 0000000..170aaf2 --- /dev/null +++ b/asynchronix/tests/integration/simulation_panic.rs @@ -0,0 +1,73 @@ +//! Model panic reporting. + +use asynchronix::model::Model; +use asynchronix::ports::Output; +use asynchronix::simulation::{ExecutionError, Mailbox, SimInit}; +use asynchronix::time::MonotonicTime; + +const MT_NUM_THREADS: usize = 4; + +#[derive(Default)] +struct TestModel { + countdown_out: Output, +} +impl TestModel { + async fn countdown_in(&mut self, count: usize) { + if count == 0 { + panic!("test message"); + } + self.countdown_out.send(count - 1).await; + } +} +impl Model for TestModel {} + +/// Pass a counter around several models and decrement it each time, panicking +/// when it becomes zero. +fn model_panic(num_threads: usize) { + const MODEL_COUNT: usize = 5; + const INIT_COUNTDOWN: usize = 9; + + // Connect all models in a cycle graph. + let mut model0 = TestModel::default(); + let mbox0 = Mailbox::new(); + let addr0 = mbox0.address(); + + let mut siminit = SimInit::with_num_threads(num_threads); + + let mut addr = mbox0.address(); + for model_id in (1..MODEL_COUNT).rev() { + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + model.countdown_out.connect(TestModel::countdown_in, addr); + addr = mbox.address(); + siminit = siminit.add_model(model, mbox, model_id.to_string()); + } + + model0.countdown_out.connect(TestModel::countdown_in, addr); + siminit = siminit.add_model(model0, mbox0, 0.to_string()); + + // Run the simulation. + let t0 = MonotonicTime::EPOCH; + let mut simu = siminit.init(t0).unwrap(); + + match simu.process_event(TestModel::countdown_in, INIT_COUNTDOWN, addr0) { + Err(ExecutionError::Panic { model, payload }) => { + let msg = payload.downcast_ref::<&str>().unwrap(); + let panicking_model_id = INIT_COUNTDOWN % MODEL_COUNT; + + assert_eq!(model, panicking_model_id.to_string()); + assert_eq!(*msg, "test message"); + } + _ => panic!("panic not detected"), + } +} + +#[test] +fn model_panic_st() { + model_panic(1); +} + +#[test] +fn model_panic_mt() { + model_panic(MT_NUM_THREADS); +}