1
0
forked from ROMEO/nexosim

Report deadlocked models and their malbox size

This commit is contained in:
Serge Barral 2024-10-28 11:15:47 +01:00
parent 1cfaa00f9e
commit e7b64524e0
8 changed files with 304 additions and 53 deletions

View File

@ -93,6 +93,13 @@ impl<M: Model> Receiver<M> {
} }
} }
/// Creates a new observer.
pub(crate) fn observer(&self) -> impl ChannelObserver {
Observer {
inner: self.inner.clone(),
}
}
/// Receives and executes a message asynchronously, if necessary waiting /// Receives and executes a message asynchronously, if necessary waiting
/// until one becomes available. /// until one becomes available.
pub(crate) async fn recv( pub(crate) async fn recv(
@ -116,12 +123,13 @@ impl<M: Model> Receiver<M> {
// Decrement the count of in-flight messages. // Decrement the count of in-flight messages.
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1)); THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));
// Consume the message to obtain a boxed future. // Take the message to obtain a boxed future.
let fut = msg.call_once(model, context, self.future_box.take().unwrap()); let fut = msg.call_once(model, context, self.future_box.take().unwrap());
// Now that `msg` was consumed and its slot in the queue was // Now that the message was taken, drop `msg` to free its slot
// freed, signal to one awaiting sender that one slot is // in the queue and signal to one awaiting sender that a slot is
// available for sending. // available for sending.
drop(msg);
self.inner.sender_signal.notify_one(); self.inner.sender_signal.notify_one();
// Await the future provided by the message. // Await the future provided by the message.
@ -290,6 +298,37 @@ impl<M> Clone for Sender<M> {
} }
} }
/// A model-independent handle to a channel that can observe the current number
/// of messages.
pub(crate) trait ChannelObserver: Send {
/// Returns the current number of messages in the channel.
///
/// # Warning
///
/// The returned result is only meaningful if it can be established than
/// there are no concurrent send or receive operations on the channel.
/// Otherwise, the returned value may neither reflect the current state nor
/// the past state of the channel, and may be greater than the capacity of
/// the channel.
fn len(&self) -> usize;
}
/// A handle to a channel that can observe the current number of messages.
///
/// Multiple [`Observer`]s can be created using the [`Receiver::observer`]
/// method or via cloning.
#[derive(Clone)]
pub(crate) struct Observer<M: 'static> {
/// Shared data.
inner: Arc<Inner<M>>,
}
impl<M: Model> ChannelObserver for Observer<M> {
fn len(&self) -> usize {
self.inner.queue.len()
}
}
impl<M: 'static> Drop for Sender<M> { impl<M: 'static> Drop for Sender<M> {
fn drop(&mut self) { fn drop(&mut self) {
// Decrease the reference count of senders. // Decrease the reference count of senders.

View File

@ -122,7 +122,7 @@ pub(super) struct Queue<T: ?Sized> {
/// and the producer. The reason it is shared is that the drop handler of /// and the producer. The reason it is shared is that the drop handler of
/// the last `Inner` owner (which may be a producer) needs access to the /// the last `Inner` owner (which may be a producer) needs access to the
/// dequeue position. /// dequeue position.
dequeue_pos: CachePadded<UnsafeCell<usize>>, dequeue_pos: CachePadded<AtomicUsize>,
/// Buffer holding the closures and their stamps. /// Buffer holding the closures and their stamps.
buffer: Box<[Slot<T>]>, buffer: Box<[Slot<T>]>,
@ -160,7 +160,7 @@ impl<T: ?Sized> Queue<T> {
Queue { Queue {
enqueue_pos: CachePadded::new(AtomicUsize::new(0)), enqueue_pos: CachePadded::new(AtomicUsize::new(0)),
dequeue_pos: CachePadded::new(UnsafeCell::new(0)), dequeue_pos: CachePadded::new(AtomicUsize::new(0)),
buffer: buffer.into(), buffer: buffer.into(),
right_mask, right_mask,
closed_channel_mask, closed_channel_mask,
@ -241,7 +241,7 @@ impl<T: ?Sized> Queue<T> {
/// ///
/// This method may not be called concurrently from multiple threads. /// This method may not be called concurrently from multiple threads.
pub(super) unsafe fn pop(&self) -> Result<MessageBorrow<'_, T>, PopError> { pub(super) unsafe fn pop(&self) -> Result<MessageBorrow<'_, T>, PopError> {
let dequeue_pos = self.dequeue_pos.with(|p| *p); let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed);
let index = dequeue_pos & self.right_mask; let index = dequeue_pos & self.right_mask;
let slot = &self.buffer[index]; let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire); let stamp = slot.stamp.load(Ordering::Acquire);
@ -251,10 +251,10 @@ impl<T: ?Sized> Queue<T> {
// closure can be popped. // closure can be popped.
debug_or_loom_assert_eq!(stamp, dequeue_pos + 1); debug_or_loom_assert_eq!(stamp, dequeue_pos + 1);
// Only this thread can access the dequeue position so there is no // Only this thread can modify the dequeue position so there is no
// need to increment the position atomically with a `fetch_add`. // need to increment the position atomically with a `fetch_add`.
self.dequeue_pos self.dequeue_pos
.with_mut(|p| *p = self.next_queue_pos(dequeue_pos)); .store(self.next_queue_pos(dequeue_pos), Ordering::Relaxed);
// Extract the closure from the slot and set the stamp to the value of // Extract the closure from the slot and set the stamp to the value of
// the dequeue position increased by one sequence increment. // the dequeue position increased by one sequence increment.
@ -318,6 +318,30 @@ impl<T: ?Sized> Queue<T> {
self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0 self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0
} }
/// Returns the number of items in the queue.
///
/// # Warning
///
/// While this method is safe by Rust's standard, the returned result is
/// only meaningful if it can be established than there are no concurrent
/// `push` or `pop` operations. Otherwise, the returned value may neither
/// reflect the current state nor the past state of the queue, and may be
/// greater than the capacity of the queue.
pub(super) fn len(&self) -> usize {
let enqueue_pos = self.enqueue_pos.load(Ordering::Relaxed);
let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed);
let enqueue_idx = enqueue_pos & (self.right_mask >> 1);
let dequeue_idx = dequeue_pos & (self.right_mask >> 1);
// Establish whether the sequence numbers of the enqueue and dequeue
// positions differ. If yes, it means the enqueue position has wrapped
// around one more time so the difference between indices must be
// increased by the buffer capacity.
let carry_flag = (enqueue_pos & !self.right_mask) != (dequeue_pos & !self.right_mask);
(enqueue_idx + (carry_flag as usize) * self.buffer.len()) - dequeue_idx
}
/// Increment the queue position, incrementing the sequence count as well if /// Increment the queue position, incrementing the sequence count as well if
/// the index wraps to 0. /// the index wraps to 0.
/// ///
@ -423,6 +447,12 @@ impl<T: ?Sized> Consumer<T> {
fn close(&self) { fn close(&self) {
self.inner.close(); self.inner.close();
} }
/// Returns the number of items.
#[cfg(not(asynchronix_loom))]
fn len(&self) -> usize {
self.inner.len()
}
} }
#[cfg(test)] #[cfg(test)]
@ -569,6 +599,52 @@ mod tests {
fn queue_mpsc_capacity_three() { fn queue_mpsc_capacity_three() {
queue_mpsc(3); queue_mpsc(3);
} }
#[test]
fn queue_len() {
let (p, mut c) = queue(4);
let _ = p.push(|b| RecycleBox::recycle(b, 0));
assert_eq!(c.len(), 1);
let _ = p.push(|b| RecycleBox::recycle(b, 1));
assert_eq!(c.len(), 2);
let _ = c.pop();
assert_eq!(c.len(), 1);
let _ = p.push(|b| RecycleBox::recycle(b, 2));
assert_eq!(c.len(), 2);
let _ = p.push(|b| RecycleBox::recycle(b, 3));
assert_eq!(c.len(), 3);
let _ = c.pop();
assert_eq!(c.len(), 2);
let _ = p.push(|b| RecycleBox::recycle(b, 4));
assert_eq!(c.len(), 3);
let _ = c.pop();
assert_eq!(c.len(), 2);
let _ = p.push(|b| RecycleBox::recycle(b, 5));
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 6));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 7));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 8));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = p.push(|b| RecycleBox::recycle(b, 9));
assert_eq!(c.len(), 4);
let _ = c.pop();
assert_eq!(c.len(), 3);
let _ = c.pop();
assert_eq!(c.len(), 2);
let _ = c.pop();
assert_eq!(c.len(), 1);
let _ = c.pop();
assert_eq!(c.len(), 0);
}
} }
/// Loom tests. /// Loom tests.

