forked from ROMEO/nexosim

Merge pull request #51 from asynchronics/feature/deadlock-detection

Feature/deadlock detection
Jauhien Piatlicki 2024-10-29 11:02:55 +01:00 committed by GitHub
commit 8f7057689c
25 changed files with 829 additions and 245 deletions

View File

@ -25,7 +25,7 @@ use std::time::Duration;
use asynchronix::model::{Model, SetupContext}; use asynchronix::model::{Model, SetupContext};
use asynchronix::ports::{EventBuffer, Output}; use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit, SimulationError};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
mod stepper_motor; mod stepper_motor;
@ -84,7 +84,7 @@ impl Model for MotorAssembly {
} }
} }
fn main() { fn main() -> Result<(), SimulationError> {
// --------------- // ---------------
// Bench assembly. // Bench assembly.
// --------------- // ---------------
@ -107,7 +107,7 @@ fn main() {
// Assembly and initialization. // Assembly and initialization.
let mut simu = SimInit::new() let mut simu = SimInit::new()
.add_model(assembly, assembly_mbox, "assembly") .add_model(assembly, assembly_mbox, "assembly")
.init(t0); .init(t0)?;
let scheduler = simu.scheduler(); let scheduler = simu.scheduler();
@ -132,10 +132,10 @@ fn main() {
.unwrap(); .unwrap();
// Advance simulation time to the next two events. // Advance simulation time to the next two events.
simu.step(); simu.step()?;
t += Duration::new(2, 0); t += Duration::new(2, 0);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
simu.step(); simu.step()?;
t += Duration::new(0, 100_000_000); t += Duration::new(0, 100_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
@ -147,7 +147,7 @@ fn main() {
// Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to // Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to
// 9 position increments. // 9 position increments.
simu.step_by(Duration::new(0, 900_000_000)); simu.step_by(Duration::new(0, 900_000_000))?;
t += Duration::new(0, 900_000_000); t += Duration::new(0, 900_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
for _ in 0..9 { for _ in 0..9 {
@ -155,4 +155,6 @@ fn main() {
assert_eq!(position.next(), Some(pos)); assert_eq!(position.next(), Some(pos));
} }
assert!(position.next().is_none()); assert!(position.next().is_none());
Ok(())
} }
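Across all of these example benches the pattern is the same: `main` now returns `Result<(), SimulationError>`, `init` is fallible, and the stepping calls propagate errors with `?`. A minimal sketch of the shape of the new API as it appears in this diff (no models added; not a copy of any one example):

use asynchronix::simulation::{SimInit, SimulationError};
use asynchronix::time::MonotonicTime;

fn main() -> Result<(), SimulationError> {
    let t0 = MonotonicTime::EPOCH;

    // Initialization is now fallible and returns the simulation or an error.
    let mut simu = SimInit::new().init(t0)?;

    // Stepping is now fallible too (e.g. it can report a deadlock) instead of
    // blocking or panicking.
    simu.step()?;

    Ok(())
}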

View File

@ -35,7 +35,7 @@ use std::time::Duration;
use asynchronix::model::{Context, InitializedModel, Model}; use asynchronix::model::{Context, InitializedModel, Model};
use asynchronix::ports::{EventSlot, Output}; use asynchronix::ports::{EventSlot, Output};
use asynchronix::simulation::{ActionKey, Mailbox, SimInit}; use asynchronix::simulation::{ActionKey, Mailbox, SimInit, SimulationError};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
/// Water pump. /// Water pump.
@ -332,7 +332,7 @@ pub enum WaterSenseState {
NotEmpty, NotEmpty,
} }
fn main() { fn main() -> Result<(), SimulationError> {
// --------------- // ---------------
// Bench assembly. // Bench assembly.
// --------------- // ---------------
@ -375,7 +375,7 @@ fn main() {
.add_model(controller, controller_mbox, "controller") .add_model(controller, controller_mbox, "controller")
.add_model(pump, pump_mbox, "pump") .add_model(pump, pump_mbox, "pump")
.add_model(tank, tank_mbox, "tank") .add_model(tank, tank_mbox, "tank")
.init(t0); .init(t0)?;
let scheduler = simu.scheduler(); let scheduler = simu.scheduler();
@ -388,10 +388,10 @@ fn main() {
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
// Brew one espresso shot with the default brew time. // Brew one espresso shot with the default brew time.
simu.process_event(Controller::brew_cmd, (), &controller_addr); simu.process_event(Controller::brew_cmd, (), &controller_addr)?;
assert_eq!(flow_rate.next(), Some(pump_flow_rate)); assert_eq!(flow_rate.next(), Some(pump_flow_rate));
simu.step(); simu.step()?;
t += Controller::DEFAULT_BREW_TIME; t += Controller::DEFAULT_BREW_TIME;
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
assert_eq!(flow_rate.next(), Some(0.0)); assert_eq!(flow_rate.next(), Some(0.0));
@ -400,33 +400,33 @@ fn main() {
let volume_per_shot = pump_flow_rate * Controller::DEFAULT_BREW_TIME.as_secs_f64(); let volume_per_shot = pump_flow_rate * Controller::DEFAULT_BREW_TIME.as_secs_f64();
let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who cares about floating-point rounding errors? let shots_per_tank = (init_tank_volume / volume_per_shot) as u64; // YOLO--who cares about floating-point rounding errors?
for _ in 0..(shots_per_tank - 1) { for _ in 0..(shots_per_tank - 1) {
simu.process_event(Controller::brew_cmd, (), &controller_addr); simu.process_event(Controller::brew_cmd, (), &controller_addr)?;
assert_eq!(flow_rate.next(), Some(pump_flow_rate)); assert_eq!(flow_rate.next(), Some(pump_flow_rate));
simu.step(); simu.step()?;
t += Controller::DEFAULT_BREW_TIME; t += Controller::DEFAULT_BREW_TIME;
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
assert_eq!(flow_rate.next(), Some(0.0)); assert_eq!(flow_rate.next(), Some(0.0));
} }
// Check that the tank becomes empty before the completion of the next shot. // Check that the tank becomes empty before the completion of the next shot.
simu.process_event(Controller::brew_cmd, (), &controller_addr); simu.process_event(Controller::brew_cmd, (), &controller_addr)?;
simu.step(); simu.step()?;
assert!(simu.time() < t + Controller::DEFAULT_BREW_TIME); assert!(simu.time() < t + Controller::DEFAULT_BREW_TIME);
t = simu.time(); t = simu.time();
assert_eq!(flow_rate.next(), Some(0.0)); assert_eq!(flow_rate.next(), Some(0.0));
// Try to brew another shot while the tank is still empty. // Try to brew another shot while the tank is still empty.
simu.process_event(Controller::brew_cmd, (), &controller_addr); simu.process_event(Controller::brew_cmd, (), &controller_addr)?;
assert!(flow_rate.next().is_none()); assert!(flow_rate.next().is_none());
// Change the brew time and fill up the tank. // Change the brew time and fill up the tank.
let brew_time = Duration::new(30, 0); let brew_time = Duration::new(30, 0);
simu.process_event(Controller::brew_time, brew_time, &controller_addr); simu.process_event(Controller::brew_time, brew_time, &controller_addr)?;
simu.process_event(Tank::fill, 1.0e-3, tank_addr); simu.process_event(Tank::fill, 1.0e-3, tank_addr)?;
simu.process_event(Controller::brew_cmd, (), &controller_addr); simu.process_event(Controller::brew_cmd, (), &controller_addr)?;
assert_eq!(flow_rate.next(), Some(pump_flow_rate)); assert_eq!(flow_rate.next(), Some(pump_flow_rate));
simu.step(); simu.step()?;
t += brew_time; t += brew_time;
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
assert_eq!(flow_rate.next(), Some(0.0)); assert_eq!(flow_rate.next(), Some(0.0));
@ -440,11 +440,13 @@ fn main() {
&controller_addr, &controller_addr,
) )
.unwrap(); .unwrap();
simu.process_event(Controller::brew_cmd, (), &controller_addr); simu.process_event(Controller::brew_cmd, (), &controller_addr)?;
assert_eq!(flow_rate.next(), Some(pump_flow_rate)); assert_eq!(flow_rate.next(), Some(pump_flow_rate));
simu.step(); simu.step()?;
t += Duration::from_secs(15); t += Duration::from_secs(15);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
assert_eq!(flow_rate.next(), Some(0.0)); assert_eq!(flow_rate.next(), Some(0.0));
Ok(())
} }

View File

@ -32,7 +32,7 @@ use mio::{Events, Interest, Poll, Token};
use asynchronix::model::{Context, InitializedModel, Model, SetupContext}; use asynchronix::model::{Context, InitializedModel, Model, SetupContext};
use asynchronix::ports::{EventBuffer, Output}; use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit, SimulationError};
use asynchronix::time::{AutoSystemClock, MonotonicTime}; use asynchronix::time::{AutoSystemClock, MonotonicTime};
const DELTA: Duration = Duration::from_millis(2); const DELTA: Duration = Duration::from_millis(2);
@ -184,7 +184,7 @@ impl Drop for Listener {
} }
} }
fn main() { fn main() -> Result<(), SimulationError> {
// --------------- // ---------------
// Bench assembly. // Bench assembly.
// --------------- // ---------------
@ -210,7 +210,7 @@ fn main() {
let mut simu = SimInit::new() let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener") .add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new()) .set_clock(AutoSystemClock::new())
.init(t0); .init(t0)?;
// ---------- // ----------
// Simulation. // Simulation.
@ -231,7 +231,7 @@ fn main() {
}); });
// Advance simulation, external messages will be collected. // Advance simulation, external messages will be collected.
simu.step_by(Duration::from_secs(2)); simu.step_by(Duration::from_secs(2))?;
// Check collected external messages. // Check collected external messages.
let mut packets = 0_u32; let mut packets = 0_u32;
@ -244,4 +244,6 @@ fn main() {
assert_eq!(message.next(), None); assert_eq!(message.next(), None);
sender_handle.join().unwrap(); sender_handle.join().unwrap();
Ok(())
} }

View File

@ -28,7 +28,7 @@
//! ``` //! ```
use asynchronix::model::Model; use asynchronix::model::Model;
use asynchronix::ports::{EventSlot, Output, Requestor}; use asynchronix::ports::{EventSlot, Output, Requestor};
use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit, SimulationError};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
/// Power supply. /// Power supply.
@ -99,7 +99,7 @@ impl Load {
impl Model for Load {} impl Model for Load {}
fn main() { fn main() -> Result<(), SimulationError> {
// --------------- // ---------------
// Bench assembly. // Bench assembly.
// --------------- // ---------------
@ -144,7 +144,7 @@ fn main() {
.add_model(load1, load1_mbox, "load1") .add_model(load1, load1_mbox, "load1")
.add_model(load2, load2_mbox, "load2") .add_model(load2, load2_mbox, "load2")
.add_model(load3, load3_mbox, "load3") .add_model(load3, load3_mbox, "load3")
.init(t0); .init(t0)?;
// ---------- // ----------
// Simulation. // Simulation.
@ -158,7 +158,7 @@ fn main() {
// Vary the supply voltage, check the load and power supply consumptions. // Vary the supply voltage, check the load and power supply consumptions.
for voltage in [10.0, 15.0, 20.0] { for voltage in [10.0, 15.0, 20.0] {
simu.process_event(PowerSupply::voltage_setting, voltage, &psu_addr); simu.process_event(PowerSupply::voltage_setting, voltage, &psu_addr)?;
let v_square = voltage * voltage; let v_square = voltage * voltage;
assert!(same_power(load1_power.next().unwrap(), v_square / r1)); assert!(same_power(load1_power.next().unwrap(), v_square / r1));
@ -169,4 +169,6 @@ fn main() {
v_square * (1.0 / r1 + 1.0 / r2 + 1.0 / r3) v_square * (1.0 / r1 + 1.0 / r2 + 1.0 / r3)
)); ));
} }
Ok(())
} }

View File

@ -205,7 +205,7 @@ impl Driver {
impl Model for Driver {} impl Model for Driver {}
#[allow(dead_code)] #[allow(dead_code)]
fn main() { fn main() -> Result<(), asynchronix::simulation::SimulationError> {
// --------------- // ---------------
// Bench assembly. // Bench assembly.
// --------------- // ---------------
@ -235,7 +235,7 @@ fn main() {
let mut simu = SimInit::new() let mut simu = SimInit::new()
.add_model(driver, driver_mbox, "driver") .add_model(driver, driver_mbox, "driver")
.add_model(motor, motor_mbox, "motor") .add_model(motor, motor_mbox, "motor")
.init(t0); .init(t0)?;
let scheduler = simu.scheduler(); let scheduler = simu.scheduler();
@ -260,10 +260,10 @@ fn main() {
.unwrap(); .unwrap();
// Advance simulation time to the next two events. // Advance simulation time to the next two events.
simu.step(); simu.step()?;
t += Duration::new(2, 0); t += Duration::new(2, 0);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
simu.step(); simu.step()?;
t += Duration::new(0, 100_000_000); t += Duration::new(0, 100_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
@ -275,7 +275,7 @@ fn main() {
// Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to // Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to
// 9 position increments. // 9 position increments.
simu.step_by(Duration::new(0, 900_000_000)); simu.step_by(Duration::new(0, 900_000_000))?;
t += Duration::new(0, 900_000_000); t += Duration::new(0, 900_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
for _ in 0..9 { for _ in 0..9 {
@ -285,24 +285,24 @@ fn main() {
assert!(position.next().is_none()); assert!(position.next().is_none());
// Increase the load beyond the torque limit for a 1A driver current. // Increase the load beyond the torque limit for a 1A driver current.
simu.process_event(Motor::load, 2.0, &motor_addr); simu.process_event(Motor::load, 2.0, &motor_addr)?;
// Advance simulation time and check that the motor is blocked. // Advance simulation time and check that the motor is blocked.
simu.step(); simu.step()?;
t += Duration::new(0, 100_000_000); t += Duration::new(0, 100_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
assert!(position.next().is_none()); assert!(position.next().is_none());
// Do it again. // Do it again.
simu.step(); simu.step()?;
t += Duration::new(0, 100_000_000); t += Duration::new(0, 100_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
assert!(position.next().is_none()); assert!(position.next().is_none());
// Decrease the load below the torque limit for a 1A driver current and // Decrease the load below the torque limit for a 1A driver current and
// advance simulation time. // advance simulation time.
simu.process_event(Motor::load, 0.5, &motor_addr); simu.process_event(Motor::load, 0.5, &motor_addr)?;
simu.step(); simu.step()?;
t += Duration::new(0, 100_000_000); t += Duration::new(0, 100_000_000);
// The motor should start moving again, but since the phase was incremented // The motor should start moving again, but since the phase was incremented
@ -314,7 +314,7 @@ fn main() {
// Advance simulation time by 0.7s, which with a 10Hz PPS should correspond to // Advance simulation time by 0.7s, which with a 10Hz PPS should correspond to
// 7 position increments. // 7 position increments.
simu.step_by(Duration::new(0, 700_000_000)); simu.step_by(Duration::new(0, 700_000_000))?;
t += Duration::new(0, 700_000_000); t += Duration::new(0, 700_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
for _ in 0..7 { for _ in 0..7 {
@ -325,8 +325,8 @@ fn main() {
// Now make the motor rotate in the opposite direction. Note that this // Now make the motor rotate in the opposite direction. Note that this
// driver only accounts for a new PPS at the next pulse. // driver only accounts for a new PPS at the next pulse.
simu.process_event(Driver::pulse_rate, -10.0, &driver_addr); simu.process_event(Driver::pulse_rate, -10.0, &driver_addr)?;
simu.step(); simu.step()?;
t += Duration::new(0, 100_000_000); t += Duration::new(0, 100_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
pos = (pos + 1) % Motor::STEPS_PER_REV; pos = (pos + 1) % Motor::STEPS_PER_REV;
@ -334,9 +334,11 @@ fn main() {
// Advance simulation time by 1.9s, which with a -10Hz PPS should correspond // Advance simulation time by 1.9s, which with a -10Hz PPS should correspond
// to 19 position decrements. // to 19 position decrements.
simu.step_by(Duration::new(1, 900_000_000)); simu.step_by(Duration::new(1, 900_000_000))?;
t += Duration::new(1, 900_000_000); t += Duration::new(1, 900_000_000);
assert_eq!(simu.time(), t); assert_eq!(simu.time(), t);
pos = (pos + Motor::STEPS_PER_REV - 19) % Motor::STEPS_PER_REV; pos = (pos + Motor::STEPS_PER_REV - 19) % Motor::STEPS_PER_REV;
assert_eq!(position.by_ref().last(), Some(pos)); assert_eq!(position.by_ref().last(), Some(pos));
Ok(())
} }

View File

@ -4,6 +4,7 @@
mod queue; mod queue;
use std::cell::Cell;
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
@ -20,6 +21,14 @@ use recycle_box::coerce_box;
use crate::model::{Context, Model}; use crate::model::{Context, Model};
// Counts the difference between the number of sent and received messages for
// this thread.
//
// This is used by the executor to make sure that all messages have been
// received upon completion of a simulation step, i.e. that no deadlock
// occurred.
thread_local! { pub(crate) static THREAD_MSG_COUNT: Cell<isize> = const { Cell::new(0) }; }
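The mechanism is plain per-thread bookkeeping: every send increments the thread-local balance, every receive decrements it, and a non-zero total once the executor goes idle means some messages were never delivered. A rough standalone sketch of that idea (hypothetical names, not the crate's sender/receiver code):

use std::cell::Cell;

thread_local! {
    // Sent minus received messages on this thread.
    static MSG_BALANCE: Cell<isize> = const { Cell::new(0) };
}

fn on_send() {
    MSG_BALANCE.set(MSG_BALANCE.get().wrapping_add(1));
}

fn on_receive() {
    MSG_BALANCE.set(MSG_BALANCE.get().wrapping_sub(1));
}

// Once all workers are idle, the executor sums the per-thread balances; a
// non-zero sum means messages are still in flight, i.e. a deadlock occurred.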
/// Data shared between the receiver and the senders. /// Data shared between the receiver and the senders.
struct Inner<M> { struct Inner<M> {
/// Non-blocking internal queue. /// Non-blocking internal queue.
@ -84,6 +93,13 @@ impl<M: Model> Receiver<M> {
} }
} }
/// Creates a new observer.
pub(crate) fn observer(&self) -> impl ChannelObserver {
Observer {
inner: self.inner.clone(),
}
}
/// Receives and executes a message asynchronously, if necessary waiting /// Receives and executes a message asynchronously, if necessary waiting
/// until one becomes available. /// until one becomes available.
pub(crate) async fn recv( pub(crate) async fn recv(
@ -104,12 +120,16 @@ impl<M: Model> Receiver<M> {
match msg { match msg {
Some(mut msg) => { Some(mut msg) => {
// Consume the message to obtain a boxed future. // Decrement the count of in-flight messages.
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));
// Take the message to obtain a boxed future.
let fut = msg.call_once(model, context, self.future_box.take().unwrap()); let fut = msg.call_once(model, context, self.future_box.take().unwrap());
// Now that `msg` was consumed and its slot in the queue was // Now that the message was taken, drop `msg` to free its slot
// freed, signal to one awaiting sender that one slot is // in the queue and signal to one awaiting sender that a slot is
// available for sending. // available for sending.
drop(msg);
self.inner.sender_signal.notify_one(); self.inner.sender_signal.notify_one();
// Await the future provided by the message. // Await the future provided by the message.
@ -219,6 +239,9 @@ impl<M: Model> Sender<M> {
if success { if success {
self.inner.receiver_signal.notify(); self.inner.receiver_signal.notify();
// Increment the count of in-flight messages.
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_add(1));
Ok(()) Ok(())
} else { } else {
Err(SendError) Err(SendError)
@ -275,6 +298,37 @@ impl<M> Clone for Sender<M> {
} }
} }
/// A model-independent handle to a channel that can observe the current number
/// of messages.
pub(crate) trait ChannelObserver: Send {
/// Returns the current number of messages in the channel.
///
/// # Warning
///
/// The returned result is only meaningful if it can be established that
/// there are no concurrent send or receive operations on the channel.
/// Otherwise, the returned value may neither reflect the current state nor
/// the past state of the channel, and may be greater than the capacity of
/// the channel.
fn len(&self) -> usize;
}
/// A handle to a channel that can observe the current number of messages.
///
/// Multiple [`Observer`]s can be created using the [`Receiver::observer`]
/// method or via cloning.
#[derive(Clone)]
pub(crate) struct Observer<M: 'static> {
/// Shared data.
inner: Arc<Inner<M>>,
}
impl<M: Model> ChannelObserver for Observer<M> {
fn len(&self) -> usize {
self.inner.queue.len()
}
}
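An `Observer` is what lets the simulation layer, after a deadlocked run, report which mailboxes still hold messages. A hedged sketch of the intended use (the `observers` list is hypothetical here; the actual wiring appears further down in `Simulation::run`):

// Assume one observer was collected per model at bench assembly time.
fn report_pending(observers: &[(String, Box<dyn ChannelObserver>)]) {
    for (model_name, observer) in observers {
        // Only meaningful once the executor is idle (no concurrent sends or receives).
        let pending = observer.len();
        if pending != 0 {
            eprintln!("model '{model_name}' still has {pending} undelivered message(s)");
        }
    }
}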
impl<M: 'static> Drop for Sender<M> { impl<M: 'static> Drop for Sender<M> {
fn drop(&mut self) { fn drop(&mut self) {
// Decrease the reference count of senders. // Decrease the reference count of senders.

View File

@ -122,7 +122,7 @@ pub(super) struct Queue<T: ?Sized> {
/// and the producer. The reason it is shared is that the drop handler of /// and the producer. The reason it is shared is that the drop handler of
/// the last `Inner` owner (which may be a producer) needs access to the /// the last `Inner` owner (which may be a producer) needs access to the
/// dequeue position. /// dequeue position.
dequeue_pos: CachePadded<UnsafeCell<usize>>, dequeue_pos: CachePadded<AtomicUsize>,
/// Buffer holding the closures and their stamps. /// Buffer holding the closures and their stamps.
buffer: Box<[Slot<T>]>, buffer: Box<[Slot<T>]>,
@ -160,7 +160,7 @@ impl<T: ?Sized> Queue<T> {
Queue { Queue {
enqueue_pos: CachePadded::new(AtomicUsize::new(0)), enqueue_pos: CachePadded::new(AtomicUsize::new(0)),
dequeue_pos: CachePadded::new(UnsafeCell::new(0)), dequeue_pos: CachePadded::new(AtomicUsize::new(0)),
buffer: buffer.into(), buffer: buffer.into(),
right_mask, right_mask,
closed_channel_mask, closed_channel_mask,
@ -241,7 +241,7 @@ impl<T: ?Sized> Queue<T> {
/// ///
/// This method may not be called concurrently from multiple threads. /// This method may not be called concurrently from multiple threads.
pub(super) unsafe fn pop(&self) -> Result<MessageBorrow<'_, T>, PopError> { pub(super) unsafe fn pop(&self) -> Result<MessageBorrow<'_, T>, PopError> {
let dequeue_pos = self.dequeue_pos.with(|p| *p); let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed);
let index = dequeue_pos & self.right_mask; let index = dequeue_pos & self.right_mask;
let slot = &self.buffer[index]; let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire); let stamp = slot.stamp.load(Ordering::Acquire);
@ -251,10 +251,10 @@ impl<T: ?Sized> Queue<T> {
// closure can be popped. // closure can be popped.
debug_or_loom_assert_eq!(stamp, dequeue_pos + 1); debug_or_loom_assert_eq!(stamp, dequeue_pos + 1);
// Only this thread can access the dequeue position so there is no // Only this thread can modify the dequeue position so there is no
// need to increment the position atomically with a `fetch_add`. // need to increment the position atomically with a `fetch_add`.
self.dequeue_pos self.dequeue_pos
.with_mut(|p| *p = self.next_queue_pos(dequeue_pos)); .store(self.next_queue_pos(dequeue_pos), Ordering::Relaxed);
// Extract the closure from the slot and set the stamp to the value of // Extract the closure from the slot and set the stamp to the value of
// the dequeue position increased by one sequence increment. // the dequeue position increased by one sequence increment.
@ -318,6 +318,30 @@ impl<T: ?Sized> Queue<T> {
self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0 self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0
} }
/// Returns the number of items in the queue.
///
/// # Warning
///
/// While this method is safe by Rust's standards, the returned result is
/// only meaningful if it can be established that there are no concurrent
/// `push` or `pop` operations. Otherwise, the returned value may neither
/// reflect the current state nor the past state of the queue, and may be
/// greater than the capacity of the queue.
pub(super) fn len(&self) -> usize {
let enqueue_pos = self.enqueue_pos.load(Ordering::Relaxed);
let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed);
let enqueue_idx = enqueue_pos & (self.right_mask >> 1);
let dequeue_idx = dequeue_pos & (self.right_mask >> 1);
// Establish whether the sequence numbers of the enqueue and dequeue
// positions differ. If yes, it means the enqueue position has wrapped
// around one more time so the difference between indices must be
// increased by the buffer capacity.
let carry_flag = (enqueue_pos & !self.right_mask) != (dequeue_pos & !self.right_mask);
(enqueue_idx + (carry_flag as usize) * self.buffer.len()) - dequeue_idx
}
/// Increment the queue position, incrementing the sequence count as well if /// Increment the queue position, incrementing the sequence count as well if
/// the index wraps to 0. /// the index wraps to 0.
/// ///
@ -423,6 +447,12 @@ impl<T: ?Sized> Consumer<T> {
fn close(&self) { fn close(&self) {
self.inner.close(); self.inner.close();
} }
/// Returns the number of items.
#[cfg(not(asynchronix_loom))]
fn len(&self) -> usize {
self.inner.len()
}
} }
#[cfg(test)] #[cfg(test)]
@ -569,6 +599,52 @@ mod tests {
fn queue_mpsc_capacity_three() { fn queue_mpsc_capacity_three() {
queue_mpsc(3); queue_mpsc(3);
} }
#[test]
fn queue_len() {
let (p, mut c) = queue(4);
let _ = p.push(|b| RecycleBox::recycle(b, 0));
assert_eq!(c.len(), 1);
let _ = p.push(|b| RecycleBox::recycle(b, 1));
assert_eq!(c.len(), 2);
let _ = c.pop();
assert_eq!(c.len(), 1);
let _ = p.push(|b| RecycleBox::recycle(b, 2));
assert_eq!(c.len(), 2);
let _ = p.push(|b| RecycleBox::recycle(b, 3));
assert_eq!(c.len(), 3);
let _ = c.pop();
assert_eq!(c.len(), 2);
let _ = p.push(|b| RecycleBox::recycle(b, 4));
assert_eq!(c.len(), 3);
let _ = c.pop();
assert_eq!(c.len(), 2);
let _ = p.push(|b| RecycleBox::recycle(b, 5));
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 6));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 7));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 8));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 9));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = c.pop();
assert_eq!(c.len(), 2);
let _ = c.pop();
assert_eq!(c.len(), 1);
let _ = c.pop();
assert_eq!(c.len(), 0);
}
} }
/// Loom tests. /// Loom tests.

View File

@ -43,6 +43,6 @@ impl Executor {
/// Let the executor run, blocking until all futures have completed or until /// Let the executor run, blocking until all futures have completed or until
/// the executor deadlocks. /// the executor deadlocks.
pub fn run(&mut self) { pub fn run(&mut self) {
self.0.run(); self.0.run().unwrap();
} }
} }

View File

@ -15,6 +15,12 @@ use task::Promise;
/// Unique identifier for executor instances. /// Unique identifier for executor instances.
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(PartialEq, Eq, Debug)]
pub(crate) enum ExecutorError {
/// The simulation has deadlocked.
Deadlock,
}
/// Context common to all executor types. /// Context common to all executor types.
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct SimulationContext { pub(crate) struct SimulationContext {
@ -43,8 +49,8 @@ impl Executor {
/// ///
/// # Panics /// # Panics
/// ///
/// This will panic if the specified number of threads is zero or is more /// This will panic if the specified number of threads is zero or more than
/// than `usize::BITS`. /// `usize::BITS`.
pub(crate) fn new_multi_threaded( pub(crate) fn new_multi_threaded(
num_threads: usize, num_threads: usize,
simulation_context: SimulationContext, simulation_context: SimulationContext,
@ -85,11 +91,19 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or /// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock. /// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) { pub(crate) fn run(&mut self) -> Result<(), ExecutorError> {
match self { let msg_count = match self {
Self::StExecutor(executor) => executor.run(), Self::StExecutor(executor) => executor.run(),
Self::MtExecutor(executor) => executor.run(), Self::MtExecutor(executor) => executor.run(),
};
if msg_count != 0 {
assert!(msg_count > 0);
return Err(ExecutorError::Deadlock);
} }
Ok(())
} }
} }
@ -98,7 +112,7 @@ mod tests {
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use futures_channel::{mpsc, oneshot}; use futures_channel::mpsc;
use futures_util::StreamExt; use futures_util::StreamExt;
use super::*; use super::*;
@ -131,47 +145,6 @@ mod tests {
} }
} }
fn executor_deadlock(mut executor: Executor) {
let (_sender1, receiver1) = oneshot::channel::<()>();
let (_sender2, receiver2) = oneshot::channel::<()>();
let launch_count = Arc::new(AtomicUsize::new(0));
let completion_count = Arc::new(AtomicUsize::new(0));
executor.spawn_and_forget({
let launch_count = launch_count.clone();
let completion_count = completion_count.clone();
async move {
launch_count.fetch_add(1, Ordering::Relaxed);
let _ = receiver2.await;
completion_count.fetch_add(1, Ordering::Relaxed);
}
});
executor.spawn_and_forget({
let launch_count = launch_count.clone();
let completion_count = completion_count.clone();
async move {
launch_count.fetch_add(1, Ordering::Relaxed);
let _ = receiver1.await;
completion_count.fetch_add(1, Ordering::Relaxed);
}
});
executor.run();
// Check that the executor returns on deadlock, i.e. none of the task has
// completed.
assert_eq!(launch_count.load(Ordering::Relaxed), 2);
assert_eq!(completion_count.load(Ordering::Relaxed), 0);
// Drop the executor and thus the receiver tasks before the senders,
// failing which the senders may signal that the channel has been
// dropped and wake the tasks outside the executor.
drop(executor);
}
fn executor_drop_cycle(mut executor: Executor) { fn executor_drop_cycle(mut executor: Executor) {
let (sender1, mut receiver1) = mpsc::channel(2); let (sender1, mut receiver1) = mpsc::channel(2);
let (sender2, mut receiver2) = mpsc::channel(2); let (sender2, mut receiver2) = mpsc::channel(2);
@ -223,7 +196,7 @@ mod tests {
} }
}); });
executor.run(); executor.run().unwrap();
// Make sure that all tasks are eventually dropped even though each task // Make sure that all tasks are eventually dropped even though each task
// wakes the others when dropped. // wakes the others when dropped.
@ -231,20 +204,6 @@ mod tests {
assert_eq!(drop_count.load(Ordering::Relaxed), 3); assert_eq!(drop_count.load(Ordering::Relaxed), 3);
} }
#[test]
fn executor_deadlock_st() {
executor_deadlock(Executor::new_single_threaded(dummy_simulation_context()));
}
#[test]
fn executor_deadlock_mt() {
executor_deadlock(Executor::new_multi_threaded(3, dummy_simulation_context()));
}
#[test]
fn executor_deadlock_mt_one_worker() {
executor_deadlock(Executor::new_multi_threaded(1, dummy_simulation_context()));
}
#[test] #[test]
fn executor_drop_cycle_st() { fn executor_drop_cycle_st() {
executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context())); executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context()));

View File

@ -48,7 +48,7 @@ use std::cell::Cell;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::panic::{self, AssertUnwindSafe}; use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -56,8 +56,9 @@ use std::time::{Duration, Instant};
use crossbeam_utils::sync::{Parker, Unparker}; use crossbeam_utils::sync::{Parker, Unparker};
use slab::Slab; use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable}; use crate::channel;
use super::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT}; use crate::executor::task::{self, CancelToken, Promise, Runnable};
use crate::executor::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local; use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::util::rng::Rng; use crate::util::rng::Rng;
use pool_manager::PoolManager; use pool_manager::PoolManager;
@ -224,7 +225,10 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or /// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock. /// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) { ///
/// The number of unprocessed messages is returned. It should always be 0
/// unless a deadlock occurred.
pub(crate) fn run(&mut self) -> isize {
self.context.pool_manager.activate_worker(); self.context.pool_manager.activate_worker();
loop { loop {
@ -232,7 +236,7 @@ impl Executor {
panic::resume_unwind(worker_panic); panic::resume_unwind(worker_panic);
} }
if self.context.pool_manager.pool_is_idle() { if self.context.pool_manager.pool_is_idle() {
return; return self.context.msg_count.load(Ordering::Relaxed);
} }
self.parker.park(); self.parker.park();
@ -298,6 +302,11 @@ struct ExecutorContext {
executor_unparker: Unparker, executor_unparker: Unparker,
/// Manager for all worker threads. /// Manager for all worker threads.
pool_manager: PoolManager, pool_manager: PoolManager,
/// Difference between the number of sent and received messages.
///
/// This counter is only updated by worker threads before they park and is
/// therefore only consistent once all workers are parked.
msg_count: AtomicIsize,
} }
impl ExecutorContext { impl ExecutorContext {
@ -320,6 +329,7 @@ impl ExecutorContext {
stealers.into_boxed_slice(), stealers.into_boxed_slice(),
worker_unparkers, worker_unparkers,
), ),
msg_count: AtomicIsize::new(0),
} }
} }
} }
@ -456,6 +466,15 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
let local_queue = &worker.local_queue; let local_queue = &worker.local_queue;
let fast_slot = &worker.fast_slot; let fast_slot = &worker.fast_slot;
// Update the global message counter.
let update_msg_count = || {
let thread_msg_count = channel::THREAD_MSG_COUNT.replace(0);
worker
.executor_context
.msg_count
.fetch_add(thread_msg_count, Ordering::Relaxed);
};
let result = panic::catch_unwind(AssertUnwindSafe(|| { let result = panic::catch_unwind(AssertUnwindSafe(|| {
// Set how long to spin when searching for a task. // Set how long to spin when searching for a task.
const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000); const MAX_SEARCH_DURATION: Duration = Duration::from_nanos(1000);
@ -468,9 +487,10 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// Try to deactivate the worker. // Try to deactivate the worker.
if pool_manager.try_set_worker_inactive(id) { if pool_manager.try_set_worker_inactive(id) {
parker.park();
// No need to call `begin_worker_search()`: this was done by the // No need to call `begin_worker_search()`: this was done by the
// thread that unparked the worker. // thread that unparked the worker.
update_msg_count();
parker.park();
} else if injector.is_empty() { } else if injector.is_empty() {
// This worker could not be deactivated because it was the last // This worker could not be deactivated because it was the last
// active worker. In such case, the call to // active worker. In such case, the call to
@ -479,6 +499,7 @@ fn run_local_worker(worker: &Worker, id: usize, parker: Parker) {
// not activate a new worker, which is why some tasks may now be // not activate a new worker, which is why some tasks may now be
// visible in the injector queue. // visible in the injector queue.
pool_manager.set_all_workers_inactive(); pool_manager.set_all_workers_inactive();
update_msg_count();
executor_unparker.unpark(); executor_unparker.unpark();
parker.park(); parker.park();
// No need to call `begin_worker_search()`: this was done by the // No need to call `begin_worker_search()`: this was done by the

View File

@ -8,6 +8,7 @@ use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable}; use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID; use super::NEXT_EXECUTOR_ID;
use crate::channel;
use crate::executor::{SimulationContext, SIMULATION_CONTEXT}; use crate::executor::{SimulationContext, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local; use crate::macros::scoped_thread_local::scoped_thread_local;
@ -105,7 +106,13 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or /// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock. /// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) { ///
/// The number of unprocessed messages is returned. It should always be 0
/// unless a deadlock occurred.
pub(crate) fn run(&mut self) -> isize {
// In case this executor is nested in another one, reset the counter of in-flight messages.
let msg_count_stash = channel::THREAD_MSG_COUNT.replace(self.context.msg_count);
SIMULATION_CONTEXT.set(&self.simulation_context, || { SIMULATION_CONTEXT.set(&self.simulation_context, || {
ACTIVE_TASKS.set(&self.active_tasks, || { ACTIVE_TASKS.set(&self.active_tasks, || {
EXECUTOR_CONTEXT.set(&self.context, || loop { EXECUTOR_CONTEXT.set(&self.context, || loop {
@ -118,6 +125,10 @@ impl Executor {
}) })
}) })
}); });
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
self.context.msg_count
} }
} }
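The stash/replace dance exists because a single-threaded executor can run on a thread that already carries an in-flight message balance, e.g. when nested inside another executor. A stripped-down sketch of that save/restore pattern under those assumptions (standalone, hypothetical names):

use std::cell::Cell;

thread_local! { static MSG_BALANCE: Cell<isize> = const { Cell::new(0) }; }

// `own_balance` persists this executor's balance across calls to `run`.
fn run_nested(own_balance: &mut isize, body: impl FnOnce()) -> isize {
    // Swap in this executor's own balance so the outer executor's counter is
    // untouched by messages exchanged during the nested run.
    let outer_balance = MSG_BALANCE.replace(*own_balance);
    body();
    // Keep the updated balance and restore the outer executor's counter.
    *own_balance = MSG_BALANCE.replace(outer_balance);
    *own_balance
}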
@ -168,6 +179,8 @@ struct ExecutorContext {
/// Unique executor identifier inherited by all tasks spawned on this /// Unique executor identifier inherited by all tasks spawned on this
/// executor instance. /// executor instance.
executor_id: usize, executor_id: usize,
/// Number of in-flight messages.
msg_count: isize,
} }
impl ExecutorContext { impl ExecutorContext {
@ -176,6 +189,7 @@ impl ExecutorContext {
Self { Self {
queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)), queue: RefCell::new(Vec::with_capacity(QUEUE_MIN_CAPACITY)),
executor_id, executor_id,
msg_count: 0,
} }
} }
} }

View File

@ -10,14 +10,20 @@ import "google/protobuf/empty.proto";
enum ErrorCode { enum ErrorCode {
INTERNAL_ERROR = 0; INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1; SIMULATION_NOT_STARTED = 1;
MISSING_ARGUMENT = 2; SIMULATION_TERMINATED = 2;
INVALID_TIME = 3; SIMULATION_DEADLOCK = 3;
INVALID_DURATION = 4; SIMULATION_MODEL_ERROR = 4;
INVALID_MESSAGE = 5; SIMULATION_PANIC = 5;
INVALID_KEY = 6; SIMULATION_BAD_QUERY = 6;
SOURCE_NOT_FOUND = 10; SIMULATION_TIME_OUT_OF_RANGE = 7;
SINK_NOT_FOUND = 11; MISSING_ARGUMENT = 10;
SIMULATION_TIME_OUT_OF_RANGE = 12; INVALID_TIME = 11;
INVALID_DURATION = 12;
INVALID_PERIOD = 13;
INVALID_MESSAGE = 14;
INVALID_KEY = 15;
SOURCE_NOT_FOUND = 20;
SINK_NOT_FOUND = 21;
} }
message Error { message Error {

View File

@ -338,14 +338,20 @@ pub mod any_request {
pub enum ErrorCode { pub enum ErrorCode {
InternalError = 0, InternalError = 0,
SimulationNotStarted = 1, SimulationNotStarted = 1,
MissingArgument = 2, SimulationTerminated = 2,
InvalidTime = 3, SimulationDeadlock = 3,
InvalidDuration = 4, SimulationModelError = 4,
InvalidMessage = 5, SimulationPanic = 5,
InvalidKey = 6, SimulationBadQuery = 6,
SourceNotFound = 10, SimulationTimeOutOfRange = 22,
SinkNotFound = 11, MissingArgument = 7,
SimulationTimeOutOfRange = 12, InvalidTime = 8,
InvalidDuration = 9,
InvalidPeriod = 10,
InvalidMessage = 11,
InvalidKey = 12,
SourceNotFound = 20,
SinkNotFound = 21,
} }
impl ErrorCode { impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition. /// String value of the enum field names used in the ProtoBuf definition.
@ -356,14 +362,20 @@ impl ErrorCode {
match self { match self {
ErrorCode::InternalError => "INTERNAL_ERROR", ErrorCode::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED", ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationModelError => "SIMULATION_MODEL_ERROR",
ErrorCode::SimulationPanic => "SIMULATION_PANIC",
ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
ErrorCode::MissingArgument => "MISSING_ARGUMENT", ErrorCode::MissingArgument => "MISSING_ARGUMENT",
ErrorCode::InvalidTime => "INVALID_TIME", ErrorCode::InvalidTime => "INVALID_TIME",
ErrorCode::InvalidDuration => "INVALID_DURATION", ErrorCode::InvalidDuration => "INVALID_DURATION",
ErrorCode::InvalidPeriod => "INVALID_PERIOD",
ErrorCode::InvalidMessage => "INVALID_MESSAGE", ErrorCode::InvalidMessage => "INVALID_MESSAGE",
ErrorCode::InvalidKey => "INVALID_KEY", ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND", ErrorCode::SinkNotFound => "SINK_NOT_FOUND",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
} }
} }
/// Creates an enum from field names used in the ProtoBuf definition. /// Creates an enum from field names used in the ProtoBuf definition.
@ -371,14 +383,20 @@ impl ErrorCode {
match value { match value {
"INTERNAL_ERROR" => Some(Self::InternalError), "INTERNAL_ERROR" => Some(Self::InternalError),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MODEL_ERROR" => Some(Self::SimulationModelError),
"SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
"MISSING_ARGUMENT" => Some(Self::MissingArgument), "MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime), "INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_DURATION" => Some(Self::InvalidDuration), "INVALID_DURATION" => Some(Self::InvalidDuration),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_MESSAGE" => Some(Self::InvalidMessage), "INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey), "INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound), "SINK_NOT_FOUND" => Some(Self::SinkNotFound),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
_ => None, _ => None,
} }
} }

View File

@ -8,6 +8,7 @@ use prost_types::Timestamp;
use tai_time::MonotonicTime; use tai_time::MonotonicTime;
use super::codegen::simulation::{Error, ErrorCode}; use super::codegen::simulation::{Error, ErrorCode};
use crate::simulation::ExecutionError;
pub(crate) use controller_service::ControllerService; pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService; pub(crate) use init_service::InitService;
@ -29,6 +30,21 @@ fn simulation_not_started_error() -> Error {
) )
} }
/// Map an `ExecutionError` to a Protobuf error.
fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::ModelError { .. } => ErrorCode::SimulationModelError,
ExecutionError::Panic(_) => ErrorCode::SimulationPanic,
ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
ExecutionError::Terminated => ErrorCode::SimulationTerminated,
ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime,
};
let error_message = error.to_string();
to_error(error_code, error_message)
}
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. /// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
/// ///
/// This will fail if the time is outside the protobuf-specified range for /// This will fail if the time is outside the protobuf-specified range for

View File

@ -8,8 +8,8 @@ use crate::simulation::Simulation;
use super::super::codegen::simulation::*; use super::super::codegen::simulation::*;
use super::{ use super::{
monotonic_to_timestamp, simulation_not_started_error, timestamp_to_monotonic, to_error, map_execution_error, monotonic_to_timestamp, simulation_not_started_error,
to_positive_duration, to_strictly_positive_duration, timestamp_to_monotonic, to_error, to_positive_duration, to_strictly_positive_duration,
}; };
/// Protobuf-based simulation manager. /// Protobuf-based simulation manager.
@ -61,18 +61,19 @@ impl ControllerService {
/// processed events have completed. /// processed events have completed.
pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply { pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply {
let reply = match self { let reply = match self {
Self::Started { simulation, .. } => { Self::Started { simulation, .. } => match simulation.step() {
simulation.step(); Ok(()) => {
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { step_reply::Result::Time(timestamp)
step_reply::Result::Time(timestamp) } else {
} else { step_reply::Result::Error(to_error(
step_reply::Result::Error(to_error( ErrorCode::SimulationTimeOutOfRange,
ErrorCode::SimulationTimeOutOfRange, "the final simulation time is out of range",
"the final simulation time is out of range", ))
)) }
} }
} Err(e) => step_reply::Result::Error(map_execution_error(e)),
},
Self::NotStarted => step_reply::Result::Error(simulation_not_started_error()), Self::NotStarted => step_reply::Result::Error(simulation_not_started_error()),
}; };
@ -117,7 +118,7 @@ impl ControllerService {
"the specified deadline lies in the past", "the specified deadline lies in the past",
))?; ))?;
simulation.step_by(duration); simulation.step_by(duration).map_err(map_execution_error)?;
} }
}; };
@ -221,7 +222,7 @@ impl ControllerService {
} }
}); });
simulation.process(action); simulation.process(action).map_err(map_execution_error)?;
Ok(key_id) Ok(key_id)
}(), }(),
@ -315,9 +316,7 @@ impl ControllerService {
) )
})?; })?;
simulation.process(event); simulation.process(event).map_err(map_execution_error)
Ok(())
}(), }(),
Self::NotStarted => Err(simulation_not_started_error()), Self::NotStarted => Err(simulation_not_started_error()),
}; };
@ -360,11 +359,11 @@ impl ControllerService {
) )
})?; })?;
simulation.process(query); simulation.process(query).map_err(map_execution_error)?;
let replies = promise.take_collect().ok_or(to_error( let replies = promise.take_collect().ok_or(to_error(
ErrorCode::InternalError, ErrorCode::SimulationBadQuery,
"a reply to the query was expected but none was available".to_string(), "a reply to the query was expected but none was available; maybe the target model was not added to the simulation?".to_string(),
))?; ))?;
replies.map_err(|e| { replies.map_err(|e| {

View File

@ -5,7 +5,7 @@ use crate::registry::EndpointRegistry;
use crate::simulation::SimInit; use crate::simulation::SimInit;
use crate::simulation::Simulation; use crate::simulation::Simulation;
use super::{timestamp_to_monotonic, to_error}; use super::{map_execution_error, timestamp_to_monotonic, to_error};
use super::super::codegen::simulation::*; use super::super::codegen::simulation::*;
@ -69,7 +69,12 @@ impl InitService {
.ok_or_else(|| { .ok_or_else(|| {
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field") to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
}) })
.map(|start_time| (sim_init.init(start_time), registry)) .and_then(|start_time| {
sim_init
.init(start_time)
.map_err(map_execution_error)
.map(|sim| (sim, registry))
})
}); });
let (reply, bench) = match reply { let (reply, bench) = match reply {

View File

@ -235,7 +235,9 @@
//! .add_model(multiplier2, multiplier2_mbox, "multiplier2") //! .add_model(multiplier2, multiplier2_mbox, "multiplier2")
//! .add_model(delay1, delay1_mbox, "delay1") //! .add_model(delay1, delay1_mbox, "delay1")
//! .add_model(delay2, delay2_mbox, "delay2") //! .add_model(delay2, delay2_mbox, "delay2")
//! .init(t0); //! .init(t0)?;
//!
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ``` //! ```
//! //!
//! ## Running simulations //! ## Running simulations
@ -323,23 +325,25 @@
//! # .add_model(multiplier2, multiplier2_mbox, "multiplier2") //! # .add_model(multiplier2, multiplier2_mbox, "multiplier2")
//! # .add_model(delay1, delay1_mbox, "delay1") //! # .add_model(delay1, delay1_mbox, "delay1")
//! # .add_model(delay2, delay2_mbox, "delay2") //! # .add_model(delay2, delay2_mbox, "delay2")
//! # .init(t0); //! # .init(t0)?;
//! // Send a value to the first multiplier. //! // Send a value to the first multiplier.
//! simu.process_event(Multiplier::input, 21.0, &input_address); //! simu.process_event(Multiplier::input, 21.0, &input_address)?;
//! //!
//! // The simulation is still at t0 so nothing is expected at the output of the //! // The simulation is still at t0 so nothing is expected at the output of the
//! // second delay gate. //! // second delay gate.
//! assert!(output_slot.next().is_none()); //! assert!(output_slot.next().is_none());
//! //!
//! // Advance simulation time until the next event and check the time and output. //! // Advance simulation time until the next event and check the time and output.
//! simu.step(); //! simu.step()?;
//! assert_eq!(simu.time(), t0 + Duration::from_secs(1)); //! assert_eq!(simu.time(), t0 + Duration::from_secs(1));
//! assert_eq!(output_slot.next(), Some(84.0)); //! assert_eq!(output_slot.next(), Some(84.0));
//! //!
//! // Get the answer to the ultimate question of life, the universe & everything. //! // Get the answer to the ultimate question of life, the universe & everything.
//! simu.step(); //! simu.step()?;
//! assert_eq!(simu.time(), t0 + Duration::from_secs(2)); //! assert_eq!(simu.time(), t0 + Duration::from_secs(2));
//! assert_eq!(output_slot.next(), Some(42.0)); //! assert_eq!(output_slot.next(), Some(42.0));
//!
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ``` //! ```
//! //!
//! # Message ordering guarantees //! # Message ordering guarantees

View File

@ -26,7 +26,7 @@ use super::ReplierFn;
/// The `EventSource` port is similar to an [`Output`](crate::ports::Output) /// The `EventSource` port is similar to an [`Output`](crate::ports::Output)
/// port in that it can send events to connected input ports. It is not meant, /// port in that it can send events to connected input ports. It is not meant,
/// however, to be instantiated as a member of a model, but rather as a /// however, to be instantiated as a member of a model, but rather as a
/// simulation monitoring endpoint instantiated during bench assembly. /// simulation control endpoint instantiated during bench assembly.
pub struct EventSource<T: Clone + Send + 'static> { pub struct EventSource<T: Clone + Send + 'static> {
broadcaster: Arc<Mutex<EventBroadcaster<T>>>, broadcaster: Arc<Mutex<EventBroadcaster<T>>>,
} }

View File

@ -113,14 +113,15 @@
//! # impl Model for ModelB {}; //! # impl Model for ModelB {};
//! # let modelA_addr = Mailbox::<ModelA>::new().address(); //! # let modelA_addr = Mailbox::<ModelA>::new().address();
//! # let modelB_addr = Mailbox::<ModelB>::new().address(); //! # let modelB_addr = Mailbox::<ModelB>::new().address();
//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH); //! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH)?;
//! simu.process_event( //! simu.process_event(
//! |m: &mut ModelA| { //! |m: &mut ModelA| {
//! m.output.connect(ModelB::input, modelB_addr); //! m.output.connect(ModelB::input, modelB_addr);
//! }, //! },
//! (), //! (),
//! &modelA_addr //! &modelA_addr
//! ); //! )?;
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ``` //! ```
mod mailbox; mod mailbox;
mod scheduler; mod scheduler;
@ -143,7 +144,8 @@ use std::time::Duration;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::executor::Executor; use crate::channel::ChannelObserver;
use crate::executor::{Executor, ExecutorError};
use crate::model::{Context, Model, SetupContext}; use crate::model::{Context, Model, SetupContext};
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
use crate::time::{AtomicTime, Clock, MonotonicTime}; use crate::time::{AtomicTime, Clock, MonotonicTime};
@ -193,6 +195,8 @@ pub struct Simulation {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock>, clock: Box<dyn Clock>,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
is_terminated: bool,
} }
impl Simulation { impl Simulation {
@ -202,12 +206,15 @@ impl Simulation {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
) -> Self { ) -> Self {
Self { Self {
executor, executor,
scheduler_queue, scheduler_queue,
time, time,
clock, clock,
observers,
is_terminated: false,
} }
} }
@ -223,8 +230,8 @@ impl Simulation {
/// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured
/// simulation clock. This method blocks until all newly processed events /// simulation clock. This method blocks until all newly processed events
/// have completed. /// have completed.
pub fn step(&mut self) { pub fn step(&mut self) -> Result<(), ExecutionError> {
self.step_to_next_bounded(MonotonicTime::MAX); self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ())
} }
/// Iteratively advances the simulation time by the specified duration, as /// Iteratively advances the simulation time by the specified duration, as
@ -234,10 +241,10 @@ impl Simulation {
/// time have completed. The simulation time upon completion is equal to the /// time have completed. The simulation time upon completion is equal to the
/// initial simulation time incremented by the specified duration, whether /// initial simulation time incremented by the specified duration, whether
/// or not an event was scheduled for that time. /// or not an event was scheduled for that time.
pub fn step_by(&mut self, duration: Duration) { pub fn step_by(&mut self, duration: Duration) -> Result<(), ExecutionError> {
let target_time = self.time.read() + duration; let target_time = self.time.read() + duration;
self.step_until_unchecked(target_time); self.step_until_unchecked(target_time)
} }
/// Iteratively advances the simulation time until the specified deadline, /// Iteratively advances the simulation time until the specified deadline,
@ -247,16 +254,14 @@ impl Simulation {
/// time have completed. The simulation time upon completion is equal to the /// time have completed. The simulation time upon completion is equal to the
/// specified target time, whether or not an event was scheduled for that /// specified target time, whether or not an event was scheduled for that
/// time. /// time.
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), SchedulingError> { pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> {
if self.time.read() >= target_time { if self.time.read() >= target_time {
return Err(SchedulingError::InvalidScheduledTime); return Err(ExecutionError::InvalidTargetTime(target_time));
} }
self.step_until_unchecked(target_time); self.step_until_unchecked(target_time)
Ok(())
} }
/// Returns a scheduler handle. /// Returns an owned scheduler handle.
pub fn scheduler(&self) -> Scheduler { pub fn scheduler(&self) -> Scheduler {
Scheduler::new(self.scheduler_queue.clone(), self.time.reader()) Scheduler::new(self.scheduler_queue.clone(), self.time.reader())
} }
@ -265,15 +270,20 @@ impl Simulation {
/// ///
/// Simulation time remains unchanged. The periodicity of the action, if /// Simulation time remains unchanged. The periodicity of the action, if
/// any, is ignored. /// any, is ignored.
pub fn process(&mut self, action: Action) { pub fn process(&mut self, action: Action) -> Result<(), ExecutionError> {
action.spawn_and_forget(&self.executor); action.spawn_and_forget(&self.executor);
self.executor.run(); self.run()
} }
/// Processes an event immediately, blocking until completion. /// Processes an event immediately, blocking until completion.
/// ///
/// Simulation time remains unchanged. /// Simulation time remains unchanged.
pub fn process_event<M, F, T, S>(&mut self, func: F, arg: T, address: impl Into<Address<M>>) pub fn process_event<M, F, T, S>(
&mut self,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), ExecutionError>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S>,
@ -297,18 +307,19 @@ impl Simulation {
}; };
self.executor.spawn_and_forget(fut); self.executor.spawn_and_forget(fut);
self.executor.run(); self.run()
} }
/// Processes a query immediately, blocking until completion. /// Processes a query immediately, blocking until completion.
/// ///
/// Simulation time remains unchanged. /// Simulation time remains unchanged. If the targeted model was not added
/// to the simulation, an `ExecutionError::BadQuery` is returned.
pub fn process_query<M, F, T, R, S>( pub fn process_query<M, F, T, R, S>(
&mut self, &mut self,
func: F, func: F,
arg: T, arg: T,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> Result<R, QueryError> ) -> Result<R, ExecutionError>
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S>, F: for<'a> ReplierFn<'a, M, T, R, S>,
@ -338,9 +349,36 @@ impl Simulation {
}; };
self.executor.spawn_and_forget(fut); self.executor.spawn_and_forget(fut);
self.executor.run(); self.run()?;
reply_reader.try_read().map_err(|_| QueryError {}) reply_reader
.try_read()
.map_err(|_| ExecutionError::BadQuery)
}
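For illustration, a minimal end-to-end sketch (not part of this commit) of the new call-site contract, using a hypothetical `Counter` model; both `process_event` and `process_query` now return a `Result`, and a query that obtains no response surfaces as `ExecutionError::BadQuery`:

use asynchronix::model::Model;
use asynchronix::simulation::{ExecutionError, Mailbox, SimInit};
use asynchronix::time::MonotonicTime;

// Hypothetical model used only for this illustration.
#[derive(Default)]
struct Counter {
    count: u64,
}

impl Counter {
    // Input: increment the counter.
    pub fn increment(&mut self, by: u64) {
        self.count += by;
    }

    // Replier: return the current count.
    pub async fn value(&mut self) -> u64 {
        self.count
    }
}

impl Model for Counter {}

fn demo() -> Result<(), ExecutionError> {
    let counter = Counter::default();
    let mbox = Mailbox::new();
    let addr = mbox.address();

    let mut simu = SimInit::new()
        .add_model(counter, mbox, "counter")
        .init(MonotonicTime::EPOCH)?;

    // Both calls now report execution failures (deadlock, terminated
    // simulation, ...) instead of panicking.
    simu.process_event(Counter::increment, 3, addr.clone())?;
    let count = simu.process_query(Counter::value, (), addr)?;
    assert_eq!(count, 3);

    Ok(())
}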
/// Runs the executor.
fn run(&mut self) -> Result<(), ExecutionError> {
if self.is_terminated {
return Err(ExecutionError::Terminated);
}
self.executor.run().map_err(|e| match e {
ExecutorError::Deadlock => {
self.is_terminated = true;
let mut deadlock_info = Vec::new();
for (name, observer) in &self.observers {
let mailbox_size = observer.len();
if mailbox_size != 0 {
deadlock_info.push(DeadlockInfo {
model_name: name.clone(),
mailbox_size,
});
}
}
ExecutionError::Deadlock(deadlock_info)
}
})
} }
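From the caller's perspective, the `is_terminated` flag means a failed simulation is poisoned; a minimal sketch (not part of this commit), assuming `simu` is a simulation whose models are wired into a message loop that deadlocks:

use asynchronix::simulation::{ExecutionError, Simulation};

// After a deadlock (or a model error/panic) has been reported once, every
// subsequent attempt to run the simulation fails fast with `Terminated`.
fn poisoned_after_deadlock(simu: &mut Simulation) {
    if let Err(ExecutionError::Deadlock(deadlocked)) = simu.step() {
        // The report lists every model with a non-empty mailbox.
        for info in &deadlocked {
            eprintln!(
                "{}: {} message(s) pending",
                info.model_name, info.mailbox_size
            );
        }
        assert!(matches!(simu.step(), Err(ExecutionError::Terminated)));
    }
}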
/// Advances simulation time to that of the next scheduled action if its /// Advances simulation time to that of the next scheduled action if its
@ -349,7 +387,10 @@ impl Simulation {
/// ///
/// If at least one action was found that satisfied the time bound, the /// If at least one action was found that satisfied the time bound, the
/// corresponding new simulation time is returned. /// corresponding new simulation time is returned.
fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option<MonotonicTime> { fn step_to_next_bounded(
&mut self,
upper_time_bound: MonotonicTime,
) -> Result<Option<MonotonicTime>, ExecutionError> {
// Function pulling the next action. If the action is periodic, it is // Function pulling the next action. If the action is periodic, it is
// immediately re-scheduled. // immediately re-scheduled.
fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action { fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action {
@ -380,7 +421,10 @@ impl Simulation {
// Move to the next scheduled time. // Move to the next scheduled time.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = peek_next_key(&mut scheduler_queue)?; let mut current_key = match peek_next_key(&mut scheduler_queue) {
Some(key) => key,
None => return Ok(None),
};
self.time.write(current_key.0); self.time.write(current_key.0);
loop { loop {
@ -420,9 +464,9 @@ impl Simulation {
let current_time = current_key.0; let current_time = current_key.0;
// TODO: check synchronization status? // TODO: check synchronization status?
self.clock.synchronize(current_time); self.clock.synchronize(current_time);
self.executor.run(); self.run()?;
return Some(current_time); return Ok(Some(current_time));
} }
}; };
} }
@ -437,18 +481,19 @@ impl Simulation {
/// ///
/// This method does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
fn step_until_unchecked(&mut self, target_time: MonotonicTime) { fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> {
loop { loop {
match self.step_to_next_bounded(target_time) { match self.step_to_next_bounded(target_time) {
// The target time was reached exactly. // The target time was reached exactly.
Some(t) if t == target_time => return, Ok(Some(t)) if t == target_time => return Ok(()),
// No actions are scheduled before or at the target time. // No actions are scheduled before or at the target time.
None => { Ok(None) => {
// Update the simulation time. // Update the simulation time.
self.time.write(target_time); self.time.write(target_time);
self.clock.synchronize(target_time); self.clock.synchronize(target_time);
return; return Ok(());
} }
Err(e) => return Err(e),
// The target time was not reached yet. // The target time was not reached yet.
_ => {} _ => {}
} }
@ -464,20 +509,142 @@ impl fmt::Debug for Simulation {
} }
} }
/// Error returned when a query did not obtain a response. /// Information regarding a deadlocked model.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// This can happen either because the model targeted by the address was not pub struct DeadlockInfo {
/// added to the simulation or due to a simulation deadlock. /// Name of the deadlocked model.
#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub model_name: String,
pub struct QueryError {} /// Number of messages in the mailbox.
pub mailbox_size: usize,
}
impl fmt::Display for QueryError { /// An error returned upon simulation execution failure.
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { ///
write!(fmt, "the query did not receive a response") /// Note that if a `Deadlock`, `ModelError` or `Panic` is returned, any
/// subsequent attempt to run the simulation will return `Terminated`.
#[derive(Debug)]
pub enum ExecutionError {
/// The simulation has deadlocked.
///
/// Lists all models with non-empty mailboxes.
Deadlock(Vec<DeadlockInfo>),
/// A model has aborted the simulation.
ModelError {
/// Name of the model.
model_name: String,
/// Error registered by the model.
error: Box<dyn Error>,
},
/// A panic was caught during execution with the message contained in the
/// payload.
Panic(String),
/// The specified target simulation time is in the past of the current
/// simulation time.
InvalidTargetTime(MonotonicTime),
/// The query was invalid and did not obtain a response.
BadQuery,
/// The simulation has been terminated due to an earlier deadlock, model
/// error or model panic.
Terminated,
}
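A minimal classification sketch (not part of this commit): the first three variants and `Terminated` indicate that the simulation can no longer be driven, while `InvalidTargetTime` and `BadQuery` are call-site errors that leave it usable:

use asynchronix::simulation::ExecutionError;

// Distinguish fatal simulation failures from recoverable call-site errors.
fn is_fatal(error: &ExecutionError) -> bool {
    match error {
        // The simulation is terminated (or already was): only `Terminated`
        // will be returned from now on.
        ExecutionError::Deadlock(_)
        | ExecutionError::ModelError { .. }
        | ExecutionError::Panic(_)
        | ExecutionError::Terminated => true,
        // The simulation itself is unaffected and can still be driven.
        ExecutionError::InvalidTargetTime(_) | ExecutionError::BadQuery => false,
    }
}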
impl fmt::Display for ExecutionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Deadlock(list) => {
f.write_str(
"a simulation deadlock has been detected that involves the following models: ",
)?;
let mut first_item = true;
for info in list {
if first_item {
first_item = false;
} else {
f.write_str(", ")?;
}
write!(
f,
"'{}' ({} item{} in mailbox)",
info.model_name,
info.mailbox_size,
if info.mailbox_size == 1 { "" } else { "s" }
)?;
}
Ok(())
}
Self::ModelError { model_name, error } => {
write!(
f,
"the simulation has been aborted by model '{}' with the following error: {}",
model_name, error
)
}
Self::Panic(msg) => {
f.write_str("a panic has been caught during simulation:\n")?;
f.write_str(msg)
}
Self::InvalidTargetTime(time) => {
write!(
f,
"the specified target simulation time {} lies in the past of the current simulation time",
time
)
}
Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"),
Self::Terminated => f.write_str("the simulation has been terminated"),
}
} }
} }
impl Error for QueryError {} impl Error for ExecutionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
if let Self::ModelError { error, .. } = &self {
Some(error.as_ref())
} else {
None
}
}
}
/// An error returned upon simulation execution or scheduling failure.
#[derive(Debug)]
pub enum SimulationError {
/// The execution of the simulation failed.
ExecutionError(ExecutionError),
/// An attempt to schedule an item failed.
SchedulingError(SchedulingError),
}
impl fmt::Display for SimulationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::ExecutionError(e) => e.fmt(f),
Self::SchedulingError(e) => e.fmt(f),
}
}
}
impl Error for SimulationError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::ExecutionError(e) => e.source(),
Self::SchedulingError(e) => e.source(),
}
}
}
impl From<ExecutionError> for SimulationError {
fn from(e: ExecutionError) -> Self {
Self::ExecutionError(e)
}
}
impl From<SchedulingError> for SimulationError {
fn from(e: SchedulingError) -> Self {
Self::SchedulingError(e)
}
}
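These `From` impls exist so that `?` works uniformly in bench code that mixes execution and scheduling calls; a minimal sketch (not part of this commit):

use std::time::Duration;

use asynchronix::simulation::{Simulation, SimulationError};

// Both `ExecutionError` (from stepping/processing calls) and `SchedulingError`
// (from scheduling calls) convert into `SimulationError` through `?`.
fn run_bench(simu: &mut Simulation) -> Result<(), SimulationError> {
    simu.step()?;
    simu.step_by(Duration::from_secs(10))?;
    Ok(())
}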
/// Adds a model and its mailbox to the simulation bench. /// Adds a model and its mailbox to the simulation bench.
pub(crate) fn add_model<M: Model>( pub(crate) fn add_model<M: Model>(

View File

@ -63,6 +63,13 @@ impl Scheduler {
/// model, these events are guaranteed to be processed according to the /// model, these events are guaranteed to be processed according to the
/// scheduling order of the actions. /// scheduling order of the actions.
pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> { pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> {
// The scheduler queue must always be locked when reading the time,
// otherwise the following race could occur:
// 1) this method reads the time and concludes that it is not too late
// to schedule the action,
// 2) the `Simulation` object takes the lock, increments simulation time
// and runs the simulation step,
// 3) this method takes the lock and schedules the now-outdated action.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time(); let now = self.time();

View File

@ -1,6 +1,7 @@
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::channel::ChannelObserver;
use crate::executor::{Executor, SimulationContext}; use crate::executor::{Executor, SimulationContext};
use crate::model::Model; use crate::model::Model;
use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime}; use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime};
@ -8,7 +9,7 @@ use crate::time::{Clock, NoClock};
use crate::util::priority_queue::PriorityQueue; use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell; use crate::util::sync_cell::SyncCell;
use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation}; use super::{add_model, ExecutionError, Mailbox, Scheduler, SchedulerQueue, Simulation};
/// Builder for a multi-threaded, discrete-event simulation. /// Builder for a multi-threaded, discrete-event simulation.
pub struct SimInit { pub struct SimInit {
@ -16,6 +17,7 @@ pub struct SimInit {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
} }
impl SimInit { impl SimInit {
@ -49,6 +51,7 @@ impl SimInit {
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
time, time,
clock: Box::new(NoClock::new()), clock: Box::new(NoClock::new()),
observers: Vec::new(),
} }
} }
@ -58,13 +61,16 @@ impl SimInit {
/// is used for convenience to identify the model instance (e.g. for /// is used for convenience to identify the model instance (e.g. for
/// logging purposes). /// logging purposes).
pub fn add_model<M: Model>( pub fn add_model<M: Model>(
self, mut self,
model: M, model: M,
mailbox: Mailbox<M>, mailbox: Mailbox<M>,
name: impl Into<String>, name: impl Into<String>,
) -> Self { ) -> Self {
let name = name.into();
self.observers
.push((name.clone(), Box::new(mailbox.0.observer())));
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
add_model(model, mailbox, name.into(), scheduler, &self.executor); add_model(model, mailbox, name, scheduler, &self.executor);
self self
} }
@ -82,12 +88,20 @@ impl SimInit {
/// Builds a simulation initialized at the specified simulation time, /// Builds a simulation initialized at the specified simulation time,
/// executing the [`Model::init()`](crate::model::Model::init) method on all /// executing the [`Model::init()`](crate::model::Model::init) method on all
/// model initializers. /// model initializers.
pub fn init(mut self, start_time: MonotonicTime) -> Simulation { pub fn init(mut self, start_time: MonotonicTime) -> Result<Simulation, ExecutionError> {
self.time.write(start_time); self.time.write(start_time);
self.clock.synchronize(start_time); self.clock.synchronize(start_time);
self.executor.run();
Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock) let mut simulation = Simulation::new(
self.executor,
self.scheduler_queue,
self.time,
self.clock,
self.observers,
);
simulation.run()?;
Ok(simulation)
} }
} }
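Since `init` now runs the model initializers on the executor, bench assembly itself can fail and reports an `ExecutionError` instead of panicking; a minimal generic sketch (not part of this commit):

use asynchronix::model::Model;
use asynchronix::simulation::{ExecutionError, Mailbox, SimInit, Simulation};
use asynchronix::time::MonotonicTime;

// Assemble a single-model bench; any failure while executing `Model::init()`
// (e.g. a deadlock) is propagated to the caller.
fn assemble<M: Model>(model: M, mbox: Mailbox<M>) -> Result<Simulation, ExecutionError> {
    SimInit::new()
        .add_model(model, mbox, "model")
        .init(MonotonicTime::EPOCH)
}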

View File

@ -38,13 +38,13 @@ fn model_schedule_event() {
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap();
simu.process_event(TestModel::trigger, (), addr); simu.process_event(TestModel::trigger, (), addr).unwrap();
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(simu.time(), t0 + Duration::from_secs(2));
assert!(output.next().is_some()); assert!(output.next().is_some());
simu.step(); simu.step().unwrap();
assert!(output.next().is_none()); assert!(output.next().is_none());
} }
@ -93,13 +93,13 @@ fn model_cancel_future_keyed_event() {
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap();
simu.process_event(TestModel::trigger, (), addr); simu.process_event(TestModel::trigger, (), addr).unwrap();
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(1)); assert_eq!(simu.time(), t0 + Duration::from_secs(1));
assert_eq!(output.next(), Some(1)); assert_eq!(output.next(), Some(1));
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(1)); assert_eq!(simu.time(), t0 + Duration::from_secs(1));
assert!(output.next().is_none()); assert!(output.next().is_none());
} }
@ -149,14 +149,14 @@ fn model_cancel_same_time_keyed_event() {
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap();
simu.process_event(TestModel::trigger, (), addr); simu.process_event(TestModel::trigger, (), addr).unwrap();
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(simu.time(), t0 + Duration::from_secs(2));
assert_eq!(output.next(), Some(1)); assert_eq!(output.next(), Some(1));
assert!(output.next().is_none()); assert!(output.next().is_none());
simu.step(); simu.step().unwrap();
assert!(output.next().is_none()); assert!(output.next().is_none());
} }
@ -192,13 +192,13 @@ fn model_schedule_periodic_event() {
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap();
simu.process_event(TestModel::trigger, (), addr); simu.process_event(TestModel::trigger, (), addr).unwrap();
// Move to the next events at t0 + 2s + k*3s. // Move to the next events at t0 + 2s + k*3s.
for k in 0..10 { for k in 0..10 {
simu.step(); simu.step().unwrap();
assert_eq!( assert_eq!(
simu.time(), simu.time(),
t0 + Duration::from_secs(2) + k * Duration::from_secs(3) t0 + Duration::from_secs(2) + k * Duration::from_secs(3)
@ -243,16 +243,16 @@ fn model_cancel_periodic_event() {
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0); let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap();
simu.process_event(TestModel::trigger, (), addr); simu.process_event(TestModel::trigger, (), addr).unwrap();
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(simu.time(), t0 + Duration::from_secs(2));
assert!(output.next().is_some()); assert!(output.next().is_some());
assert!(output.next().is_none()); assert!(output.next().is_none());
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(simu.time(), t0 + Duration::from_secs(2));
assert!(output.next().is_none()); assert!(output.next().is_none());
} }

View File

@ -0,0 +1,211 @@
//! Deadlock detection for model loops.
use asynchronix::model::Model;
use asynchronix::ports::{Output, Requestor};
use asynchronix::simulation::{DeadlockInfo, ExecutionError, Mailbox, SimInit};
use asynchronix::time::MonotonicTime;
#[derive(Default)]
struct TestModel {
output: Output<()>,
requestor: Requestor<(), ()>,
}
impl TestModel {
async fn activate_output(&mut self) {
self.output.send(()).await;
}
async fn activate_requestor(&mut self) {
let _ = self.requestor.send(()).await;
}
}
impl Model for TestModel {}
/// Overflows a mailbox by sending 2 messages in loopback for each incoming
/// message.
#[test]
fn deadlock_on_mailbox_overflow() {
const MODEL_NAME: &str = "testmodel";
const MAILBOX_SIZE: usize = 5;
let mut model = TestModel::default();
let mbox = Mailbox::with_capacity(MAILBOX_SIZE);
let addr = mbox.address();
// Make two self-connections so that each outgoing message generates two
// incoming messages.
model
.output
.connect(TestModel::activate_output, addr.clone());
model
.output
.connect(TestModel::activate_output, addr.clone());
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new()
.add_model(model, mbox, MODEL_NAME)
.init(t0)
.unwrap();
match simu.process_event(TestModel::activate_output, (), addr) {
Err(ExecutionError::Deadlock(deadlock_info)) => {
// We expect only 1 deadlocked model.
assert_eq!(deadlock_info.len(), 1);
// We expect the mailbox to be full.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL_NAME.into(),
mailbox_size: MAILBOX_SIZE
}
)
}
_ => panic!("deadlock not detected"),
}
}
/// Generates a deadlock with a query loopback.
#[test]
fn deadlock_on_query_loopback() {
const MODEL_NAME: &str = "testmodel";
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
model
.requestor
.connect(TestModel::activate_requestor, addr.clone());
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new()
.add_model(model, mbox, MODEL_NAME)
.init(t0)
.unwrap();
match simu.process_query(TestModel::activate_requestor, (), addr) {
Err(ExecutionError::Deadlock(deadlock_info)) => {
// We expect only 1 deadlocked model.
assert_eq!(deadlock_info.len(), 1);
// We expect the mailbox to have a single query.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL_NAME.into(),
mailbox_size: 1,
}
);
}
_ => panic!("deadlock not detected"),
}
}
/// Generates a deadlock with a query loopback involving several models.
#[test]
fn deadlock_on_transitive_query_loopback() {
const MODEL1_NAME: &str = "testmodel1";
const MODEL2_NAME: &str = "testmodel2";
let mut model1 = TestModel::default();
let mut model2 = TestModel::default();
let mbox1 = Mailbox::new();
let mbox2 = Mailbox::new();
let addr1 = mbox1.address();
let addr2 = mbox2.address();
model1
.requestor
.connect(TestModel::activate_requestor, addr2);
model2
.requestor
.connect(TestModel::activate_requestor, addr1.clone());
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new()
.add_model(model1, mbox1, MODEL1_NAME)
.add_model(model2, mbox2, MODEL2_NAME)
.init(t0)
.unwrap();
match simu.process_query(TestModel::activate_requestor, (), addr1) {
Err(ExecutionError::Deadlock(deadlock_info)) => {
// We expect only 1 deadlocked model.
assert_eq!(deadlock_info.len(), 1);
// We expect the mailbox of this model to have a single query.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL1_NAME.into(),
mailbox_size: 1,
}
);
}
_ => panic!("deadlock not detected"),
}
}
/// Generates deadlocks with query loopbacks on several models at the same time.
#[test]
fn deadlock_on_multiple_query_loopback() {
const MODEL0_NAME: &str = "testmodel0";
const MODEL1_NAME: &str = "testmodel1";
const MODEL2_NAME: &str = "testmodel2";
let mut model0 = TestModel::default();
let mut model1 = TestModel::default();
let mut model2 = TestModel::default();
let mbox0 = Mailbox::new();
let mbox1 = Mailbox::new();
let mbox2 = Mailbox::new();
let addr0 = mbox0.address();
let addr1 = mbox1.address();
let addr2 = mbox2.address();
model0
.requestor
.connect(TestModel::activate_requestor, addr1.clone());
model0
.requestor
.connect(TestModel::activate_requestor, addr2.clone());
model1
.requestor
.connect(TestModel::activate_requestor, addr1);
model2
.requestor
.connect(TestModel::activate_requestor, addr2);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new()
.add_model(model0, mbox0, MODEL0_NAME)
.add_model(model1, mbox1, MODEL1_NAME)
.add_model(model2, mbox2, MODEL2_NAME)
.init(t0)
.unwrap();
match simu.process_query(TestModel::activate_requestor, (), addr0) {
Err(ExecutionError::Deadlock(deadlock_info)) => {
// We expect 2 deadlocked models.
assert_eq!(deadlock_info.len(), 2);
// We expect the mailbox of each deadlocked model to have a single
// query.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL1_NAME.into(),
mailbox_size: 1,
}
);
assert_eq!(
deadlock_info[1],
DeadlockInfo {
model_name: MODEL2_NAME.into(),
mailbox_size: 1,
}
);
}
_ => panic!("deadlock not detected"),
}
}

View File

@ -38,7 +38,7 @@ fn passthrough_bench<T: Clone + Send + 'static>(
model.output.connect_sink(&out_stream); model.output.connect_sink(&out_stream);
let addr = mbox.address(); let addr = mbox.address();
let simu = SimInit::new().add_model(model, mbox, "").init(t0); let simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap();
(simu, addr, out_stream) (simu, addr, out_stream)
} }
@ -64,7 +64,7 @@ fn simulation_schedule_events() {
.unwrap(); .unwrap();
// Move to the 1st event at t0+2s. // Move to the 1st event at t0+2s.
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(simu.time(), t0 + Duration::from_secs(2));
assert!(output.next().is_some()); assert!(output.next().is_some());
@ -74,12 +74,12 @@ fn simulation_schedule_events() {
.unwrap(); .unwrap();
// Move to the 2nd event at t0+3s. // Move to the 2nd event at t0+3s.
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(3)); assert_eq!(simu.time(), t0 + Duration::from_secs(3));
assert!(output.next().is_some()); assert!(output.next().is_some());
// Move to the 3rd event at t0+6s. // Move to the 3rd event at t0+6s.
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(6)); assert_eq!(simu.time(), t0 + Duration::from_secs(6));
assert!(output.next().is_some()); assert!(output.next().is_some());
assert!(output.next().is_none()); assert!(output.next().is_none());
@ -110,7 +110,7 @@ fn simulation_schedule_keyed_events() {
.unwrap(); .unwrap();
// Move to the 1st event at t0+1. // Move to the 1st event at t0+1.
simu.step(); simu.step().unwrap();
// Try to cancel the 1st event after it has already taken place and check // Try to cancel the 1st event after it has already taken place and check
// that the cancellation had no effect. // that the cancellation had no effect.
@ -121,7 +121,7 @@ fn simulation_schedule_keyed_events() {
// Cancel the second event (t0+2) before it is meant to take place and // Cancel the second event (t0+2) before it is meant to take place and
// check that we move directly to the 3rd event. // check that we move directly to the 3rd event.
event_t2_1.cancel(); event_t2_1.cancel();
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(2)); assert_eq!(simu.time(), t0 + Duration::from_secs(2));
assert_eq!(output.next(), Some(22)); assert_eq!(output.next(), Some(22));
assert!(output.next().is_none()); assert!(output.next().is_none());
@ -156,7 +156,7 @@ fn simulation_schedule_periodic_events() {
// Move to the next events at t0 + 3s + k*2s. // Move to the next events at t0 + 3s + k*2s.
for k in 0..10 { for k in 0..10 {
simu.step(); simu.step().unwrap();
assert_eq!( assert_eq!(
simu.time(), simu.time(),
t0 + Duration::from_secs(3) + k * Duration::from_secs(2) t0 + Duration::from_secs(3) + k * Duration::from_secs(2)
@ -195,7 +195,7 @@ fn simulation_schedule_periodic_keyed_events() {
.unwrap(); .unwrap();
// Move to the next event at t0+3s. // Move to the next event at t0+3s.
simu.step(); simu.step().unwrap();
assert_eq!(simu.time(), t0 + Duration::from_secs(3)); assert_eq!(simu.time(), t0 + Duration::from_secs(3));
assert_eq!(output.next(), Some(1)); assert_eq!(output.next(), Some(1));
assert_eq!(output.next(), Some(2)); assert_eq!(output.next(), Some(2));
@ -206,7 +206,7 @@ fn simulation_schedule_periodic_keyed_events() {
// Move to the next events at t0 + 3s + k*2s. // Move to the next events at t0 + 3s + k*2s.
for k in 1..10 { for k in 1..10 {
simu.step(); simu.step().unwrap();
assert_eq!( assert_eq!(
simu.time(), simu.time(),
t0 + Duration::from_secs(3) + k * Duration::from_secs(2) t0 + Duration::from_secs(3) + k * Duration::from_secs(2)
@ -263,7 +263,8 @@ fn timestamp_bench(
let simu = SimInit::new() let simu = SimInit::new()
.add_model(model, mbox, "") .add_model(model, mbox, "")
.set_clock(clock) .set_clock(clock)
.init(t0); .init(t0)
.unwrap();
(simu, addr, stamp_stream) (simu, addr, stamp_stream)
} }
@ -320,7 +321,7 @@ fn simulation_system_clock_from_instant() {
measured_time, measured_time,
); );
simu.step(); simu.step().unwrap();
} }
} }
} }
@ -383,7 +384,7 @@ fn simulation_system_clock_from_system_time() {
measured_time, measured_time,
); );
simu.step(); simu.step().unwrap();
} }
} }
} }
@ -431,6 +432,6 @@ fn simulation_auto_system_clock() {
measured_time, measured_time,
); );
simu.step(); simu.step().unwrap();
} }
} }

View File

@ -1,4 +1,6 @@
#[cfg(not(asynchronix_loom))] #[cfg(not(asynchronix_loom))]
mod model_scheduling; mod model_scheduling;
#[cfg(not(asynchronix_loom))] #[cfg(not(asynchronix_loom))]
mod simulation_deadlock;
#[cfg(not(asynchronix_loom))]
mod simulation_scheduling; mod simulation_scheduling;