1
0
forked from ROMEO/nexosim

Make execution fallible, impl deadlock detection

TODO: return the list of models involved in a deadlock.

Note that many execution errors are not implemented at all at the
moment and will need separate PRs, namely:
- Terminated
- ModelError
- Panic
This commit is contained in:
Serge Barral
2024-10-20 12:35:44 +02:00
parent e7889c8e9b
commit 1cfaa00f9e
22 changed files with 556 additions and 223 deletions

View File

@ -4,6 +4,7 @@
mod queue;
use std::cell::Cell;
use std::error;
use std::fmt;
use std::future::Future;
@ -20,6 +21,14 @@ use recycle_box::coerce_box;
use crate::model::{Context, Model};
// Counts the difference between the number of sent and received messages for
// this thread.
//
// This is used by the executor to make sure that all messages have been
// received upon completion of a simulation step, i.e. that no deadlock
// occurred.
thread_local! { pub(crate) static THREAD_MSG_COUNT: Cell<isize> = const { Cell::new(0) }; }
/// Data shared between the receiver and the senders.
struct Inner<M> {
/// Non-blocking internal queue.
@ -104,6 +113,9 @@ impl<M: Model> Receiver<M> {
match msg {
Some(mut msg) => {
// Decrement the count of in-flight messages.
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));
// Consume the message to obtain a boxed future.
let fut = msg.call_once(model, context, self.future_box.take().unwrap());
@ -219,6 +231,9 @@ impl<M: Model> Sender<M> {
if success {
self.inner.receiver_signal.notify();
// Increment the count of in-flight messages.
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_add(1));
Ok(())
} else {
Err(SendError)

View File

@ -15,6 +15,12 @@ use task::Promise;
/// Unique identifier for executor instances.
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
/// An error reported by `Executor::run()` when a run does not complete
/// normally.
#[derive(PartialEq, Eq, Debug)]
pub(crate) enum ExecutorError {
/// The simulation has deadlocked.
Deadlock,
}
/// Context common to all executor types.
#[derive(Clone)]
pub(crate) struct SimulationContext {
@ -43,8 +49,8 @@ impl Executor {
///
/// # Panics
///
/// This will panic if the specified number of threads is zero or is more
/// than `usize::BITS`.
/// This will panic if the specified number of threads is zero or more than
/// `usize::BITS`.
pub(crate) fn new_multi_threaded(
num_threads: usize,
simulation_context: SimulationContext,
@ -85,11 +91,19 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) {
match self {
pub(crate) fn run(&mut self) -> Result<(), ExecutorError> {
let msg_count = match self {
Self::StExecutor(executor) => executor.run(),
Self::MtExecutor(executor) => executor.run(),
};
if msg_count != 0 {
assert!(msg_count > 0);
return Err(ExecutorError::Deadlock);
}
Ok(())
}
}
@ -98,7 +112,7 @@ mod tests {
use std::sync::atomic::Ordering;
use std::sync::Arc;
use futures_channel::{mpsc, oneshot};
use futures_channel::mpsc;
use futures_util::StreamExt;
use super::*;
@ -131,47 +145,6 @@ mod tests {
}
}
fn executor_deadlock(mut executor: Executor) {
let (_sender1, receiver1) = oneshot::channel::<()>();
let (_sender2, receiver2) = oneshot::channel::<()>();
let launch_count = Arc::new(AtomicUsize::new(0));
let completion_count = Arc::new(AtomicUsize::new(0));
executor.spawn_and_forget({
let launch_count = launch_count.clone();
let completion_count = completion_count.clone();
async move {
launch_count.fetch_add(1, Ordering::Relaxed);
let _ = receiver2.await;
completion_count.fetch_add(1, Ordering::Relaxed);
}
});
executor.spawn_and_forget({
let launch_count = launch_count.clone();
let completion_count = completion_count.clone();
async move {
launch_count.fetch_add(1, Ordering::Relaxed);
let _ = receiver1.await;
completion_count.fetch_add(1, Ordering::Relaxed);
}
});
executor.run();
// Check that the executor returns on deadlock, i.e. none of the task has
// completed.
assert_eq!(launch_count.load(Ordering::Relaxed), 2);
assert_eq!(completion_count.load(Ordering::Relaxed), 0);
// Drop the executor and thus the receiver tasks before the senders,
// failing which the senders may signal that the channel has been
// dropped and wake the tasks outside the executor.
drop(executor);
}
fn executor_drop_cycle(mut executor: Executor) {
let (sender1, mut receiver1) = mpsc::channel(2);
let (sender2, mut receiver2) = mpsc::channel(2);
@ -223,7 +196,7 @@ mod tests {
}
});
executor.run();
executor.run().unwrap();
// Make sure that all tasks are eventually dropped even though each task
// wakes the others when dropped.
@ -231,20 +204,6 @@ mod tests {
assert_eq!(drop_count.load(Ordering::Relaxed), 3);
}
#[test]
fn executor_deadlock_st() {
executor_deadlock(Executor::new_single_threaded(dummy_simulation_context()));
}
#[test]
fn executor_deadlock_mt() {
executor_deadlock(Executor::new_multi_threaded(3, dummy_simulation_context()));
}
#[test]
fn executor_deadlock_mt_one_worker() {
executor_deadlock(Executor::new_multi_threaded(1, dummy_simulation_context()));
}
#[test]
fn executor_drop_cycle_st() {
executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context()));

View File

@ -48,7 +48,7 @@ use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
@ -56,8 +56,9 @@ use std::time::{Duration, Instant};
use crossbeam_utils::sync::{Parker, Unparker};
use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable};
use super::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT};
use crate::channel;
use crate::executor::task::{self, CancelToken, Promise, Runnable};
use crate::executor::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::util::rng::Rng;
use pool_manager::PoolManager;
@ -224,7 +225,10 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) {
///
/// The number of unprocessed messages is returned. It should always be 0
/// unless a deadlock occurred.
pub(crate) fn run(&mut self) -> isize {
self.context.pool_manager.activate_worker();
loop {
@ -232,7 +236,7 @@ impl Executor {
panic::resume_unwind(worker_panic);
}
if self.context.pool_manager.pool_is_idle() {
return;
return self.context.msg_count.load(Ordering::Relaxed);
}
self.parker.park();
@ -298,6 +302,11 @@ struct ExecutorContext {
executor_unparker: Unparker,
/// Manager for all worker threads.
pool_manager: PoolManager,
/// Difference between the number of sent and received messages.
///
/// This counter is only updated by worker threads before they park and is
/// therefore only consistent once all workers are parked.
msg_count: AtomicIsize,
}
impl ExecutorContext {
@ -320,6 +329,7 @@ impl ExecutorContext {
stealers.into_boxed_slice(),
worker_unparkers,
),
msg_count: AtomicIsize::new(0),
}
}
}
@ -456,6 +466,15 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
let local_queue = &worker.local_queue;
let fast_slot = &worker.fast_slot;
// Update the global message counter.
let update_msg_count = || {
let thread_msg_count = channel::THREAD_MSG_COUNT.replace(0);
worker
.executor_context
.msg_count
.fetch_add(thread_msg_count, Ordering::Relaxed);
};
let result = panic::catch_unwind(AssertUnwindSafe(|| {
// Set how long to spin when searching for a task.
const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000);
@ -468,9 +487,10 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Try to deactivate the worker.
if pool_manager.try_set_worker_inactive(id) {
parker.park();
// No need to call `begin_worker_search()`: this was done by the
// thread that unparked the worker.
update_msg_count();
parker.park();
} else if injector.is_empty() {
// This worker could not be deactivated because it was the last
// active worker. In such case, the call to
@ -479,6 +499,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// not activate a new worker, which is why some tasks may now be
// visible in the injector queue.
pool_manager.set_all_workers_inactive();
update_msg_count();
executor_unparker.unpark();
parker.park();
// No need to call `begin_worker_search()`: this was done by the

View File

@ -8,6 +8,7 @@ use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID;
use crate::channel;
use crate::executor::{SimulationContext, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local;
@ -105,7 +106,13 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) {
///
/// The number of unprocessed messages is returned. It should always be 0
/// unless a deadlock occurred.
pub(crate) fn run(&mut self) -> isize {
// In case this executor is nested in another one, reset the counter of in-flight messages.
let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count);
SIMULATION_CONTEXT.set(&self.simulation_context, || {
ACTIVE_TASKS.set(&self.active_tasks, || {
EXECUTOR_CONTEXT.set(&self.context, || loop {
@ -118,6 +125,10 @@ impl Executor {
})
})
});
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
self.context.msg_count
}
}
@ -168,6 +179,8 @@ struct ExecutorContext {
/// Unique executor identifier inherited by all tasks spawned on this
/// executor instance.
executor_id: usize,
/// Number of in-flight messages.
msg_count: isize,
}
impl ExecutorContext {
@ -176,6 +189,7 @@ impl ExecutorContext {
Self {
queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)),
executor_id,
msg_count: 0,
}
}
}

View File

@ -10,14 +10,20 @@ import "google/protobuf/empty.proto";
enum ErrorCode {
INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1;
MISSING_ARGUMENT = 2;
INVALID_TIME = 3;
INVALID_DURATION = 4;
INVALID_MESSAGE = 5;
INVALID_KEY = 6;
SOURCE_NOT_FOUND = 10;
SINK_NOT_FOUND = 11;
SIMULATION_TIME_OUT_OF_RANGE = 12;
SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3;
SIMULATION_MODEL_ERROR = 4;
SIMULATION_PANIC = 5;
SIMULATION_BAD_QUERY = 6;
SIMULATION_TIME_OUT_OF_RANGE = 7;
MISSING_ARGUMENT = 10;
INVALID_TIME = 11;
INVALID_DURATION = 12;
INVALID_PERIOD = 13;
INVALID_MESSAGE = 14;
INVALID_KEY = 15;
SOURCE_NOT_FOUND = 20;
SINK_NOT_FOUND = 21;
}
message Error {

View File

@ -338,14 +338,20 @@ pub mod any_request {
pub enum ErrorCode {
// NOTE: the discriminants must mirror the values declared for `ErrorCode` in
// the ProtoBuf definition, since these are the values exchanged with clients.
InternalError = 0,
SimulationNotStarted = 1,
MissingArgument = 2,
InvalidTime = 3,
InvalidDuration = 4,
InvalidMessage = 5,
InvalidKey = 6,
SourceNotFound = 10,
SinkNotFound = 11,
SimulationTimeOutOfRange = 12,
SimulationTerminated = 2,
SimulationDeadlock = 3,
SimulationModelError = 4,
SimulationPanic = 5,
SimulationBadQuery = 6,
SimulationTimeOutOfRange = 7,
MissingArgument = 10,
InvalidTime = 11,
InvalidDuration = 12,
InvalidPeriod = 13,
InvalidMessage = 14,
InvalidKey = 15,
SourceNotFound = 20,
SinkNotFound = 21,
}
impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition.
@ -356,14 +362,20 @@ impl ErrorCode {
match self {
ErrorCode::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR",
ErrorCode::SimulationPanic => "SIMULATION_PANIC",
ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
ErrorCode::MissingArgument => "MISSING_ARGUMENT",
ErrorCode::InvalidTime => "INVALID_TIME",
ErrorCode::InvalidDuration => "INVALID_DURATION",
ErrorCode::InvalidPeriod => "INVALID_PERIOD",
ErrorCode::InvalidMessage => "INVALID_MESSAGE",
ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@ -371,14 +383,20 @@ impl ErrorCode {
match value {
"INTERNAL_ERROR" => Some(Self::InternalError),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError),
"SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_DURATION" => Some(Self::InvalidDuration),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
_ => None,
}
}

View File

@ -8,6 +8,7 @@ use prost_types::Timestamp;
use tai_time::MonotonicTime;
use super::codegen::simulation::{Error, ErrorCode};
use crate::simulation::ExecutionError;
pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
@ -29,6 +30,21 @@ fn simulation_not_started_error() -> Error {
)
}
/// Map an `ExecutionError` to a Protobuf error.
fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::ModelError { .. } => ErrorCode::SimulationModelError,
ExecutionError::Panic(_) => ErrorCode::SimulationPanic,
ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
ExecutionError::Terminated => ErrorCode::SimulationTerminated,
ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime,
};
let error_message = error.to_string();
to_error(error_code, error_message)
}
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
///
/// This will fail if the time is outside the protobuf-specified range for

View File

@ -8,8 +8,8 @@ use crate::simulation::Simulation;
use super::super::codegen::simulation::*;
use super::{
monotonic_to_timestamp, simulation_not_started_error, timestamp_to_monotonic, to_error,
to_positive_duration, to_strictly_positive_duration,
map_execution_error, monotonic_to_timestamp, simulation_not_started_error,
timestamp_to_monotonic, to_error, to_positive_duration, to_strictly_positive_duration,
};
/// Protobuf-based simulation manager.
@ -61,18 +61,19 @@ impl ControllerService {
/// processed events have completed.
pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply {
let reply = match self {
Self::Started { simulation, .. } => {
simulation.step();
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
step_reply::Result::Time(timestamp)
} else {
step_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
Self::Started { simulation, .. } => match simulation.step() {
Ok(()) => {
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
step_reply::Result::Time(timestamp)
} else {
step_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
}
Err(e) => step_reply::Result::Error(map_execution_error(e)),
},
Self::NotStarted => step_reply::Result::Error(simulation_not_started_error()),
};
@ -117,7 +118,7 @@ impl ControllerService {
"the specified deadline lies in the past",
))?;
simulation.step_by(duration);
simulation.step_by(duration).map_err(map_execution_error)?;
}
};
@ -221,7 +222,7 @@ impl ControllerService {
}
});
simulation.process(action);
simulation.process(action).map_err(map_execution_error)?;
Ok(key_id)
}(),
@ -315,9 +316,7 @@ impl ControllerService {
)
})?;
simulation.process(event);
Ok(())
simulation.process(event).map_err(map_execution_error)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
@ -360,11 +359,11 @@ impl ControllerService {
)
})?;
simulation.process(query);
simulation.process(query).map_err(map_execution_error)?;
let replies = promise.take_collect().ok_or(to_error(
ErrorCode::InternalError,
"a reply to the query was expected but none was available".to_string(),
ErrorCode::SimulationBadQuery,
"a reply to the query was expected but none was available; maybe the target model was not added to the simulation?".to_string(),
))?;
replies.map_err(|e| {

View File

@ -5,7 +5,7 @@ use crate::registry::EndpointRegistry;
use crate::simulation::SimInit;
use crate::simulation::Simulation;
use super::{timestamp_to_monotonic, to_error};
use super::{map_execution_error, timestamp_to_monotonic, to_error};
use super::super::codegen::simulation::*;
@ -69,7 +69,12 @@ impl InitService {
.ok_or_else(|| {
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
})
.map(|start_time| (sim_init.init(start_time), registry))
.and_then(|start_time| {
sim_init
.init(start_time)
.map_err(|e| map_execution_error(e))
.map(|sim| (sim, registry))
})
});
let (reply, bench) = match reply {

View File

@ -235,7 +235,9 @@
//! .add_model(multiplier2, multiplier2_mbox, "multiplier2")
//! .add_model(delay1, delay1_mbox, "delay1")
//! .add_model(delay2, delay2_mbox, "delay2")
//! .init(t0);
//! .init(t0)?;
//!
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ```
//!
//! ## Running simulations
@ -323,23 +325,25 @@
//! # .add_model(multiplier2, multiplier2_mbox, "multiplier2")
//! # .add_model(delay1, delay1_mbox, "delay1")
//! # .add_model(delay2, delay2_mbox, "delay2")
//! # .init(t0);
//! # .init(t0)?;
//! // Send a value to the first multiplier.
//! simu.process_event(Multiplier::input, 21.0, &input_address);
//! simu.process_event(Multiplier::input, 21.0, &input_address)?;
//!
//! // The simulation is still at t0 so nothing is expected at the output of the
//! // second delay gate.
//! assert!(output_slot.next().is_none());
//!
//! // Advance simulation time until the next event and check the time and output.
//! simu.step();
//! simu.step()?;
//! assert_eq!(simu.time(), t0 + Duration::from_secs(1));
//! assert_eq!(output_slot.next(), Some(84.0));
//!
//! // Get the answer to the ultimate question of life, the universe & everything.
//! simu.step();
//! simu.step()?;
//! assert_eq!(simu.time(), t0 + Duration::from_secs(2));
//! assert_eq!(output_slot.next(), Some(42.0));
//!
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ```
//!
//! # Message ordering guarantees

View File

@ -113,14 +113,15 @@
//! # impl Model for ModelB {};
//! # let modelA_addr = Mailbox::<ModelA>::new().address();
//! # let modelB_addr = Mailbox::<ModelB>::new().address();
//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH);
//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH)?;
//! simu.process_event(
//! |m: &mut ModelA| {
//! m.output.connect(ModelB::input, modelB_addr);
//! },
//! (),
//! &modelA_addr
//! );
//! )?;
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ```
mod mailbox;
mod scheduler;
@ -143,7 +144,7 @@ use std::time::Duration;
use recycle_box::{coerce_box, RecycleBox};
use crate::executor::Executor;
use crate::executor::{Executor, ExecutorError};
use crate::model::{Context, Model, SetupContext};
use crate::ports::{InputFn, ReplierFn};
use crate::time::{AtomicTime, Clock, MonotonicTime};
@ -223,8 +224,8 @@ impl Simulation {
/// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured
/// simulation clock. This method blocks until all newly processed events
/// have completed.
pub fn step(&mut self) {
self.step_to_next_bounded(MonotonicTime::MAX);
pub fn step(&mut self) -> Result<(), ExecutionError> {
self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ())
}
/// Iteratively advances the simulation time by the specified duration, as
@ -234,10 +235,10 @@ impl Simulation {
/// time have completed. The simulation time upon completion is equal to the
/// initial simulation time incremented by the specified duration, whether
/// or not an event was scheduled for that time.
pub fn step_by(&mut self, duration: Duration) {
pub fn step_by(&mut self, duration: Duration) -> Result<(), ExecutionError> {
let target_time = self.time.read() + duration;
self.step_until_unchecked(target_time);
self.step_until_unchecked(target_time)
}
/// Iteratively advances the simulation time until the specified deadline,
@ -247,16 +248,14 @@ impl Simulation {
/// time have completed. The simulation time upon completion is equal to the
/// specified target time, whether or not an event was scheduled for that
/// time.
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), SchedulingError> {
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> {
if self.time.read() >= target_time {
return Err(SchedulingError::InvalidScheduledTime);
return Err(ExecutionError::InvalidTargetTime(target_time));
}
self.step_until_unchecked(target_time);
Ok(())
self.step_until_unchecked(target_time)
}
/// Returns a scheduler handle.
/// Returns an owned scheduler handle.
pub fn scheduler(&self) -> Scheduler {
Scheduler::new(self.scheduler_queue.clone(), self.time.reader())
}
@ -265,15 +264,20 @@ impl Simulation {
///
/// Simulation time remains unchanged. The periodicity of the action, if
/// any, is ignored.
pub fn process(&mut self, action: Action) {
pub fn process(&mut self, action: Action) -> Result<(), ExecutionError> {
action.spawn_and_forget(&self.executor);
self.executor.run();
self.run()
}
/// Processes an event immediately, blocking until completion.
///
/// Simulation time remains unchanged.
pub fn process_event<M, F, T, S>(&mut self, func: F, arg: T, address: impl Into<Address<M>>)
pub fn process_event<M, F, T, S>(
&mut self,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), ExecutionError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
@ -297,18 +301,19 @@ impl Simulation {
};
self.executor.spawn_and_forget(fut);
self.executor.run();
self.run()
}
/// Processes a query immediately, blocking until completion.
///
/// Simulation time remains unchanged.
/// Simulation time remains unchanged. If the targeted model was not added
/// to the simulation, an `ExecutionError::BadQuery` is returned.
pub fn process_query<M, F, T, R, S>(
&mut self,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<R, QueryError>
) -> Result<R, ExecutionError>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S>,
@ -338,9 +343,17 @@ impl Simulation {
};
self.executor.spawn_and_forget(fut);
self.executor.run();
self.run()?;
reply_reader.try_read().map_err(|_| QueryError {})
reply_reader
.try_read()
.map_err(|_| ExecutionError::BadQuery)
}
/// Runs the executor, translating executor-level errors into public
/// execution errors.
fn run(&mut self) -> Result<(), ExecutionError> {
    match self.executor.run() {
        Ok(()) => Ok(()),
        // The executor does not report which models are involved, hence the
        // empty list.
        Err(ExecutorError::Deadlock) => Err(ExecutionError::Deadlock(Vec::new())),
    }
}
/// Advances simulation time to that of the next scheduled action if its
@ -349,7 +362,10 @@ impl Simulation {
///
/// If at least one action was found that satisfied the time bound, the
/// corresponding new simulation time is returned.
fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option<MonotonicTime> {
fn step_to_next_bounded(
&mut self,
upper_time_bound: MonotonicTime,
) -> Result<Option<MonotonicTime>, ExecutionError> {
// Function pulling the next action. If the action is periodic, it is
// immediately re-scheduled.
fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action {
@ -380,7 +396,10 @@ impl Simulation {
// Move to the next scheduled time.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = peek_next_key(&mut scheduler_queue)?;
let mut current_key = match peek_next_key(&mut scheduler_queue) {
Some(key) => key,
None => return Ok(None),
};
self.time.write(current_key.0);
loop {
@ -420,9 +439,9 @@ impl Simulation {
let current_time = current_key.0;
// TODO: check synchronization status?
self.clock.synchronize(current_time);
self.executor.run();
self.run()?;
return Some(current_time);
return Ok(Some(current_time));
}
};
}
@ -437,18 +456,19 @@ impl Simulation {
///
/// This method does not check whether the specified time lies in the future
/// of the current simulation time.
fn step_until_unchecked(&mut self, target_time: MonotonicTime) {
fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> {
loop {
match self.step_to_next_bounded(target_time) {
// The target time was reached exactly.
Some(t) if t == target_time => return,
Ok(Some(t)) if t == target_time => return Ok(()),
// No actions are scheduled before or at the target time.
None => {
Ok(None) => {
// Update the simulation time.
self.time.write(target_time);
self.clock.synchronize(target_time);
return;
return Ok(());
}
Err(e) => return Err(e),
// The target time was not reached yet.
_ => {}
}
@ -479,6 +499,141 @@ impl fmt::Display for QueryError {
impl Error for QueryError {}
/// Information regarding a deadlocked model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeadlockInfo {
/// Name of the deadlocked model.
model_name: String,
/// Number of items in the mailbox of the model.
mailbox_size: usize,
}
/// An error returned upon simulation execution failure.
///
/// Note that if a `Deadlock`, `ModelError` or `Panic` is returned, any
/// subsequent attempt to run the simulation will return `Terminated`.
#[derive(Debug)]
pub enum ExecutionError {
/// The simulation has deadlocked.
///
/// Lists all models with non-empty mailboxes.
Deadlock(Vec<DeadlockInfo>),
/// A model has aborted the simulation.
ModelError {
/// Name of the model.
model_name: String,
/// Error registered by the model.
error: Box<dyn Error>,
},
/// A panic was caught during execution with the message contained in the
/// payload.
Panic(String),
/// The specified target simulation time is in the past of the current
/// simulation time.
InvalidTargetTime(MonotonicTime),
/// The query was invalid and did not obtain a response.
BadQuery,
/// The simulation has been terminated due to an earlier deadlock, model
/// error or panic.
Terminated,
}
impl fmt::Display for ExecutionError {
    /// Writes a human-readable description of the execution failure.
    ///
    /// For a deadlock, the models involved are enumerated together with the
    /// number of items in their mailboxes. When the list of models is empty,
    /// only the deadlock itself is reported so that the message does not end
    /// with a dangling, empty enumeration.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Deadlock(list) => {
                if list.is_empty() {
                    return f.write_str("a simulation deadlock has been detected");
                }
                f.write_str(
                    "a simulation deadlock has been detected that involves the following models: ",
                )?;
                for (idx, info) in list.iter().enumerate() {
                    // Separate consecutive entries with a comma.
                    if idx != 0 {
                        f.write_str(", ")?;
                    }
                    write!(
                        f,
                        "'{}' ({} item{} in mailbox)",
                        info.model_name,
                        info.mailbox_size,
                        if info.mailbox_size == 1 { "" } else { "s" }
                    )?;
                }

                Ok(())
            }
            Self::ModelError { model_name, error } => {
                write!(
                    f,
                    "the simulation has been aborted by model '{}' with the following error: {}",
                    model_name, error
                )
            }
            Self::Panic(msg) => {
                f.write_str("a panic has been caught during simulation:\n")?;
                f.write_str(msg)
            }
            Self::InvalidTargetTime(time) => {
                write!(
                    f,
                    "target simulation stamp {} lies in the past of the current simulation time",
                    time
                )
            }
            Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"),
            Self::Terminated => f.write_str("the simulation has been terminated"),
        }
    }
}
impl Error for ExecutionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
if let Self::ModelError { error, .. } = &self {
Some(error.as_ref())
} else {
None
}
}
}
/// An error returned upon simulation execution or scheduling failure.
///
/// Both `ExecutionError` and `SchedulingError` convert into this type via
/// `From`, so either kind of error can be propagated with `?` from a function
/// returning a `SimulationError`.
#[derive(Debug)]
pub enum SimulationError {
/// The execution of the simulation failed.
ExecutionError(ExecutionError),
/// An attempt to schedule an item failed.
SchedulingError(SchedulingError),
}
impl fmt::Display for SimulationError {
// Formatting is delegated to the wrapped error: this type adds no text of
// its own.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::ExecutionError(e) => e.fmt(f),
Self::SchedulingError(e) => e.fmt(f),
}
}
}
impl Error for SimulationError {
// The source of the wrapped error is reported rather than the wrapped error
// itself, consistently with the `Display` implementation which already
// prints the wrapped error's message.
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::ExecutionError(e) => e.source(),
Self::SchedulingError(e) => e.source(),
}
}
}
// Wraps an execution error into the `ExecutionError` variant.
impl From<ExecutionError> for SimulationError {
fn from(e: ExecutionError) -> Self {
Self::ExecutionError(e)
}
}
// Wraps a scheduling error into the `SchedulingError` variant.
impl From<SchedulingError> for SimulationError {
fn from(e: SchedulingError) -> Self {
Self::SchedulingError(e)
}
}
/// Adds a model and its mailbox to the simulation bench.
pub(crate) fn add_model<M: Model>(
mut model: M,

View File

@ -63,6 +63,13 @@ impl Scheduler {
/// model, these events are guaranteed to be processed according to the
/// scheduling order of the actions.
pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> {
// The scheduler queue must always be locked when reading the time,
// otherwise the following race could occur:
// 1) this method reads the time and concludes that it is not too late
// to schedule the action,
// 2) the `Simulation` object takes the lock, increments simulation time
// and runs the simulation step,
// 3) this method takes the lock and schedules the now-outdated action.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();

View File

@ -8,7 +8,7 @@ use crate::time::{Clock, NoClock};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation};
use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Simulation};
/// Builder for a multi-threaded, discrete-event simulation.
pub struct SimInit {
@ -82,12 +82,15 @@ impl SimInit {
/// Builds a simulation initialized at the specified simulation time,
/// executing the [`Model::init()`](crate::model::Model::init) method on all
/// model initializers.
pub fn init(mut self, start_time: MonotonicTime) -> Simulation {
pub fn init(mut self, start_time: MonotonicTime) -> Result<Simulation, ExecutionError> {
self.time.write(start_time);
self.clock.synchronize(start_time);
self.executor.run();
Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock)
let mut simulation =
Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock);
simulation.run()?;
Ok(simulation)
}
}