View File

@ -43,6 +43,6 @@ impl Executor {
/// Let the executor run, blocking until all futures have completed or until /// Let the executor run, blocking until all futures have completed or until
/// the executor deadlocks. /// the executor deadlocks.
pub fn run(&mut self) { pub fn run(&mut self) {
self.0.run(); self.0.run().unwrap();
} }
} }

View File

@ -72,7 +72,7 @@ impl InitService {
.and_then(|start_time| { .and_then(|start_time| {
sim_init sim_init
.init(start_time) .init(start_time)
.map_err(|e| map_execution_error(e)) .map_err(map_execution_error)
.map(|sim| (sim, registry)) .map(|sim| (sim, registry))
}) })
}); });

View File

@ -26,7 +26,7 @@ use super::ReplierFn;
/// The `EventSource` port is similar to an [`Output`](crate::ports::Output) /// The `EventSource` port is similar to an [`Output`](crate::ports::Output)
/// port in that it can send events to connected input ports. It is not meant, /// port in that it can send events to connected input ports. It is not meant,
/// however, to be instantiated as a member of a model, but rather as a /// however, to be instantiated as a member of a model, but rather as a
/// simulation monitoring endpoint instantiated during bench assembly. /// simulation control endpoint instantiated during bench assembly.
pub struct EventSource<T: Clone + Send + 'static> { pub struct EventSource<T: Clone + Send + 'static> {
broadcaster: Arc<Mutex<EventBroadcaster<T>>>, broadcaster: Arc<Mutex<EventBroadcaster<T>>>,
} }

