From 1cfaa00f9ee94d8e471c59f424134eec42121f33 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Sun, 20 Oct 2024 12:35:44 +0200 Subject: [PATCH 1/2] Make execution fallible, impl deadlock detection TODO: return the list of models involved in a deadlock. Note that many execution errors are not implemented at all at the moment and will need separate PRs, namely: - Terminated - ModelError - Panic --- asynchronix/examples/assembly.rs | 14 +- asynchronix/examples/espresso_machine.rs | 34 +-- asynchronix/examples/external_input.rs | 10 +- asynchronix/examples/power_supply.rs | 10 +- asynchronix/examples/stepper_motor.rs | 30 +-- asynchronix/src/channel.rs | 15 ++ asynchronix/src/executor.rs | 81 ++----- asynchronix/src/executor/mt_executor.rs | 33 ++- asynchronix/src/executor/st_executor.rs | 16 +- asynchronix/src/grpc/api/simulation.proto | 22 +- asynchronix/src/grpc/codegen/simulation.rs | 38 +++- asynchronix/src/grpc/services.rs | 16 ++ .../src/grpc/services/controller_service.rs | 41 ++-- asynchronix/src/grpc/services/init_service.rs | 9 +- asynchronix/src/lib.rs | 14 +- asynchronix/src/simulation.rs | 213 +++++++++++++++--- asynchronix/src/simulation/scheduler.rs | 7 + asynchronix/src/simulation/sim_init.rs | 11 +- asynchronix/tests/model_scheduling.rs | 38 ++-- asynchronix/tests/simulation_deadlock.rs | 98 ++++++++ asynchronix/tests/simulation_scheduling.rs | 27 +-- asynchronix/tests/tests.rs | 2 + 22 files changed, 556 insertions(+), 223 deletions(-) create mode 100644 asynchronix/tests/simulation_deadlock.rs diff --git a/asynchronix/examples/assembly.rs b/asynchronix/examples/assembly.rs index 3445d68..f5b1114 100644 --- a/asynchronix/examples/assembly.rs +++ b/asynchronix/examples/assembly.rs @@ -25,7 +25,7 @@ use std::time::Duration; use asynchronix::model::{Model, SetupContext}; use asynchronix::ports::{EventBuffer, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit, SimulationError}; use asynchronix::time::MonotonicTime; mod stepper_motor; @@ -84,7 +84,7 @@ impl Model for MotorAssembly { } } -fn main() { +fn main() -> Result<(), SimulationError> { // --------------- // Bench assembly. // --------------- @@ -107,7 +107,7 @@ fn main() { // Assembly and initialization. let mut simu = SimInit::new() .add_model(assembly, assembly_mbox, "assembly") - .init(t0); + .init(t0)?; let scheduler = simu.scheduler(); @@ -132,10 +132,10 @@ fn main() { .unwrap(); // Advance simulation time to two next events. - simu.step(); + simu.step()?; t += Duration::new(2, 0); assert_eq!(simu.time(), t); - simu.step(); + simu.step()?; t += Duration::new(0, 100_000_000); assert_eq!(simu.time(), t); @@ -147,7 +147,7 @@ fn main() { // Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to // 9 position increments.
- simu.step_by(Duration::new(0, 900_000_000)); + simu.step_by(Duration::new(0, 900_000_000))?; t += Duration::new(0, 900_000_000); assert_eq!(simu.time(), t); for _ in 0..9 { @@ -155,4 +155,6 @@ fn main() { assert_eq!(position.next(), Some(pos)); } assert!(position.next().is_none()); + + Ok(()) } diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index e8f9be2..235dd4f 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -35,7 +35,7 @@ use std::time::Duration; use asynchronix::model::{Context, InitializedModel, Model}; use asynchronix::ports::{EventSlot, Output}; -use asynchronix::simulation::{ActionKey, Mailbox, SimInit}; +use asynchronix::simulation::{ActionKey, Mailbox, SimInit, SimulationError}; use asynchronix::time::MonotonicTime; /// Water pump. @@ -332,7 +332,7 @@ pub enum WaterSenseState { NotEmpty, } -fn main() { +fn main() -> Result<(), SimulationError> { // --------------- // Bench assembly. // --------------- @@ -375,7 +375,7 @@ fn main() { .add_model(controller, controller_mbox, "controller") .add_model(pump, pump_mbox, "pump") .add_model(tank, tank_mbox, "tank") - .init(t0); + .init(t0)?; let scheduler = simu.scheduler(); @@ -388,10 +388,10 @@ fn main() { assert_eq!(simu.time(), t); // Brew one espresso shot with the default brew time. - simu.process_event(Controller::brew_cmd, (), &controller_addr); + simu.process_event(Controller::brew_cmd, (), &controller_addr)?; assert_eq!(flow_rate.next(), Some(pump_flow_rate)); - simu.step(); + simu.step()?; t += Controller::DEFAULT_BREW_TIME; assert_eq!(simu.time(), t); assert_eq!(flow_rate.next(), Some(0.0)); @@ -400,33 +400,33 @@ fn main() { let volume_per_shot = pump_flow_rate * Controller::DEFAULT_BREW_TIME.as_secs_f64(); let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who cares about floating-point rounding errors? for _ in 0..(shots_per_tank - 1) { - simu.process_event(Controller::brew_cmd, (), &controller_addr); + simu.process_event(Controller::brew_cmd, (), &controller_addr)?; assert_eq!(flow_rate.next(), Some(pump_flow_rate)); - simu.step(); + simu.step()?; t += Controller::DEFAULT_BREW_TIME; assert_eq!(simu.time(), t); assert_eq!(flow_rate.next(), Some(0.0)); } // Check that the tank becomes empty before the completion of the next shot. - simu.process_event(Controller::brew_cmd, (), &controller_addr); - simu.step(); + simu.process_event(Controller::brew_cmd, (), &controller_addr)?; + simu.step()?; assert!(simu.time() < t + Controller::DEFAULT_BREW_TIME); t = simu.time(); assert_eq!(flow_rate.next(), Some(0.0)); // Try to brew another shot while the tank is still empty. - simu.process_event(Controller::brew_cmd, (), &controller_addr); + simu.process_event(Controller::brew_cmd, (), &controller_addr)?; assert!(flow_rate.next().is_none()); // Change the brew time and fill up the tank. 
let brew_time = Duration::new(30, 0); - simu.process_event(Controller::brew_time, brew_time, &controller_addr); - simu.process_event(Tank::fill, 1.0e-3, tank_addr); - simu.process_event(Controller::brew_cmd, (), &controller_addr); + simu.process_event(Controller::brew_time, brew_time, &controller_addr)?; + simu.process_event(Tank::fill, 1.0e-3, tank_addr)?; + simu.process_event(Controller::brew_cmd, (), &controller_addr)?; assert_eq!(flow_rate.next(), Some(pump_flow_rate)); - simu.step(); + simu.step()?; t += brew_time; assert_eq!(simu.time(), t); assert_eq!(flow_rate.next(), Some(0.0)); @@ -440,11 +440,13 @@ fn main() { &controller_addr, ) .unwrap(); - simu.process_event(Controller::brew_cmd, (), &controller_addr); + simu.process_event(Controller::brew_cmd, (), &controller_addr)?; assert_eq!(flow_rate.next(), Some(pump_flow_rate)); - simu.step(); + simu.step()?; t += Duration::from_secs(15); assert_eq!(simu.time(), t); assert_eq!(flow_rate.next(), Some(0.0)); + + Ok(()) } diff --git a/asynchronix/examples/external_input.rs b/asynchronix/examples/external_input.rs index 0cd1dd0..ec33d42 100644 --- a/asynchronix/examples/external_input.rs +++ b/asynchronix/examples/external_input.rs @@ -32,7 +32,7 @@ use mio::{Events, Interest, Poll, Token}; use asynchronix::model::{Context, InitializedModel, Model, SetupContext}; use asynchronix::ports::{EventBuffer, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit, SimulationError}; use asynchronix::time::{AutoSystemClock, MonotonicTime}; const DELTA: Duration = Duration::from_millis(2); @@ -184,7 +184,7 @@ impl Drop for Listener { } } -fn main() { +fn main() -> Result<(), SimulationError> { // --------------- // Bench assembly. // --------------- @@ -210,7 +210,7 @@ fn main() { let mut simu = SimInit::new() .add_model(listener, listener_mbox, "listener") .set_clock(AutoSystemClock::new()) - .init(t0); + .init(t0)?; // ---------- // Simulation. @@ -231,7 +231,7 @@ fn main() { }); // Advance simulation, external messages will be collected. - simu.step_by(Duration::from_secs(2)); + simu.step_by(Duration::from_secs(2))?; // Check collected external messages. let mut packets = 0_u32; @@ -244,4 +244,6 @@ fn main() { assert_eq!(message.next(), None); sender_handle.join().unwrap(); + + Ok(()) } diff --git a/asynchronix/examples/power_supply.rs b/asynchronix/examples/power_supply.rs index 52377b6..ed87703 100644 --- a/asynchronix/examples/power_supply.rs +++ b/asynchronix/examples/power_supply.rs @@ -28,7 +28,7 @@ //! ``` use asynchronix::model::Model; use asynchronix::ports::{EventSlot, Output, Requestor}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit, SimulationError}; use asynchronix::time::MonotonicTime; /// Power supply. @@ -99,7 +99,7 @@ impl Load { impl Model for Load {} -fn main() { +fn main() -> Result<(), SimulationError> { // --------------- // Bench assembly. // --------------- @@ -144,7 +144,7 @@ fn main() { .add_model(load1, load1_mbox, "load1") .add_model(load2, load2_mbox, "load2") .add_model(load3, load3_mbox, "load3") - .init(t0); + .init(t0)?; // ---------- // Simulation. @@ -158,7 +158,7 @@ fn main() { // Vary the supply voltage, check the load and power supply consumptions. 
for voltage in [10.0, 15.0, 20.0] { - simu.process_event(PowerSupply::voltage_setting, voltage, &psu_addr); + simu.process_event(PowerSupply::voltage_setting, voltage, &psu_addr)?; let v_square = voltage * voltage; assert!(same_power(load1_power.next().unwrap(), v_square / r1)); @@ -169,4 +169,6 @@ fn main() { v_square * (1.0 / r1 + 1.0 / r2 + 1.0 / r3) )); } + + Ok(()) } diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 0e175ed..aec6832 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -205,7 +205,7 @@ impl Driver { impl Model for Driver {} #[allow(dead_code)] -fn main() { +fn main() -> Result<(), asynchronix::simulation::SimulationError> { // --------------- // Bench assembly. // --------------- @@ -235,7 +235,7 @@ fn main() { let mut simu = SimInit::new() .add_model(driver, driver_mbox, "driver") .add_model(motor, motor_mbox, "motor") - .init(t0); + .init(t0)?; let scheduler = simu.scheduler(); @@ -260,10 +260,10 @@ fn main() { .unwrap(); // Advance simulation time to two next events. - simu.step(); + simu.step()?; t += Duration::new(2, 0); assert_eq!(simu.time(), t); - simu.step(); + simu.step()?; t += Duration::new(0, 100_000_000); assert_eq!(simu.time(), t); @@ -275,7 +275,7 @@ fn main() { // Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to // 9 position increments. - simu.step_by(Duration::new(0, 900_000_000)); + simu.step_by(Duration::new(0, 900_000_000))?; t += Duration::new(0, 900_000_000); assert_eq!(simu.time(), t); for _ in 0..9 { @@ -285,24 +285,24 @@ fn main() { assert!(position.next().is_none()); // Increase the load beyond the torque limit for a 1A driver current. - simu.process_event(Motor::load, 2.0, &motor_addr); + simu.process_event(Motor::load, 2.0, &motor_addr)?; // Advance simulation time and check that the motor is blocked. - simu.step(); + simu.step()?; t += Duration::new(0, 100_000_000); assert_eq!(simu.time(), t); assert!(position.next().is_none()); // Do it again. - simu.step(); + simu.step()?; t += Duration::new(0, 100_000_000); assert_eq!(simu.time(), t); assert!(position.next().is_none()); // Decrease the load below the torque limit for a 1A driver current and // advance simulation time. - simu.process_event(Motor::load, 0.5, &motor_addr); - simu.step(); + simu.process_event(Motor::load, 0.5, &motor_addr)?; + simu.step()?; t += Duration::new(0, 100_000_000); // The motor should start moving again, but since the phase was incremented @@ -314,7 +314,7 @@ fn main() { // Advance simulation time by 0.7s, which with a 10Hz PPS should correspond to // 7 position increments. - simu.step_by(Duration::new(0, 700_000_000)); + simu.step_by(Duration::new(0, 700_000_000))?; t += Duration::new(0, 700_000_000); assert_eq!(simu.time(), t); for _ in 0..7 { @@ -325,8 +325,8 @@ fn main() { // Now make the motor rotate in the opposite direction. Note that this // driver only accounts for a new PPS at the next pulse. - simu.process_event(Driver::pulse_rate, -10.0, &driver_addr); - simu.step(); + simu.process_event(Driver::pulse_rate, -10.0, &driver_addr)?; + simu.step()?; t += Duration::new(0, 100_000_000); assert_eq!(simu.time(), t); pos = (pos + 1) % Motor::STEPS_PER_REV; @@ -334,9 +334,11 @@ fn main() { // Advance simulation time by 1.9s, which with a -10Hz PPS should correspond // to 19 position decrements. 
- simu.step_by(Duration::new(1, 900_000_000)); + simu.step_by(Duration::new(1, 900_000_000))?; t += Duration::new(1, 900_000_000); assert_eq!(simu.time(), t); pos = (pos + Motor::STEPS_PER_REV - 19) % Motor::STEPS_PER_REV; assert_eq!(position.by_ref().last(), Some(pos)); + + Ok(()) } diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index 5fe67b2..03ad9a1 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -4,6 +4,7 @@ mod queue; +use std::cell::Cell; use std::error; use std::fmt; use std::future::Future; @@ -20,6 +21,14 @@ use recycle_box::coerce_box; use crate::model::{Context, Model}; +// Counts the difference between the number of sent and received messages for +// this thread. +// +// This is used by the executor to make sure that all messages have been +// received upon completion of a simulation step, i.e. that no deadlock +// occurred. +thread_local! { pub(crate) static THREAD_MSG_COUNT: Cell = const { Cell::new(0) }; } + /// Data shared between the receiver and the senders. struct Inner { /// Non-blocking internal queue. @@ -104,6 +113,9 @@ impl Receiver { match msg { Some(mut msg) => { + // Decrement the count of in-flight messages. + THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1)); + // Consume the message to obtain a boxed future. let fut = msg.call_once(model, context, self.future_box.take().unwrap()); @@ -219,6 +231,9 @@ impl Sender { if success { self.inner.receiver_signal.notify(); + // Increment the count of in-flight messages. + THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_add(1)); + Ok(()) } else { Err(SendError) diff --git a/asynchronix/src/executor.rs b/asynchronix/src/executor.rs index 30e7110..630c053 100644 --- a/asynchronix/src/executor.rs +++ b/asynchronix/src/executor.rs @@ -15,6 +15,12 @@ use task::Promise; /// Unique identifier for executor instances. static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); +#[derive(PartialEq, Eq, Debug)] +pub(crate) enum ExecutorError { + /// The simulation has deadlocked. + Deadlock, +} + /// Context common to all executor types. #[derive(Clone)] pub(crate) struct SimulationContext { @@ -43,8 +49,8 @@ impl Executor { /// /// # Panics /// - /// This will panic if the specified number of threads is zero or is more - /// than `usize::BITS`. + /// This will panic if the specified number of threads is zero or more than + /// `usize::BITS`. pub(crate) fn new_multi_threaded( num_threads: usize, simulation_context: SimulationContext, @@ -85,11 +91,19 @@ impl Executor { /// Execute spawned tasks, blocking until all futures have completed or /// until the executor reaches a deadlock. 
- pub(crate) fn run(&mut self) { - match self { + pub(crate) fn run(&mut self) -> Result<(), ExecutorError> { + let msg_count = match self { Self::StExecutor(executor) => executor.run(), Self::MtExecutor(executor) => executor.run(), + }; + + if msg_count != 0 { + assert!(msg_count > 0); + + return Err(ExecutorError::Deadlock); } + + Ok(()) } } @@ -98,7 +112,7 @@ mod tests { use std::sync::atomic::Ordering; use std::sync::Arc; - use futures_channel::{mpsc, oneshot}; + use futures_channel::mpsc; use futures_util::StreamExt; use super::*; @@ -131,47 +145,6 @@ mod tests { } } - fn executor_deadlock(mut executor: Executor) { - let (_sender1, receiver1) = oneshot::channel::<()>(); - let (_sender2, receiver2) = oneshot::channel::<()>(); - - let launch_count = Arc::new(AtomicUsize::new(0)); - let completion_count = Arc::new(AtomicUsize::new(0)); - - executor.spawn_and_forget({ - let launch_count = launch_count.clone(); - let completion_count = completion_count.clone(); - - async move { - launch_count.fetch_add(1, Ordering::Relaxed); - let _ = receiver2.await; - completion_count.fetch_add(1, Ordering::Relaxed); - } - }); - executor.spawn_and_forget({ - let launch_count = launch_count.clone(); - let completion_count = completion_count.clone(); - - async move { - launch_count.fetch_add(1, Ordering::Relaxed); - let _ = receiver1.await; - completion_count.fetch_add(1, Ordering::Relaxed); - } - }); - - executor.run(); - - // Check that the executor returns on deadlock, i.e. none of the task has - // completed. - assert_eq!(launch_count.load(Ordering::Relaxed), 2); - assert_eq!(completion_count.load(Ordering::Relaxed), 0); - - // Drop the executor and thus the receiver tasks before the senders, - // failing which the senders may signal that the channel has been - // dropped and wake the tasks outside the executor. - drop(executor); - } - fn executor_drop_cycle(mut executor: Executor) { let (sender1, mut receiver1) = mpsc::channel(2); let (sender2, mut receiver2) = mpsc::channel(2); @@ -223,7 +196,7 @@ mod tests { } }); - executor.run(); + executor.run().unwrap(); // Make sure that all tasks are eventually dropped even though each task // wakes the others when dropped. 
@@ -231,20 +204,6 @@ mod tests { assert_eq!(drop_count.load(Ordering::Relaxed), 3); } - #[test] - fn executor_deadlock_st() { - executor_deadlock(Executor::new_single_threaded(dummy_simulation_context())); - } - - #[test] - fn executor_deadlock_mt() { - executor_deadlock(Executor::new_multi_threaded(3, dummy_simulation_context())); - } - - #[test] - fn executor_deadlock_mt_one_worker() { - executor_deadlock(Executor::new_multi_threaded(1, dummy_simulation_context())); - } #[test] fn executor_drop_cycle_st() { executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context())); diff --git a/asynchronix/src/executor/mt_executor.rs b/asynchronix/src/executor/mt_executor.rs index 965905f..45b65cb 100644 --- a/asynchronix/src/executor/mt_executor.rs +++ b/asynchronix/src/executor/mt_executor.rs @@ -48,7 +48,7 @@ use std::cell::Cell; use std::fmt; use std::future::Future; use std::panic::{self, AssertUnwindSafe}; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; @@ -56,8 +56,9 @@ use std::time::{Duration, Instant}; use crossbeam_utils::sync::{Parker, Unparker}; use slab::Slab; -use super::task::{self, CancelToken, Promise, Runnable}; -use super::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT}; +use crate::channel; +use crate::executor::task::{self, CancelToken, Promise, Runnable}; +use crate::executor::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT}; use crate::macros::scoped_thread_local::scoped_thread_local; use crate::util::rng::Rng; use pool_manager::PoolManager; @@ -224,7 +225,10 @@ impl Executor { /// Execute spawned tasks, blocking until all futures have completed or /// until the executor reaches a deadlock. - pub(crate) fn run(&mut self) { + /// + /// The number of unprocessed messages is returned. It should always be 0 + /// unless a deadlock occurred. + pub(crate) fn run(&mut self) -> isize { self.context.pool_manager.activate_worker(); loop { @@ -232,7 +236,7 @@ impl Executor { panic::resume_unwind(worker_panic); } if self.context.pool_manager.pool_is_idle() { - return; + return self.context.msg_count.load(Ordering::Relaxed); } self.parker.park(); @@ -298,6 +302,11 @@ struct ExecutorContext { executor_unparker: Unparker, /// Manager for all worker threads. pool_manager: PoolManager, + /// Difference between the number of sent and received messages. + /// + /// This counter is only updated by worker threads before they park and is + /// therefore only consistent once all workers are parked. + msg_count: AtomicIsize, } impl ExecutorContext { @@ -320,6 +329,7 @@ impl ExecutorContext { stealers.into_boxed_slice(), worker_unparkers, ), + msg_count: AtomicIsize::new(0), } } } @@ -456,6 +466,15 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { let local_queue = &worker.local_queue; let fast_slot = &worker.fast_slot; + // Update the global message counter. + let update_msg_count = || { + let thread_msg_count = channel::THREAD_MSG_COUNT.replace(0); + worker + .executor_context + .msg_count + .fetch_add(thread_msg_count, Ordering::Relaxed); + }; + let result = panic::catch_unwind(AssertUnwindSafe(|| { // Set how long to spin when searching for a task. const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000); @@ -468,9 +487,10 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // Try to deactivate the worker. 
if pool_manager.try_set_worker_inactive(id) { - parker.park(); // No need to call `begin_worker_search()`: this was done by the // thread that unparked the worker. + update_msg_count(); + parker.park(); } else if injector.is_empty() { // This worker could not be deactivated because it was the last // active worker. In such case, the call to @@ -479,6 +499,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) { // not activate a new worker, which is why some tasks may now be // visible in the injector queue. pool_manager.set_all_workers_inactive(); + update_msg_count(); executor_unparker.unpark(); parker.park(); // No need to call `begin_worker_search()`: this was done by the diff --git a/asynchronix/src/executor/st_executor.rs b/asynchronix/src/executor/st_executor.rs index eba197b..49e37d2 100644 --- a/asynchronix/src/executor/st_executor.rs +++ b/asynchronix/src/executor/st_executor.rs @@ -8,6 +8,7 @@ use slab::Slab; use super::task::{self, CancelToken, Promise, Runnable}; use super::NEXT_EXECUTOR_ID; +use crate::channel; use crate::executor::{SimulationContext, SIMULATION_CONTEXT}; use crate::macros::scoped_thread_local::scoped_thread_local; @@ -105,7 +106,13 @@ impl Executor { /// Execute spawned tasks, blocking until all futures have completed or /// until the executor reaches a deadlock. - pub(crate) fn run(&mut self) { + /// + /// The number of unprocessed messages is returned. It should always be 0 + /// unless a deadlock occurred. + pub(crate) fn run(&mut self) -> isize { + // In case this executor is nested in another one, reset the counter of in-flight messages. + let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count); + SIMULATION_CONTEXT.set(&self.simulation_context, || { ACTIVE_TASKS.set(&self.active_tasks, || { EXECUTOR_CONTEXT.set(&self.context, || loop { @@ -118,6 +125,10 @@ impl Executor { }) }) }); + + self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash); + + self.context.msg_count } } @@ -168,6 +179,8 @@ struct ExecutorContext { /// Unique executor identifier inherited by all tasks spawned on this /// executor instance. executor_id: usize, + /// Number of in-flight messages. 
+ msg_count: isize, } impl ExecutorContext { @@ -176,6 +189,7 @@ impl ExecutorContext { Self { queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)), executor_id, + msg_count: 0, } } } diff --git a/asynchronix/src/grpc/api/simulation.proto b/asynchronix/src/grpc/api/simulation.proto index b340415..22bbf3c 100644 --- a/asynchronix/src/grpc/api/simulation.proto +++ b/asynchronix/src/grpc/api/simulation.proto @@ -10,14 +10,20 @@ import "google/protobuf/empty.proto"; enum ErrorCode { INTERNAL_ERROR = 0; SIMULATION_NOT_STARTED = 1; - MISSING_ARGUMENT = 2; - INVALID_TIME = 3; - INVALID_DURATION = 4; - INVALID_MESSAGE = 5; - INVALID_KEY = 6; - SOURCE_NOT_FOUND = 10; - SINK_NOT_FOUND = 11; - SIMULATION_TIME_OUT_OF_RANGE = 12; + SIMULATION_TERMINATED = 2; + SIMULATION_DEADLOCK = 3; + SIMULATION_MODEL_ERROR = 4; + SIMULATION_PANIC = 5; + SIMULATION_BAD_QUERY = 6; + SIMULATION_TIME_OUT_OF_RANGE = 7; + MISSING_ARGUMENT = 10; + INVALID_TIME = 11; + INVALID_DURATION = 12; + INVALID_PERIOD = 13; + INVALID_MESSAGE = 14; + INVALID_KEY = 15; + SOURCE_NOT_FOUND = 20; + SINK_NOT_FOUND = 21; } message Error { diff --git a/asynchronix/src/grpc/codegen/simulation.rs b/asynchronix/src/grpc/codegen/simulation.rs index 3430a88..b1281cb 100644 --- a/asynchronix/src/grpc/codegen/simulation.rs +++ b/asynchronix/src/grpc/codegen/simulation.rs @@ -338,14 +338,20 @@ pub mod any_request { pub enum ErrorCode { InternalError = 0, SimulationNotStarted = 1, - MissingArgument = 2, - InvalidTime = 3, - InvalidDuration = 4, - InvalidMessage = 5, - InvalidKey = 6, - SourceNotFound = 10, - SinkNotFound = 11, - SimulationTimeOutOfRange = 12, + SimulationTerminated = 2, + SimulationDeadlock = 3, + SimulationModelError = 4, + SimulationPanic = 5, + SimulationBadQuery = 6, + SimulationTimeOutOfRange = 22, + MissingArgument = 7, + InvalidTime = 8, + InvalidDuration = 9, + InvalidPeriod = 10, + InvalidMessage = 11, + InvalidKey = 12, + SourceNotFound = 20, + SinkNotFound = 21, } impl ErrorCode { /// String value of the enum field names used in the ProtoBuf definition. @@ -356,14 +362,20 @@ impl ErrorCode { match self { ErrorCode::InternalError => "INTERNAL_ERROR", ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED", + ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED", + ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK", + ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR", + ErrorCode::SimulationPanic => "SIMULATION_PANIC", + ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY", + ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", ErrorCode::MissingArgument => "MISSING_ARGUMENT", ErrorCode::InvalidTime => "INVALID_TIME", ErrorCode::InvalidDuration => "INVALID_DURATION", + ErrorCode::InvalidPeriod => "INVALID_PERIOD", ErrorCode::InvalidMessage => "INVALID_MESSAGE", ErrorCode::InvalidKey => "INVALID_KEY", ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", ErrorCode::SinkNotFound => "SINK_NOT_FOUND", - ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -371,14 +383,20 @@ impl ErrorCode { match value { "INTERNAL_ERROR" => Some(Self::InternalError), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), + "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), + "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), + "SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError), + "SIMULATION_PANIC" => Some(Self::SimulationPanic), + "SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery), + "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), "MISSING_ARGUMENT" => Some(Self::MissingArgument), "INVALID_TIME" => Some(Self::InvalidTime), "INVALID_DURATION" => Some(Self::InvalidDuration), + "INVALID_PERIOD" => Some(Self::InvalidPeriod), "INVALID_MESSAGE" => Some(Self::InvalidMessage), "INVALID_KEY" => Some(Self::InvalidKey), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "SINK_NOT_FOUND" => Some(Self::SinkNotFound), - "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), _ => None, } } diff --git a/asynchronix/src/grpc/services.rs b/asynchronix/src/grpc/services.rs index 455c865..0799ded 100644 --- a/asynchronix/src/grpc/services.rs +++ b/asynchronix/src/grpc/services.rs @@ -8,6 +8,7 @@ use prost_types::Timestamp; use tai_time::MonotonicTime; use super::codegen::simulation::{Error, ErrorCode}; +use crate::simulation::ExecutionError; pub(crate) use controller_service::ControllerService; pub(crate) use init_service::InitService; @@ -29,6 +30,21 @@ fn simulation_not_started_error() -> Error { ) } +/// Map an `ExecutionError` to a Protobuf error. +fn map_execution_error(error: ExecutionError) -> Error { + let error_code = match error { + ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, + ExecutionError::ModelError { .. } => ErrorCode::SimulationModelError, + ExecutionError::Panic(_) => ErrorCode::SimulationPanic, + ExecutionError::BadQuery => ErrorCode::SimulationBadQuery, + ExecutionError::Terminated => ErrorCode::SimulationTerminated, + ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime, + }; + let error_message = error.to_string(); + + to_error(error_code, error_message) +} + /// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. /// /// This will fail if the time is outside the protobuf-specified range for diff --git a/asynchronix/src/grpc/services/controller_service.rs b/asynchronix/src/grpc/services/controller_service.rs index 9345949..97bd433 100644 --- a/asynchronix/src/grpc/services/controller_service.rs +++ b/asynchronix/src/grpc/services/controller_service.rs @@ -8,8 +8,8 @@ use crate::simulation::Simulation; use super::super::codegen::simulation::*; use super::{ - monotonic_to_timestamp, simulation_not_started_error, timestamp_to_monotonic, to_error, - to_positive_duration, to_strictly_positive_duration, + map_execution_error, monotonic_to_timestamp, simulation_not_started_error, + timestamp_to_monotonic, to_error, to_positive_duration, to_strictly_positive_duration, }; /// Protobuf-based simulation manager. @@ -61,18 +61,19 @@ impl ControllerService { /// processed events have completed. pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply { let reply = match self { - Self::Started { simulation, .. } => { - simulation.step(); - - if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { - step_reply::Result::Time(timestamp) - } else { - step_reply::Result::Error(to_error( - ErrorCode::SimulationTimeOutOfRange, - "the final simulation time is out of range", - )) + Self::Started { simulation, .. 
} => match simulation.step() { + Ok(()) => { + if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { + step_reply::Result::Time(timestamp) + } else { + step_reply::Result::Error(to_error( + ErrorCode::SimulationTimeOutOfRange, + "the final simulation time is out of range", + )) + } } - } + Err(e) => step_reply::Result::Error(map_execution_error(e)), + }, Self::NotStarted => step_reply::Result::Error(simulation_not_started_error()), }; @@ -117,7 +118,7 @@ impl ControllerService { "the specified deadline lies in the past", ))?; - simulation.step_by(duration); + simulation.step_by(duration).map_err(map_execution_error)?; } }; @@ -221,7 +222,7 @@ impl ControllerService { } }); - simulation.process(action); + simulation.process(action).map_err(map_execution_error)?; Ok(key_id) }(), @@ -315,9 +316,7 @@ impl ControllerService { ) })?; - simulation.process(event); - - Ok(()) + simulation.process(event).map_err(map_execution_error) }(), Self::NotStarted => Err(simulation_not_started_error()), }; @@ -360,11 +359,11 @@ impl ControllerService { ) })?; - simulation.process(query); + simulation.process(query).map_err(map_execution_error)?; let replies = promise.take_collect().ok_or(to_error( - ErrorCode::InternalError, - "a reply to the query was expected but none was available".to_string(), + ErrorCode::SimulationBadQuery, + "a reply to the query was expected but none was available; maybe the target model was not added to the simulation?".to_string(), ))?; replies.map_err(|e| { diff --git a/asynchronix/src/grpc/services/init_service.rs b/asynchronix/src/grpc/services/init_service.rs index 551f41e..c2a05a1 100644 --- a/asynchronix/src/grpc/services/init_service.rs +++ b/asynchronix/src/grpc/services/init_service.rs @@ -5,7 +5,7 @@ use crate::registry::EndpointRegistry; use crate::simulation::SimInit; use crate::simulation::Simulation; -use super::{timestamp_to_monotonic, to_error}; +use super::{map_execution_error, timestamp_to_monotonic, to_error}; use super::super::codegen::simulation::*; @@ -69,7 +69,12 @@ impl InitService { .ok_or_else(|| { to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field") }) - .map(|start_time| (sim_init.init(start_time), registry)) + .and_then(|start_time| { + sim_init + .init(start_time) + .map_err(|e| map_execution_error(e)) + .map(|sim| (sim, registry)) + }) }); let (reply, bench) = match reply { diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index 143482a..142f7b1 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -235,7 +235,9 @@ //! .add_model(multiplier2, multiplier2_mbox, "multiplier2") //! .add_model(delay1, delay1_mbox, "delay1") //! .add_model(delay2, delay2_mbox, "delay2") -//! .init(t0); +//! .init(t0)?; +//! +//! # Ok::<(), asynchronix::simulation::SimulationError>(()) //! ``` //! //! ## Running simulations @@ -323,23 +325,25 @@ //! # .add_model(multiplier2, multiplier2_mbox, "multiplier2") //! # .add_model(delay1, delay1_mbox, "delay1") //! # .add_model(delay2, delay2_mbox, "delay2") -//! # .init(t0); +//! # .init(t0)?; //! // Send a value to the first multiplier. -//! simu.process_event(Multiplier::input, 21.0, &input_address); +//! simu.process_event(Multiplier::input, 21.0, &input_address)?; //! //! // The simulation is still at t0 so nothing is expected at the output of the //! // second delay gate. //! assert!(output_slot.next().is_none()); //! //! // Advance simulation time until the next event and check the time and output. -//! simu.step(); +//! simu.step()?; //! 
assert_eq!(simu.time(), t0 + Duration::from_secs(1)); //! assert_eq!(output_slot.next(), Some(84.0)); //! //! // Get the answer to the ultimate question of life, the universe & everything. -//! simu.step(); +//! simu.step()?; //! assert_eq!(simu.time(), t0 + Duration::from_secs(2)); //! assert_eq!(output_slot.next(), Some(42.0)); +//! +//! # Ok::<(), asynchronix::simulation::SimulationError>(()) //! ``` //! //! # Message ordering guarantees diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index d4dca59..33216d1 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -113,14 +113,15 @@ //! # impl Model for ModelB {}; //! # let modelA_addr = Mailbox::::new().address(); //! # let modelB_addr = Mailbox::::new().address(); -//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH); +//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH)?; //! simu.process_event( //! |m: &mut ModelA| { //! m.output.connect(ModelB::input, modelB_addr); //! }, //! (), //! &modelA_addr -//! ); +//! )?; +//! # Ok::<(), asynchronix::simulation::SimulationError>(()) //! ``` mod mailbox; mod scheduler; @@ -143,7 +144,7 @@ use std::time::Duration; use recycle_box::{coerce_box, RecycleBox}; -use crate::executor::Executor; +use crate::executor::{Executor, ExecutorError}; use crate::model::{Context, Model, SetupContext}; use crate::ports::{InputFn, ReplierFn}; use crate::time::{AtomicTime, Clock, MonotonicTime}; @@ -223,8 +224,8 @@ impl Simulation { /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured /// simulation clock. This method blocks until all newly processed events /// have completed. - pub fn step(&mut self) { - self.step_to_next_bounded(MonotonicTime::MAX); + pub fn step(&mut self) -> Result<(), ExecutionError> { + self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ()) } /// Iteratively advances the simulation time by the specified duration, as @@ -234,10 +235,10 @@ impl Simulation { /// time have completed. The simulation time upon completion is equal to the /// initial simulation time incremented by the specified duration, whether /// or not an event was scheduled for that time. - pub fn step_by(&mut self, duration: Duration) { + pub fn step_by(&mut self, duration: Duration) -> Result<(), ExecutionError> { let target_time = self.time.read() + duration; - self.step_until_unchecked(target_time); + self.step_until_unchecked(target_time) } /// Iteratively advances the simulation time until the specified deadline, @@ -247,16 +248,14 @@ impl Simulation { /// time have completed. The simulation time upon completion is equal to the /// specified target time, whether or not an event was scheduled for that /// time. - pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), SchedulingError> { + pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { if self.time.read() >= target_time { - return Err(SchedulingError::InvalidScheduledTime); + return Err(ExecutionError::InvalidTargetTime(target_time)); } - self.step_until_unchecked(target_time); - - Ok(()) + self.step_until_unchecked(target_time) } - /// Returns a scheduler handle. + /// Returns an owned scheduler handle. pub fn scheduler(&self) -> Scheduler { Scheduler::new(self.scheduler_queue.clone(), self.time.reader()) } @@ -265,15 +264,20 @@ impl Simulation { /// /// Simulation time remains unchanged. The periodicity of the action, if /// any, is ignored. 
- pub fn process(&mut self, action: Action) { + pub fn process(&mut self, action: Action) -> Result<(), ExecutionError> { action.spawn_and_forget(&self.executor); - self.executor.run(); + self.run() } /// Processes an event immediately, blocking until completion. /// /// Simulation time remains unchanged. - pub fn process_event(&mut self, func: F, arg: T, address: impl Into>) + pub fn process_event( + &mut self, + func: F, + arg: T, + address: impl Into>, + ) -> Result<(), ExecutionError> where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -297,18 +301,19 @@ impl Simulation { }; self.executor.spawn_and_forget(fut); - self.executor.run(); + self.run() } /// Processes a query immediately, blocking until completion. /// - /// Simulation time remains unchanged. + /// Simulation time remains unchanged. If the targeted model was not added + /// to the simulation, an `ExecutionError::BadQuery` is returned. pub fn process_query( &mut self, func: F, arg: T, address: impl Into>, - ) -> Result + ) -> Result where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S>, @@ -338,9 +343,17 @@ impl Simulation { }; self.executor.spawn_and_forget(fut); - self.executor.run(); + self.run()?; - reply_reader.try_read().map_err(|_| QueryError {}) + reply_reader + .try_read() + .map_err(|_| ExecutionError::BadQuery) + } + + fn run(&mut self) -> Result<(), ExecutionError> { + self.executor.run().map_err(|e| match e { + ExecutorError::Deadlock => ExecutionError::Deadlock(Vec::new()), + }) } /// Advances simulation time to that of the next scheduled action if its @@ -349,7 +362,10 @@ impl Simulation { /// /// If at least one action was found that satisfied the time bound, the /// corresponding new simulation time is returned. - fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option { + fn step_to_next_bounded( + &mut self, + upper_time_bound: MonotonicTime, + ) -> Result, ExecutionError> { // Function pulling the next action. If the action is periodic, it is // immediately re-scheduled. fn pull_next_action(scheduler_queue: &mut MutexGuard) -> Action { @@ -380,7 +396,10 @@ impl Simulation { // Move to the next scheduled time. let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let mut current_key = peek_next_key(&mut scheduler_queue)?; + let mut current_key = match peek_next_key(&mut scheduler_queue) { + Some(key) => key, + None => return Ok(None), + }; self.time.write(current_key.0); loop { @@ -420,9 +439,9 @@ impl Simulation { let current_time = current_key.0; // TODO: check synchronization status? self.clock.synchronize(current_time); - self.executor.run(); + self.run()?; - return Some(current_time); + return Ok(Some(current_time)); } }; } @@ -437,18 +456,19 @@ impl Simulation { /// /// This method does not check whether the specified time lies in the future /// of the current simulation time. - fn step_until_unchecked(&mut self, target_time: MonotonicTime) { + fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { loop { match self.step_to_next_bounded(target_time) { // The target time was reached exactly. - Some(t) if t == target_time => return, + Ok(Some(t)) if t == target_time => return Ok(()), // No actions are scheduled before or at the target time. - None => { + Ok(None) => { // Update the simulation time. self.time.write(target_time); self.clock.synchronize(target_time); - return; + return Ok(()); } + Err(e) => return Err(e), // The target time was not reached yet.
_ => {} } } } @@ -479,6 +499,141 @@ impl fmt::Display for QueryError { impl Error for QueryError {} +/// Information regarding a deadlocked model. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DeadlockInfo { + model_name: String, + mailbox_size: usize, +} + +/// An error returned upon simulation execution failure. +/// +/// Note that if a `Deadlock`, `ModelError` or `Panic` is returned, any +/// subsequent attempt to run the simulation will return `Terminated`. +#[derive(Debug)] +pub enum ExecutionError { + /// The simulation has deadlocked. + /// + /// Lists all models with non-empty mailboxes. + Deadlock(Vec), + /// A model has aborted the simulation. + ModelError { + /// Name of the model. + model_name: String, + /// Error registered by the model. + error: Box, + }, + /// A panic was caught during execution with the message contained in the + /// payload. + Panic(String), + /// The specified target simulation time is in the past of the current + /// simulation time. + InvalidTargetTime(MonotonicTime), + /// The query was invalid and did not obtain a response. + BadQuery, + /// The simulation has been terminated due to an earlier deadlock, model + /// error or model panic. + Terminated, +} + +impl fmt::Display for ExecutionError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Deadlock(list) => { + f.write_str( + "a simulation deadlock has been detected that involves the following models: ", + )?; + let mut first_item = true; + for info in list { + if first_item { + first_item = false; + } else { + f.write_str(", ")?; + } + write!( + f, + "'{}' ({} item{} in mailbox)", + info.model_name, + info.mailbox_size, + if info.mailbox_size == 1 { "" } else { "s" } + )?; + } + + Ok(()) + } + Self::ModelError { model_name, error } => { + write!( + f, + "the simulation has been aborted by model '{}' with the following error: {}", + model_name, error + ) + } + Self::Panic(msg) => { + f.write_str("a panic has been caught during simulation:\n")?; + f.write_str(msg) + } + Self::InvalidTargetTime(time) => { + write!( + f, + "target simulation stamp {} lies in the past of the current simulation time", + time + ) + } + Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"), + Self::Terminated => f.write_str("the simulation has been terminated"), + } + } +} + +impl Error for ExecutionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + if let Self::ModelError { error, .. } = &self { + Some(error.as_ref()) + } else { + None + } + } +} + +/// An error returned upon simulation execution or scheduling failure. +#[derive(Debug)] +pub enum SimulationError { + /// The execution of the simulation failed. + ExecutionError(ExecutionError), + /// An attempt to schedule an item failed. + SchedulingError(SchedulingError), +} + +impl fmt::Display for SimulationError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::ExecutionError(e) => e.fmt(f), + Self::SchedulingError(e) => e.fmt(f), + } + } +} + +impl Error for SimulationError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::ExecutionError(e) => e.source(), + Self::SchedulingError(e) => e.source(), + } + } +} + +impl From for SimulationError { + fn from(e: ExecutionError) -> Self { + Self::ExecutionError(e) + } +} + +impl From for SimulationError { + fn from(e: SchedulingError) -> Self { + Self::SchedulingError(e) + } +} + /// Adds a model and its mailbox to the simulation bench.
pub(crate) fn add_model( mut model: M, diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index 9d02f12..2accfdf 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -63,6 +63,13 @@ impl Scheduler { /// model, these events are guaranteed to be processed according to the /// scheduling order of the actions. pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> { + // The scheduler queue must always be locked when reading the time, + // otherwise the following race could occur: + // 1) this method reads the time and concludes that it is not too late + // to schedule the action, + // 2) the `Simulation` object takes the lock, increments simulation time + // and runs the simulation step, + // 3) this method takes the lock and schedules the now-outdated action. let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); let now = self.time(); diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index cf75244..43106a7 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -8,7 +8,7 @@ use crate::time::{Clock, NoClock}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation}; +use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Simulation}; /// Builder for a multi-threaded, discrete-event simulation. pub struct SimInit { @@ -82,12 +82,15 @@ impl SimInit { /// Builds a simulation initialized at the specified simulation time, /// executing the [`Model::init()`](crate::model::Model::init) method on all /// model initializers. 
- pub fn init(mut self, start_time: MonotonicTime) -> Simulation { + pub fn init(mut self, start_time: MonotonicTime) -> Result { self.time.write(start_time); self.clock.synchronize(start_time); - self.executor.run(); - Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock) + let mut simulation = + Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock); + simulation.run()?; + + Ok(simulation) } } diff --git a/asynchronix/tests/model_scheduling.rs b/asynchronix/tests/model_scheduling.rs index 5b70a1c..76bf12c 100644 --- a/asynchronix/tests/model_scheduling.rs +++ b/asynchronix/tests/model_scheduling.rs @@ -38,13 +38,13 @@ fn model_schedule_event() { let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); - simu.process_event(TestModel::trigger, (), addr); - simu.step(); + simu.process_event(TestModel::trigger, (), addr).unwrap(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert!(output.next().is_some()); - simu.step(); + simu.step().unwrap(); assert!(output.next().is_none()); } @@ -93,13 +93,13 @@ fn model_cancel_future_keyed_event() { let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); - simu.process_event(TestModel::trigger, (), addr); - simu.step(); + simu.process_event(TestModel::trigger, (), addr).unwrap(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(1)); assert_eq!(output.next(), Some(1)); - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(1)); assert!(output.next().is_none()); } @@ -149,14 +149,14 @@ fn model_cancel_same_time_keyed_event() { let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); - simu.process_event(TestModel::trigger, (), addr); - simu.step(); + simu.process_event(TestModel::trigger, (), addr).unwrap(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(output.next(), Some(1)); assert!(output.next().is_none()); - simu.step(); + simu.step().unwrap(); assert!(output.next().is_none()); } @@ -192,13 +192,13 @@ fn model_schedule_periodic_event() { let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); - simu.process_event(TestModel::trigger, (), addr); + simu.process_event(TestModel::trigger, (), addr).unwrap(); // Move to the next events at t0 + 2s + k*3s. 
for k in 0..10 { - simu.step(); + simu.step().unwrap(); assert_eq!( simu.time(), t0 + Duration::from_secs(2) + k * Duration::from_secs(3) @@ -243,16 +243,16 @@ fn model_cancel_periodic_event() { let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); - simu.process_event(TestModel::trigger, (), addr); + simu.process_event(TestModel::trigger, (), addr).unwrap(); - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert!(output.next().is_some()); assert!(output.next().is_none()); - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert!(output.next().is_none()); } diff --git a/asynchronix/tests/simulation_deadlock.rs b/asynchronix/tests/simulation_deadlock.rs new file mode 100644 index 0000000..87cebd4 --- /dev/null +++ b/asynchronix/tests/simulation_deadlock.rs @@ -0,0 +1,98 @@ +//! Deadlock-detection for model loops. + +use asynchronix::model::Model; +use asynchronix::ports::{Output, Requestor}; +use asynchronix::simulation::{ExecutionError, Mailbox, SimInit}; +use asynchronix::time::MonotonicTime; + +#[derive(Default)] +struct TestModel { + output: Output<()>, + requestor: Requestor<(), ()>, +} +impl TestModel { + async fn activate_output(&mut self) { + self.output.send(()).await; + } + async fn activate_requestor(&mut self) { + let _ = self.requestor.send(()).await; + } +} +impl Model for TestModel {} + +/// Overflows a mailbox by sending 2 messages in loopback for each incoming +/// message. +#[test] +fn deadlock_on_mailbox_overflow() { + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + + // Make two self-connections so that each outgoing message generates two + // incoming messages. + model + .output + .connect(TestModel::activate_output, addr.clone()); + model + .output + .connect(TestModel::activate_output, addr.clone()); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); + + assert!(matches!( + simu.process_event(TestModel::activate_output, (), addr), + Err(ExecutionError::Deadlock(_)) + )); +} + +/// Generates a deadlock with a query loopback. +#[test] +fn deadlock_on_query_loopback() { + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + + model + .requestor + .connect(TestModel::activate_requestor, addr.clone()); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); + + assert!(matches!( + simu.process_event(TestModel::activate_requestor, (), addr), + Err(ExecutionError::Deadlock(_)) + )); +} + +/// Generates a deadlock with a query loopback involving several models. 
+#[test] +fn deadlock_on_transitive_query_loopback() { + let mut model1 = TestModel::default(); + let mut model2 = TestModel::default(); + let mbox1 = Mailbox::new(); + let mbox2 = Mailbox::new(); + let addr1 = mbox1.address(); + let addr2 = mbox2.address(); + + model1 + .requestor + .connect(TestModel::activate_requestor, addr2.clone()); + + model2 + .requestor + .connect(TestModel::activate_requestor, addr1.clone()); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::new() + .add_model(model1, mbox1, "") + .add_model(model2, mbox2, "") + .init(t0) + .unwrap(); + + assert!(matches!( + simu.process_event(TestModel::activate_requestor, (), addr1), + Err(ExecutionError::Deadlock(_)) + )); +} diff --git a/asynchronix/tests/simulation_scheduling.rs b/asynchronix/tests/simulation_scheduling.rs index e432317..cda492d 100644 --- a/asynchronix/tests/simulation_scheduling.rs +++ b/asynchronix/tests/simulation_scheduling.rs @@ -38,7 +38,7 @@ fn passthrough_bench( model.output.connect_sink(&out_stream); let addr = mbox.address(); - let simu = SimInit::new().add_model(model, mbox, "").init(t0); + let simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); (simu, addr, out_stream) } @@ -64,7 +64,7 @@ fn simulation_schedule_events() { .unwrap(); // Move to the 1st event at t0+2s. - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert!(output.next().is_some()); @@ -74,12 +74,12 @@ fn simulation_schedule_events() { .unwrap(); // Move to the 2nd event at t0+3s. - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(3)); assert!(output.next().is_some()); // Move to the 3rd event at t0+6s. - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(6)); assert!(output.next().is_some()); assert!(output.next().is_none()); @@ -110,7 +110,7 @@ fn simulation_schedule_keyed_events() { .unwrap(); // Move to the 1st event at t0+1. - simu.step(); + simu.step().unwrap(); // Try to cancel the 1st event after it has already taken place and check // that the cancellation had no effect. @@ -121,7 +121,7 @@ fn simulation_schedule_keyed_events() { // Cancel the second event (t0+2) before it is meant to takes place and // check that we move directly to the 3rd event. event_t2_1.cancel(); - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(output.next(), Some(22)); assert!(output.next().is_none()); @@ -156,7 +156,7 @@ fn simulation_schedule_periodic_events() { // Move to the next events at t0 + 3s + k*2s. for k in 0..10 { - simu.step(); + simu.step().unwrap(); assert_eq!( simu.time(), t0 + Duration::from_secs(3) + k * Duration::from_secs(2) @@ -195,7 +195,7 @@ fn simulation_schedule_periodic_keyed_events() { .unwrap(); // Move to the next event at t0+3s. - simu.step(); + simu.step().unwrap(); assert_eq!(simu.time(), t0 + Duration::from_secs(3)); assert_eq!(output.next(), Some(1)); assert_eq!(output.next(), Some(2)); @@ -206,7 +206,7 @@ fn simulation_schedule_periodic_keyed_events() { // Move to the next events at t0 + 3s + k*2s. 
for k in 1..10 { - simu.step(); + simu.step().unwrap(); assert_eq!( simu.time(), t0 + Duration::from_secs(3) + k * Duration::from_secs(2) ); @@ -263,7 +263,8 @@ fn timestamp_bench( let simu = SimInit::new() .add_model(model, mbox, "") .set_clock(clock) - .init(t0); + .init(t0) + .unwrap(); (simu, addr, stamp_stream) } @@ -320,7 +321,7 @@ fn simulation_system_clock_from_instant() { measured_time, ); - simu.step(); + simu.step().unwrap(); } } } @@ -383,7 +384,7 @@ fn simulation_system_clock_from_system_time() { measured_time, ); - simu.step(); + simu.step().unwrap(); } } } @@ -431,6 +432,6 @@ fn simulation_auto_system_clock() { measured_time, ); - simu.step(); + simu.step().unwrap(); } } diff --git a/asynchronix/tests/tests.rs b/asynchronix/tests/tests.rs index a5ca706..03f88ef 100644 --- a/asynchronix/tests/tests.rs +++ b/asynchronix/tests/tests.rs @@ -1,4 +1,6 @@ #[cfg(not(asynchronix_loom))] mod model_scheduling; #[cfg(not(asynchronix_loom))] +mod simulation_deadlock; +#[cfg(not(asynchronix_loom))] mod simulation_scheduling; From e7b64524e04d8eeca9ecad8d7573b238050ce31d Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Mon, 28 Oct 2024 11:15:47 +0100 Subject: [PATCH 2/2] Report deadlocked models and their mailbox size --- asynchronix/src/channel.rs | 45 +++++- asynchronix/src/channel/queue.rs | 86 +++++++++- asynchronix/src/dev_hooks.rs | 2 +- asynchronix/src/grpc/services/init_service.rs | 2 +- asynchronix/src/ports/source.rs | 2 +- asynchronix/src/simulation.rs | 50 +++--- asynchronix/src/simulation/sim_init.rs | 19 ++- asynchronix/tests/simulation_deadlock.rs | 151 +++++++++++++++--- 8 files changed, 304 insertions(+), 53 deletions(-) diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index 03ad9a1..d7eb413 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -93,6 +93,13 @@ impl Receiver { } } + /// Creates a new observer. + pub(crate) fn observer(&self) -> impl ChannelObserver { + Observer { + inner: self.inner.clone(), + } + } + /// Receives and executes a message asynchronously, if necessary waiting /// until one becomes available. pub(crate) async fn recv( @@ -116,12 +123,13 @@ impl Receiver { // Decrement the count of in-flight messages. THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1)); - // Consume the message to obtain a boxed future. + // Take the message to obtain a boxed future. let fut = msg.call_once(model, context, self.future_box.take().unwrap()); - // Now that `msg` was consumed and its slot in the queue was - // freed, signal to one awaiting sender that one slot is + // Now that the message was taken, drop `msg` to free its slot + // in the queue and signal to one awaiting sender that a slot is // available for sending. + drop(msg); self.inner.sender_signal.notify_one(); // Await the future provided by the message. @@ -290,6 +298,37 @@ impl Clone for Sender { } } +/// A model-independent handle to a channel that can observe the current number +/// of messages. +pub(crate) trait ChannelObserver: Send { + /// Returns the current number of messages in the channel. + /// + /// # Warning + /// + /// The returned result is only meaningful if it can be established that + /// there are no concurrent send or receive operations on the channel. + /// Otherwise, the returned value may neither reflect the current state nor + /// the past state of the channel, and may be greater than the capacity of + /// the channel. + fn len(&self) -> usize; +} + +/// A handle to a channel that can observe the current number of messages.
+///
+/// Multiple [`Observer`]s can be created using the [`Receiver::observer`]
+/// method or via cloning.
+#[derive(Clone)]
+pub(crate) struct Observer {
+    /// Shared data.
+    inner: Arc>,
+}
+
+impl ChannelObserver for Observer {
+    fn len(&self) -> usize {
+        self.inner.queue.len()
+    }
+}
+
 impl Drop for Sender {
     fn drop(&mut self) {
         // Decrease the reference count of senders.
diff --git a/asynchronix/src/channel/queue.rs b/asynchronix/src/channel/queue.rs
index 4c4f37b..1d5336b 100644
--- a/asynchronix/src/channel/queue.rs
+++ b/asynchronix/src/channel/queue.rs
@@ -122,7 +122,7 @@ pub(super) struct Queue {
     /// and the producer. The reason it is shared is that the drop handler of
     /// the last `Inner` owner (which may be a producer) needs access to the
     /// dequeue position.
-    dequeue_pos: CachePadded<UnsafeCell<usize>>,
+    dequeue_pos: CachePadded<AtomicUsize>,
 
     /// Buffer holding the closures and their stamps.
     buffer: Box<[Slot]>,
@@ -160,7 +160,7 @@ impl Queue {
 
         Queue {
             enqueue_pos: CachePadded::new(AtomicUsize::new(0)),
-            dequeue_pos: CachePadded::new(UnsafeCell::new(0)),
+            dequeue_pos: CachePadded::new(AtomicUsize::new(0)),
             buffer: buffer.into(),
             right_mask,
             closed_channel_mask,
@@ -241,7 +241,7 @@ impl Queue {
     ///
     /// This method may not be called concurrently from multiple threads.
     pub(super) unsafe fn pop(&self) -> Result, PopError> {
-        let dequeue_pos = self.dequeue_pos.with(|p| *p);
+        let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed);
         let index = dequeue_pos & self.right_mask;
         let slot = &self.buffer[index];
         let stamp = slot.stamp.load(Ordering::Acquire);
@@ -251,10 +251,10 @@ impl Queue {
             // closure can be popped.
             debug_or_loom_assert_eq!(stamp, dequeue_pos + 1);
 
-            // Only this thread can access the dequeue position so there is no
+            // Only this thread can modify the dequeue position so there is no
             // need to increment the position atomically with a `fetch_add`.
             self.dequeue_pos
-                .with_mut(|p| *p = self.next_queue_pos(dequeue_pos));
+                .store(self.next_queue_pos(dequeue_pos), Ordering::Relaxed);
 
             // Extract the closure from the slot and set the stamp to the value of
             // the dequeue position increased by one sequence increment.
@@ -318,6 +318,30 @@ impl Queue {
         self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0
     }
 
+    /// Returns the number of items in the queue.
+    ///
+    /// # Warning
+    ///
+    /// While this method is safe by Rust's standards, the returned result is
+    /// only meaningful if it can be established that there are no concurrent
+    /// `push` or `pop` operations. Otherwise, the returned value may neither
+    /// reflect the current state nor the past state of the queue, and may be
+    /// greater than the capacity of the queue.
+    pub(super) fn len(&self) -> usize {
+        let enqueue_pos = self.enqueue_pos.load(Ordering::Relaxed);
+        let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed);
+        let enqueue_idx = enqueue_pos & (self.right_mask >> 1);
+        let dequeue_idx = dequeue_pos & (self.right_mask >> 1);
+
+        // Establish whether the sequence numbers of the enqueue and dequeue
+        // positions differ. If yes, it means the enqueue position has wrapped
+        // around one more time so the difference between indices must be
+        // increased by the buffer capacity.
+        let carry_flag = (enqueue_pos & !self.right_mask) != (dequeue_pos & !self.right_mask);
+
+        (enqueue_idx + (carry_flag as usize) * self.buffer.len()) - dequeue_idx
+    }
+
     /// Increment the queue position, incrementing the sequence count as well if
     /// the index wraps to 0.
/// @@ -423,6 +447,12 @@ impl Consumer { fn close(&self) { self.inner.close(); } + + /// Returns the number of items. + #[cfg(not(asynchronix_loom))] + fn len(&self) -> usize { + self.inner.len() + } } #[cfg(test)] @@ -569,6 +599,52 @@ mod tests { fn queue_mpsc_capacity_three() { queue_mpsc(3); } + + #[test] + fn queue_len() { + let (p, mut c) = queue(4); + + let _ = p.push(|b| RecycleBox::recycle(b, 0)); + assert_eq!(c.len(), 1); + let _ = p.push(|b| RecycleBox::recycle(b, 1)); + assert_eq!(c.len(), 2); + let _ = c.pop(); + assert_eq!(c.len(), 1); + let _ = p.push(|b| RecycleBox::recycle(b, 2)); + assert_eq!(c.len(), 2); + let _ = p.push(|b| RecycleBox::recycle(b, 3)); + assert_eq!(c.len(), 3); + let _ = c.pop(); + assert_eq!(c.len(), 2); + let _ = p.push(|b| RecycleBox::recycle(b, 4)); + assert_eq!(c.len(), 3); + let _ = c.pop(); + assert_eq!(c.len(), 2); + let _ = p.push(|b| RecycleBox::recycle(b, 5)); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 6)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 7)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 8)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 9)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = c.pop(); + assert_eq!(c.len(), 2); + let _ = c.pop(); + assert_eq!(c.len(), 1); + let _ = c.pop(); + assert_eq!(c.len(), 0); + } } /// Loom tests. diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs index 535a5de..8bb7bd3 100644 --- a/asynchronix/src/dev_hooks.rs +++ b/asynchronix/src/dev_hooks.rs @@ -43,6 +43,6 @@ impl Executor { /// Let the executor run, blocking until all futures have completed or until /// the executor deadlocks. pub fn run(&mut self) { - self.0.run(); + self.0.run().unwrap(); } } diff --git a/asynchronix/src/grpc/services/init_service.rs b/asynchronix/src/grpc/services/init_service.rs index c2a05a1..1e485b0 100644 --- a/asynchronix/src/grpc/services/init_service.rs +++ b/asynchronix/src/grpc/services/init_service.rs @@ -72,7 +72,7 @@ impl InitService { .and_then(|start_time| { sim_init .init(start_time) - .map_err(|e| map_execution_error(e)) + .map_err(map_execution_error) .map(|sim| (sim, registry)) }) }); diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index faab6e5..3a92481 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -26,7 +26,7 @@ use super::ReplierFn; /// The `EventSource` port is similar to an [`Output`](crate::ports::Output) /// port in that it can send events to connected input ports. It is not meant, /// however, to be instantiated as a member of a model, but rather as a -/// simulation monitoring endpoint instantiated during bench assembly. +/// simulation control endpoint instantiated during bench assembly. 
pub struct EventSource { broadcaster: Arc>>, } diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 33216d1..9d8c1fb 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -144,6 +144,7 @@ use std::time::Duration; use recycle_box::{coerce_box, RecycleBox}; +use crate::channel::ChannelObserver; use crate::executor::{Executor, ExecutorError}; use crate::model::{Context, Model, SetupContext}; use crate::ports::{InputFn, ReplierFn}; @@ -194,6 +195,8 @@ pub struct Simulation { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + observers: Vec<(String, Box)>, + is_terminated: bool, } impl Simulation { @@ -203,12 +206,15 @@ impl Simulation { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + observers: Vec<(String, Box)>, ) -> Self { Self { executor, scheduler_queue, time, clock, + observers, + is_terminated: false, } } @@ -350,9 +356,28 @@ impl Simulation { .map_err(|_| ExecutionError::BadQuery) } + /// Runs the executor. fn run(&mut self) -> Result<(), ExecutionError> { + if self.is_terminated { + return Err(ExecutionError::Terminated); + } + self.executor.run().map_err(|e| match e { - ExecutorError::Deadlock => ExecutionError::Deadlock(Vec::new()), + ExecutorError::Deadlock => { + self.is_terminated = true; + let mut deadlock_info = Vec::new(); + for (name, observer) in &self.observers { + let mailbox_size = observer.len(); + if mailbox_size != 0 { + deadlock_info.push(DeadlockInfo { + model_name: name.clone(), + mailbox_size, + }); + } + } + + ExecutionError::Deadlock(deadlock_info) + } }) } @@ -484,26 +509,13 @@ impl fmt::Debug for Simulation { } } -/// Error returned when a query did not obtain a response. -/// -/// This can happen either because the model targeted by the address was not -/// added to the simulation or due to a simulation deadlock. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct QueryError {} - -impl fmt::Display for QueryError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "the query did not receive a response") - } -} - -impl Error for QueryError {} - /// Information regarding a deadlocked model. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct DeadlockInfo { - model_name: String, - mailbox_size: usize, + /// Name of the deadlocked model. + pub model_name: String, + /// Number of messages in the mailbox. + pub mailbox_size: usize, } /// An error returned upon simulation execution failure. diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 43106a7..efeacf4 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -1,6 +1,7 @@ use std::fmt; use std::sync::{Arc, Mutex}; +use crate::channel::ChannelObserver; use crate::executor::{Executor, SimulationContext}; use crate::model::Model; use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime}; @@ -16,6 +17,7 @@ pub struct SimInit { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + observers: Vec<(String, Box)>, } impl SimInit { @@ -49,6 +51,7 @@ impl SimInit { scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), time, clock: Box::new(NoClock::new()), + observers: Vec::new(), } } @@ -58,13 +61,16 @@ impl SimInit { /// is used for convenience for the model instance identification (e.g. for /// logging purposes). 
pub fn add_model( - self, + mut self, model: M, mailbox: Mailbox, name: impl Into, ) -> Self { + let name = name.into(); + self.observers + .push((name.clone(), Box::new(mailbox.0.observer()))); let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); - add_model(model, mailbox, name.into(), scheduler, &self.executor); + add_model(model, mailbox, name, scheduler, &self.executor); self } @@ -86,8 +92,13 @@ impl SimInit { self.time.write(start_time); self.clock.synchronize(start_time); - let mut simulation = - Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock); + let mut simulation = Simulation::new( + self.executor, + self.scheduler_queue, + self.time, + self.clock, + self.observers, + ); simulation.run()?; Ok(simulation) diff --git a/asynchronix/tests/simulation_deadlock.rs b/asynchronix/tests/simulation_deadlock.rs index 87cebd4..609b1d1 100644 --- a/asynchronix/tests/simulation_deadlock.rs +++ b/asynchronix/tests/simulation_deadlock.rs @@ -2,7 +2,7 @@ use asynchronix::model::Model; use asynchronix::ports::{Output, Requestor}; -use asynchronix::simulation::{ExecutionError, Mailbox, SimInit}; +use asynchronix::simulation::{DeadlockInfo, ExecutionError, Mailbox, SimInit}; use asynchronix::time::MonotonicTime; #[derive(Default)] @@ -24,8 +24,11 @@ impl Model for TestModel {} /// message. #[test] fn deadlock_on_mailbox_overflow() { + const MODEL_NAME: &str = "testmodel"; + const MAILBOX_SIZE: usize = 5; + let mut model = TestModel::default(); - let mbox = Mailbox::new(); + let mbox = Mailbox::with_capacity(MAILBOX_SIZE); let addr = mbox.address(); // Make two self-connections so that each outgoing message generates two @@ -38,17 +41,33 @@ fn deadlock_on_mailbox_overflow() { .connect(TestModel::activate_output, addr.clone()); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); + let mut simu = SimInit::new() + .add_model(model, mbox, MODEL_NAME) + .init(t0) + .unwrap(); - assert!(matches!( - simu.process_event(TestModel::activate_output, (), addr), - Err(ExecutionError::Deadlock(_)) - )); + match simu.process_event(TestModel::activate_output, (), addr) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect only 1 deadlocked model. + assert_eq!(deadlock_info.len(), 1); + // We expect the mailbox to be full. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL_NAME.into(), + mailbox_size: MAILBOX_SIZE + } + ) + } + _ => panic!("deadlock not detected"), + } } /// Generates a deadlock with a query loopback. #[test] fn deadlock_on_query_loopback() { + const MODEL_NAME: &str = "testmodel"; + let mut model = TestModel::default(); let mbox = Mailbox::new(); let addr = mbox.address(); @@ -58,17 +77,34 @@ fn deadlock_on_query_loopback() { .connect(TestModel::activate_requestor, addr.clone()); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); + let mut simu = SimInit::new() + .add_model(model, mbox, MODEL_NAME) + .init(t0) + .unwrap(); - assert!(matches!( - simu.process_event(TestModel::activate_requestor, (), addr), - Err(ExecutionError::Deadlock(_)) - )); + match simu.process_query(TestModel::activate_requestor, (), addr) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect only 1 deadlocked model. + assert_eq!(deadlock_info.len(), 1); + // We expect the mailbox to have a single query. 
+ assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL_NAME.into(), + mailbox_size: 1, + } + ); + } + _ => panic!("deadlock not detected"), + } } /// Generates a deadlock with a query loopback involving several models. #[test] fn deadlock_on_transitive_query_loopback() { + const MODEL1_NAME: &str = "testmodel1"; + const MODEL2_NAME: &str = "testmodel2"; + let mut model1 = TestModel::default(); let mut model2 = TestModel::default(); let mbox1 = Mailbox::new(); @@ -78,7 +114,7 @@ fn deadlock_on_transitive_query_loopback() { model1 .requestor - .connect(TestModel::activate_requestor, addr2.clone()); + .connect(TestModel::activate_requestor, addr2); model2 .requestor @@ -86,13 +122,90 @@ fn deadlock_on_transitive_query_loopback() { let t0 = MonotonicTime::EPOCH; let mut simu = SimInit::new() - .add_model(model1, mbox1, "") - .add_model(model2, mbox2, "") + .add_model(model1, mbox1, MODEL1_NAME) + .add_model(model2, mbox2, MODEL2_NAME) .init(t0) .unwrap(); - assert!(matches!( - simu.process_event(TestModel::activate_requestor, (), addr1), - Err(ExecutionError::Deadlock(_)) - )); + match simu.process_query(TestModel::activate_requestor, (), addr1) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect only 1 deadlocked model. + assert_eq!(deadlock_info.len(), 1); + // We expect the mailbox of this model to have a single query. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL1_NAME.into(), + mailbox_size: 1, + } + ); + } + _ => panic!("deadlock not detected"), + } +} + +/// Generates deadlocks with query loopbacks on several models at the same time. +#[test] +fn deadlock_on_multiple_query_loopback() { + const MODEL0_NAME: &str = "testmodel0"; + const MODEL1_NAME: &str = "testmodel1"; + const MODEL2_NAME: &str = "testmodel2"; + + let mut model0 = TestModel::default(); + let mut model1 = TestModel::default(); + let mut model2 = TestModel::default(); + let mbox0 = Mailbox::new(); + let mbox1 = Mailbox::new(); + let mbox2 = Mailbox::new(); + let addr0 = mbox0.address(); + let addr1 = mbox1.address(); + let addr2 = mbox2.address(); + + model0 + .requestor + .connect(TestModel::activate_requestor, addr1.clone()); + + model0 + .requestor + .connect(TestModel::activate_requestor, addr2.clone()); + + model1 + .requestor + .connect(TestModel::activate_requestor, addr1); + + model2 + .requestor + .connect(TestModel::activate_requestor, addr2); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::new() + .add_model(model0, mbox0, MODEL0_NAME) + .add_model(model1, mbox1, MODEL1_NAME) + .add_model(model2, mbox2, MODEL2_NAME) + .init(t0) + .unwrap(); + + match simu.process_query(TestModel::activate_requestor, (), addr0) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect 2 deadlocked models. + assert_eq!(deadlock_info.len(), 2); + // We expect the mailbox of each deadlocked model to have a single + // query. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL1_NAME.into(), + mailbox_size: 1, + } + ); + assert_eq!( + deadlock_info[1], + DeadlockInfo { + model_name: MODEL2_NAME.into(), + mailbox_size: 1, + } + ); + } + _ => panic!("deadlock not detected"), + } }
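
A minimal usage sketch, assuming the same bench as `deadlock_on_multiple_query_loopback` above and that `process_query` returns `Result<_, ExecutionError>` as exercised by these tests: a bench can report the deadlocked models rather than only assert on them.

    // Sketch only: `simu` and `addr0` are the handles assembled in the
    // `deadlock_on_multiple_query_loopback` bench above.
    match simu.process_query(TestModel::activate_requestor, (), addr0) {
        Ok(_) => println!("query completed"),
        Err(ExecutionError::Deadlock(deadlock_info)) => {
            // `model_name` and `mailbox_size` are the public fields of
            // `DeadlockInfo` introduced by this patch.
            for info in &deadlock_info {
                eprintln!(
                    "model '{}' is deadlocked with {} message(s) in its mailbox",
                    info.model_name, info.mailbox_size
                );
            }
        }
        Err(_) => eprintln!("the simulation failed for another reason"),
    }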
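The occupancy computation added to `Queue::len` in `queue.rs` can be illustrated with a simplified standalone sketch (hypothetical bit layout, not the crate's actual masks): the low bits of each position hold a buffer index and the bits above hold the lap/sequence count, so the length is the index difference plus one full buffer whenever the enqueue position has already wrapped into the next lap.

    // Simplified sketch of the carry-flag length computation; `capacity` is
    // assumed to be a power of two, with the index in the low bits and the
    // lap count in the bits above (not the crate's exact layout).
    fn occupancy(enqueue_pos: usize, dequeue_pos: usize, capacity: usize) -> usize {
        let idx_mask = capacity - 1;
        let enqueue_idx = enqueue_pos & idx_mask;
        let dequeue_idx = dequeue_pos & idx_mask;
        // If the lap bits differ, the enqueue side has wrapped around one more
        // time than the dequeue side, so a full buffer length must be added.
        let carry = (enqueue_pos & !idx_mask) != (dequeue_pos & !idx_mask);

        enqueue_idx + (carry as usize) * capacity - dequeue_idx
    }

    fn main() {
        // 3 pushes and 1 pop on a queue of capacity 4 leave 2 items.
        assert_eq!(occupancy(3, 1, 4), 2);
        // 6 pushes and 3 pops: the enqueue position sits in the next lap while
        // the dequeue position is still in the first one, leaving 3 items.
        assert_eq!(occupancy(6, 3, 4), 3);
    }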