View File

@ -144,6 +144,7 @@ use std::time::Duration;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel::ChannelObserver;
use crate::executor::{Executor, ExecutorError}; use crate::executor::{Executor, ExecutorError};
use crate::model::{Context, Model, SetupContext}; use crate::model::{Context, Model, SetupContext};
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
@ -194,6 +195,8 @@ pub struct Simulation {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock>, clock: Box<dyn Clock>,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
is_terminated: bool,
} }
impl Simulation { impl Simulation {
@ -203,12 +206,15 @@ impl Simulation {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
) -> Self { ) -> Self {
Self { Self {
executor, executor,
scheduler_queue, scheduler_queue,
time, time,
clock, clock,
observers,
is_terminated: false,
} }
} }
@ -350,9 +356,28 @@ impl Simulation {
.map_err(|_| ExecutionError::BadQuery) .map_err(|_| ExecutionError::BadQuery)
} }
/// Runs the executor.
fn run(&mut self) -> Result<(), ExecutionError> { fn run(&mut self) -> Result<(), ExecutionError> {
if self.is_terminated {
return Err(ExecutionError::Terminated);
}
self.executor.run().map_err(|e| match e { self.executor.run().map_err(|e| match e {
ExecutorError::Deadlock => ExecutionError::Deadlock(Vec::new()), ExecutorError::Deadlock => {
self.is_terminated = true;
let mut deadlock_info = Vec::new();
for (name, observer) in &self.observers {
let mailbox_size = observer.len();
if mailbox_size != 0 {
deadlock_info.push(DeadlockInfo {
model_name: name.clone(),
mailbox_size,
});
}
}
ExecutionError::Deadlock(deadlock_info)
}
}) })
} }
@ -484,26 +509,13 @@ impl fmt::Debug for Simulation {
} }
} }
/// Error returned when a query did not obtain a response.
///
/// This can happen either because the model targeted by the address was not
/// added to the simulation or due to a simulation deadlock.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct QueryError {}
impl fmt::Display for QueryError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "the query did not receive a response")
}
}
impl Error for QueryError {}
/// Information regarding a deadlocked model. /// Information regarding a deadlocked model.
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DeadlockInfo { pub struct DeadlockInfo {
model_name: String, /// Name of the deadlocked model.
mailbox_size: usize, pub model_name: String,
/// Number of messages in the mailbox.
pub mailbox_size: usize,
} }
/// An error returned upon simulation execution failure. /// An error returned upon simulation execution failure.

View File

@ -1,6 +1,7 @@
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::channel::ChannelObserver;
use crate::executor::{Executor, SimulationContext}; use crate::executor::{Executor, SimulationContext};
use crate::model::Model; use crate::model::Model;
use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime}; use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime};
@ -16,6 +17,7 @@ pub struct SimInit {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
observers: Vec<(String, Box<dyn ChannelObserver>)>,
} }
impl SimInit { impl SimInit {
@ -49,6 +51,7 @@ impl SimInit {
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
time, time,
clock: Box::new(NoClock::new()), clock: Box::new(NoClock::new()),
observers: Vec::new(),
} }
} }
@ -58,13 +61,16 @@ impl SimInit {
/// is used for convenience for the model instance identification (e.g. for /// is used for convenience for the model instance identification (e.g. for
/// logging purposes). /// logging purposes).
pub fn add_model<M: Model>( pub fn add_model<M: Model>(
self, mut self,
model: M, model: M,
mailbox: Mailbox<M>, mailbox: Mailbox<M>,
name: impl Into<String>, name: impl Into<String>,
) -> Self { ) -> Self {
let name = name.into();
self.observers
.push((name.clone(), Box::new(mailbox.0.observer())));
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
add_model(model, mailbox, name.into(), scheduler, &self.executor); add_model(model, mailbox, name, scheduler, &self.executor);
self self
} }
@ -86,8 +92,13 @@ impl SimInit {
self.time.write(start_time); self.time.write(start_time);
self.clock.synchronize(start_time); self.clock.synchronize(start_time);
let mut simulation = let mut simulation = Simulation::new(
Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock); self.executor,
self.scheduler_queue,
self.time,
self.clock,
self.observers,
);
simulation.run()?; simulation.run()?;
Ok(simulation) Ok(simulation)

View File

@ -2,7 +2,7 @@
use asynchronix::model::Model; use asynchronix::model::Model;
use asynchronix::ports::{Output, Requestor}; use asynchronix::ports::{Output, Requestor};
use asynchronix::simulation::{ExecutionError, Mailbox, SimInit}; use asynchronix::simulation::{DeadlockInfo, ExecutionError, Mailbox, SimInit};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
#[derive(Default)] #[derive(Default)]
@ -24,8 +24,11 @@ impl Model for TestModel {}
/// message. /// message.
#[test] #[test]
fn deadlock_on_mailbox_overflow() { fn deadlock_on_mailbox_overflow() {
const MODEL_NAME: &str = "testmodel";
const MAILBOX_SIZE: usize = 5;
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::with_capacity(MAILBOX_SIZE);
let addr = mbox.address(); let addr = mbox.address();
// Make two self-connections so that each outgoing message generates two // Make two self-connections so that each outgoing message generates two
@ -38,17 +41,33 @@ fn deadlock_on_mailbox_overflow() {
.connect(TestModel::activate_output, addr.clone()); .connect(TestModel::activate_output, addr.clone());
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); let mut simu = SimInit::new()
.add_model(model, mbox, MODEL_NAME)
.init(t0)
.unwrap();
assert!(matches!( match simu.process_event(TestModel::activate_output, (), addr) {
simu.process_event(TestModel::activate_output, (), addr), Err(ExecutionError::Deadlock(deadlock_info)) => {
Err(ExecutionError::Deadlock(_)) // We expect only 1 deadlocked model.
)); assert_eq!(deadlock_info.len(), 1);
// We expect the mailbox to be full.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL_NAME.into(),
mailbox_size: MAILBOX_SIZE
}
)
}
_ => panic!("deadlock not detected"),
}
} }
/// Generates a deadlock with a query loopback. /// Generates a deadlock with a query loopback.
#[test] #[test]
fn deadlock_on_query_loopback() { fn deadlock_on_query_loopback() {
const MODEL_NAME: &str = "testmodel";
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let addr = mbox.address(); let addr = mbox.address();
@ -58,17 +77,34 @@ fn deadlock_on_query_loopback() {
.connect(TestModel::activate_requestor, addr.clone()); .connect(TestModel::activate_requestor, addr.clone());
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); let mut simu = SimInit::new()
.add_model(model, mbox, MODEL_NAME)
.init(t0)
.unwrap();
assert!(matches!( match simu.process_query(TestModel::activate_requestor, (), addr) {
simu.process_event(TestModel::activate_requestor, (), addr), Err(ExecutionError::Deadlock(deadlock_info)) => {
Err(ExecutionError::Deadlock(_)) // We expect only 1 deadlocked model.
)); assert_eq!(deadlock_info.len(), 1);
// We expect the mailbox to have a single query.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL_NAME.into(),
mailbox_size: 1,
}
);
}
_ => panic!("deadlock not detected"),
}
} }
/// Generates a deadlock with a query loopback involving several models. /// Generates a deadlock with a query loopback involving several models.
#[test] #[test]
fn deadlock_on_transitive_query_loopback() { fn deadlock_on_transitive_query_loopback() {
const MODEL1_NAME: &str = "testmodel1";
const MODEL2_NAME: &str = "testmodel2";
let mut model1 = TestModel::default(); let mut model1 = TestModel::default();
let mut model2 = TestModel::default(); let mut model2 = TestModel::default();
let mbox1 = Mailbox::new(); let mbox1 = Mailbox::new();
@ -78,7 +114,7 @@ fn deadlock_on_transitive_query_loopback() {
model1 model1
.requestor .requestor
.connect(TestModel::activate_requestor, addr2.clone()); .connect(TestModel::activate_requestor, addr2);
model2 model2
.requestor .requestor
@ -86,13 +122,90 @@ fn deadlock_on_transitive_query_loopback() {
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new() let mut simu = SimInit::new()
.add_model(model1, mbox1, "") .add_model(model1, mbox1, MODEL1_NAME)
.add_model(model2, mbox2, "") .add_model(model2, mbox2, MODEL2_NAME)
.init(t0) .init(t0)
.unwrap(); .unwrap();
assert!(matches!( match simu.process_query(TestModel::activate_requestor, (), addr1) {
simu.process_event(TestModel::activate_requestor, (), addr1), Err(ExecutionError::Deadlock(deadlock_info)) => {
Err(ExecutionError::Deadlock(_)) // We expect only 1 deadlocked model.
)); assert_eq!(deadlock_info.len(), 1);
// We expect the mailbox of this model to have a single query.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL1_NAME.into(),
mailbox_size: 1,
}
);
}
_ => panic!("deadlock not detected"),
}
}
/// Generates deadlocks with query loopbacks on several models at the same time.
#[test]
fn deadlock_on_multiple_query_loopback() {
const MODEL0_NAME: &str = "testmodel0";
const MODEL1_NAME: &str = "testmodel1";
const MODEL2_NAME: &str = "testmodel2";
let mut model0 = TestModel::default();
let mut model1 = TestModel::default();
let mut model2 = TestModel::default();
let mbox0 = Mailbox::new();
let mbox1 = Mailbox::new();
let mbox2 = Mailbox::new();
let addr0 = mbox0.address();
let addr1 = mbox1.address();
let addr2 = mbox2.address();
model0
.requestor
.connect(TestModel::activate_requestor, addr1.clone());
model0
.requestor
.connect(TestModel::activate_requestor, addr2.clone());
model1
.requestor
.connect(TestModel::activate_requestor, addr1);
model2
.requestor
.connect(TestModel::activate_requestor, addr2);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::new()
.add_model(model0, mbox0, MODEL0_NAME)
.add_model(model1, mbox1, MODEL1_NAME)
.add_model(model2, mbox2, MODEL2_NAME)
.init(t0)
.unwrap();
match simu.process_query(TestModel::activate_requestor, (), addr0) {
Err(ExecutionError::Deadlock(deadlock_info)) => {
// We expect 2 deadlocked models.
assert_eq!(deadlock_info.len(), 2);
// We expect the mailbox of each deadlocked model to have a single
// query.
assert_eq!(
deadlock_info[0],
DeadlockInfo {
model_name: MODEL1_NAME.into(),
mailbox_size: 1,
}
);
assert_eq!(
deadlock_info[1],
DeadlockInfo {
model_name: MODEL2_NAME.into(),
mailbox_size: 1,
}
);
}
_ => panic!("deadlock not detected"),
}
} }