diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index 03ad9a1..d7eb413 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -93,6 +93,13 @@ impl Receiver { } } + /// Creates a new observer. + pub(crate) fn observer(&self) -> impl ChannelObserver { + Observer { + inner: self.inner.clone(), + } + } + /// Receives and executes a message asynchronously, if necessary waiting /// until one becomes available. pub(crate) async fn recv( @@ -116,12 +123,13 @@ impl Receiver { // Decrement the count of in-flight messages. THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1)); - // Consume the message to obtain a boxed future. + // Take the message to obtain a boxed future. let fut = msg.call_once(model, context, self.future_box.take().unwrap()); - // Now that `msg` was consumed and its slot in the queue was - // freed, signal to one awaiting sender that one slot is + // Now that the message was taken, drop `msg` to free its slot + // in the queue and signal to one awaiting sender that a slot is // available for sending. + drop(msg); self.inner.sender_signal.notify_one(); // Await the future provided by the message. @@ -290,6 +298,37 @@ impl Clone for Sender { } } +/// A model-independent handle to a channel that can observe the current number +/// of messages. +pub(crate) trait ChannelObserver: Send { + /// Returns the current number of messages in the channel. + /// + /// # Warning + /// + /// The returned result is only meaningful if it can be established than + /// there are no concurrent send or receive operations on the channel. + /// Otherwise, the returned value may neither reflect the current state nor + /// the past state of the channel, and may be greater than the capacity of + /// the channel. + fn len(&self) -> usize; +} + +/// A handle to a channel that can observe the current number of messages. +/// +/// Multiple [`Observer`]s can be created using the [`Receiver::observer`] +/// method or via cloning. +#[derive(Clone)] +pub(crate) struct Observer { + /// Shared data. + inner: Arc>, +} + +impl ChannelObserver for Observer { + fn len(&self) -> usize { + self.inner.queue.len() + } +} + impl Drop for Sender { fn drop(&mut self) { // Decrease the reference count of senders. diff --git a/asynchronix/src/channel/queue.rs b/asynchronix/src/channel/queue.rs index 4c4f37b..1d5336b 100644 --- a/asynchronix/src/channel/queue.rs +++ b/asynchronix/src/channel/queue.rs @@ -122,7 +122,7 @@ pub(super) struct Queue { /// and the producer. The reason it is shared is that the drop handler of /// the last `Inner` owner (which may be a producer) needs access to the /// dequeue position. - dequeue_pos: CachePadded>, + dequeue_pos: CachePadded, /// Buffer holding the closures and their stamps. buffer: Box<[Slot]>, @@ -160,7 +160,7 @@ impl Queue { Queue { enqueue_pos: CachePadded::new(AtomicUsize::new(0)), - dequeue_pos: CachePadded::new(UnsafeCell::new(0)), + dequeue_pos: CachePadded::new(AtomicUsize::new(0)), buffer: buffer.into(), right_mask, closed_channel_mask, @@ -241,7 +241,7 @@ impl Queue { /// /// This method may not be called concurrently from multiple threads. pub(super) unsafe fn pop(&self) -> Result, PopError> { - let dequeue_pos = self.dequeue_pos.with(|p| *p); + let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed); let index = dequeue_pos & self.right_mask; let slot = &self.buffer[index]; let stamp = slot.stamp.load(Ordering::Acquire); @@ -251,10 +251,10 @@ impl Queue { // closure can be popped. debug_or_loom_assert_eq!(stamp, dequeue_pos + 1); - // Only this thread can access the dequeue position so there is no + // Only this thread can modify the dequeue position so there is no // need to increment the position atomically with a `fetch_add`. self.dequeue_pos - .with_mut(|p| *p = self.next_queue_pos(dequeue_pos)); + .store(self.next_queue_pos(dequeue_pos), Ordering::Relaxed); // Extract the closure from the slot and set the stamp to the value of // the dequeue position increased by one sequence increment. @@ -318,6 +318,30 @@ impl Queue { self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0 } + /// Returns the number of items in the queue. + /// + /// # Warning + /// + /// While this method is safe by Rust's standard, the returned result is + /// only meaningful if it can be established than there are no concurrent + /// `push` or `pop` operations. Otherwise, the returned value may neither + /// reflect the current state nor the past state of the queue, and may be + /// greater than the capacity of the queue. + pub(super) fn len(&self) -> usize { + let enqueue_pos = self.enqueue_pos.load(Ordering::Relaxed); + let dequeue_pos = self.dequeue_pos.load(Ordering::Relaxed); + let enqueue_idx = enqueue_pos & (self.right_mask >> 1); + let dequeue_idx = dequeue_pos & (self.right_mask >> 1); + + // Establish whether the sequence numbers of the enqueue and dequeue + // positions differ. If yes, it means the enqueue position has wrapped + // around one more time so the difference between indices must be + // increased by the buffer capacity. + let carry_flag = (enqueue_pos & !self.right_mask) != (dequeue_pos & !self.right_mask); + + (enqueue_idx + (carry_flag as usize) * self.buffer.len()) - dequeue_idx + } + /// Increment the queue position, incrementing the sequence count as well if /// the index wraps to 0. /// @@ -423,6 +447,12 @@ impl Consumer { fn close(&self) { self.inner.close(); } + + /// Returns the number of items. + #[cfg(not(asynchronix_loom))] + fn len(&self) -> usize { + self.inner.len() + } } #[cfg(test)] @@ -569,6 +599,52 @@ mod tests { fn queue_mpsc_capacity_three() { queue_mpsc(3); } + + #[test] + fn queue_len() { + let (p, mut c) = queue(4); + + let _ = p.push(|b| RecycleBox::recycle(b, 0)); + assert_eq!(c.len(), 1); + let _ = p.push(|b| RecycleBox::recycle(b, 1)); + assert_eq!(c.len(), 2); + let _ = c.pop(); + assert_eq!(c.len(), 1); + let _ = p.push(|b| RecycleBox::recycle(b, 2)); + assert_eq!(c.len(), 2); + let _ = p.push(|b| RecycleBox::recycle(b, 3)); + assert_eq!(c.len(), 3); + let _ = c.pop(); + assert_eq!(c.len(), 2); + let _ = p.push(|b| RecycleBox::recycle(b, 4)); + assert_eq!(c.len(), 3); + let _ = c.pop(); + assert_eq!(c.len(), 2); + let _ = p.push(|b| RecycleBox::recycle(b, 5)); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 6)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 7)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 8)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = p.push(|b| RecycleBox::recycle(b, 9)); + assert_eq!(c.len(), 4); + let _ = c.pop(); + assert_eq!(c.len(), 3); + let _ = c.pop(); + assert_eq!(c.len(), 2); + let _ = c.pop(); + assert_eq!(c.len(), 1); + let _ = c.pop(); + assert_eq!(c.len(), 0); + } } /// Loom tests. diff --git a/asynchronix/src/dev_hooks.rs b/asynchronix/src/dev_hooks.rs index 535a5de..8bb7bd3 100644 --- a/asynchronix/src/dev_hooks.rs +++ b/asynchronix/src/dev_hooks.rs @@ -43,6 +43,6 @@ impl Executor { /// Let the executor run, blocking until all futures have completed or until /// the executor deadlocks. pub fn run(&mut self) { - self.0.run(); + self.0.run().unwrap(); } } diff --git a/asynchronix/src/grpc/services/init_service.rs b/asynchronix/src/grpc/services/init_service.rs index c2a05a1..1e485b0 100644 --- a/asynchronix/src/grpc/services/init_service.rs +++ b/asynchronix/src/grpc/services/init_service.rs @@ -72,7 +72,7 @@ impl InitService { .and_then(|start_time| { sim_init .init(start_time) - .map_err(|e| map_execution_error(e)) + .map_err(map_execution_error) .map(|sim| (sim, registry)) }) }); diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index faab6e5..3a92481 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -26,7 +26,7 @@ use super::ReplierFn; /// The `EventSource` port is similar to an [`Output`](crate::ports::Output) /// port in that it can send events to connected input ports. It is not meant, /// however, to be instantiated as a member of a model, but rather as a -/// simulation monitoring endpoint instantiated during bench assembly. +/// simulation control endpoint instantiated during bench assembly. pub struct EventSource { broadcaster: Arc>>, } diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 33216d1..9d8c1fb 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -144,6 +144,7 @@ use std::time::Duration; use recycle_box::{coerce_box, RecycleBox}; +use crate::channel::ChannelObserver; use crate::executor::{Executor, ExecutorError}; use crate::model::{Context, Model, SetupContext}; use crate::ports::{InputFn, ReplierFn}; @@ -194,6 +195,8 @@ pub struct Simulation { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + observers: Vec<(String, Box)>, + is_terminated: bool, } impl Simulation { @@ -203,12 +206,15 @@ impl Simulation { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + observers: Vec<(String, Box)>, ) -> Self { Self { executor, scheduler_queue, time, clock, + observers, + is_terminated: false, } } @@ -350,9 +356,28 @@ impl Simulation { .map_err(|_| ExecutionError::BadQuery) } + /// Runs the executor. fn run(&mut self) -> Result<(), ExecutionError> { + if self.is_terminated { + return Err(ExecutionError::Terminated); + } + self.executor.run().map_err(|e| match e { - ExecutorError::Deadlock => ExecutionError::Deadlock(Vec::new()), + ExecutorError::Deadlock => { + self.is_terminated = true; + let mut deadlock_info = Vec::new(); + for (name, observer) in &self.observers { + let mailbox_size = observer.len(); + if mailbox_size != 0 { + deadlock_info.push(DeadlockInfo { + model_name: name.clone(), + mailbox_size, + }); + } + } + + ExecutionError::Deadlock(deadlock_info) + } }) } @@ -484,26 +509,13 @@ impl fmt::Debug for Simulation { } } -/// Error returned when a query did not obtain a response. -/// -/// This can happen either because the model targeted by the address was not -/// added to the simulation or due to a simulation deadlock. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct QueryError {} - -impl fmt::Display for QueryError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "the query did not receive a response") - } -} - -impl Error for QueryError {} - /// Information regarding a deadlocked model. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct DeadlockInfo { - model_name: String, - mailbox_size: usize, + /// Name of the deadlocked model. + pub model_name: String, + /// Number of messages in the mailbox. + pub mailbox_size: usize, } /// An error returned upon simulation execution failure. diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 43106a7..efeacf4 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -1,6 +1,7 @@ use std::fmt; use std::sync::{Arc, Mutex}; +use crate::channel::ChannelObserver; use crate::executor::{Executor, SimulationContext}; use crate::model::Model; use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime}; @@ -16,6 +17,7 @@ pub struct SimInit { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + observers: Vec<(String, Box)>, } impl SimInit { @@ -49,6 +51,7 @@ impl SimInit { scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), time, clock: Box::new(NoClock::new()), + observers: Vec::new(), } } @@ -58,13 +61,16 @@ impl SimInit { /// is used for convenience for the model instance identification (e.g. for /// logging purposes). pub fn add_model( - self, + mut self, model: M, mailbox: Mailbox, name: impl Into, ) -> Self { + let name = name.into(); + self.observers + .push((name.clone(), Box::new(mailbox.0.observer()))); let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); - add_model(model, mailbox, name.into(), scheduler, &self.executor); + add_model(model, mailbox, name, scheduler, &self.executor); self } @@ -86,8 +92,13 @@ impl SimInit { self.time.write(start_time); self.clock.synchronize(start_time); - let mut simulation = - Simulation::new(self.executor, self.scheduler_queue, self.time, self.clock); + let mut simulation = Simulation::new( + self.executor, + self.scheduler_queue, + self.time, + self.clock, + self.observers, + ); simulation.run()?; Ok(simulation) diff --git a/asynchronix/tests/simulation_deadlock.rs b/asynchronix/tests/simulation_deadlock.rs index 87cebd4..609b1d1 100644 --- a/asynchronix/tests/simulation_deadlock.rs +++ b/asynchronix/tests/simulation_deadlock.rs @@ -2,7 +2,7 @@ use asynchronix::model::Model; use asynchronix::ports::{Output, Requestor}; -use asynchronix::simulation::{ExecutionError, Mailbox, SimInit}; +use asynchronix::simulation::{DeadlockInfo, ExecutionError, Mailbox, SimInit}; use asynchronix::time::MonotonicTime; #[derive(Default)] @@ -24,8 +24,11 @@ impl Model for TestModel {} /// message. #[test] fn deadlock_on_mailbox_overflow() { + const MODEL_NAME: &str = "testmodel"; + const MAILBOX_SIZE: usize = 5; + let mut model = TestModel::default(); - let mbox = Mailbox::new(); + let mbox = Mailbox::with_capacity(MAILBOX_SIZE); let addr = mbox.address(); // Make two self-connections so that each outgoing message generates two @@ -38,17 +41,33 @@ fn deadlock_on_mailbox_overflow() { .connect(TestModel::activate_output, addr.clone()); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); + let mut simu = SimInit::new() + .add_model(model, mbox, MODEL_NAME) + .init(t0) + .unwrap(); - assert!(matches!( - simu.process_event(TestModel::activate_output, (), addr), - Err(ExecutionError::Deadlock(_)) - )); + match simu.process_event(TestModel::activate_output, (), addr) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect only 1 deadlocked model. + assert_eq!(deadlock_info.len(), 1); + // We expect the mailbox to be full. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL_NAME.into(), + mailbox_size: MAILBOX_SIZE + } + ) + } + _ => panic!("deadlock not detected"), + } } /// Generates a deadlock with a query loopback. #[test] fn deadlock_on_query_loopback() { + const MODEL_NAME: &str = "testmodel"; + let mut model = TestModel::default(); let mbox = Mailbox::new(); let addr = mbox.address(); @@ -58,17 +77,34 @@ fn deadlock_on_query_loopback() { .connect(TestModel::activate_requestor, addr.clone()); let t0 = MonotonicTime::EPOCH; - let mut simu = SimInit::new().add_model(model, mbox, "").init(t0).unwrap(); + let mut simu = SimInit::new() + .add_model(model, mbox, MODEL_NAME) + .init(t0) + .unwrap(); - assert!(matches!( - simu.process_event(TestModel::activate_requestor, (), addr), - Err(ExecutionError::Deadlock(_)) - )); + match simu.process_query(TestModel::activate_requestor, (), addr) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect only 1 deadlocked model. + assert_eq!(deadlock_info.len(), 1); + // We expect the mailbox to have a single query. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL_NAME.into(), + mailbox_size: 1, + } + ); + } + _ => panic!("deadlock not detected"), + } } /// Generates a deadlock with a query loopback involving several models. #[test] fn deadlock_on_transitive_query_loopback() { + const MODEL1_NAME: &str = "testmodel1"; + const MODEL2_NAME: &str = "testmodel2"; + let mut model1 = TestModel::default(); let mut model2 = TestModel::default(); let mbox1 = Mailbox::new(); @@ -78,7 +114,7 @@ fn deadlock_on_transitive_query_loopback() { model1 .requestor - .connect(TestModel::activate_requestor, addr2.clone()); + .connect(TestModel::activate_requestor, addr2); model2 .requestor @@ -86,13 +122,90 @@ fn deadlock_on_transitive_query_loopback() { let t0 = MonotonicTime::EPOCH; let mut simu = SimInit::new() - .add_model(model1, mbox1, "") - .add_model(model2, mbox2, "") + .add_model(model1, mbox1, MODEL1_NAME) + .add_model(model2, mbox2, MODEL2_NAME) .init(t0) .unwrap(); - assert!(matches!( - simu.process_event(TestModel::activate_requestor, (), addr1), - Err(ExecutionError::Deadlock(_)) - )); + match simu.process_query(TestModel::activate_requestor, (), addr1) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect only 1 deadlocked model. + assert_eq!(deadlock_info.len(), 1); + // We expect the mailbox of this model to have a single query. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL1_NAME.into(), + mailbox_size: 1, + } + ); + } + _ => panic!("deadlock not detected"), + } +} + +/// Generates deadlocks with query loopbacks on several models at the same time. +#[test] +fn deadlock_on_multiple_query_loopback() { + const MODEL0_NAME: &str = "testmodel0"; + const MODEL1_NAME: &str = "testmodel1"; + const MODEL2_NAME: &str = "testmodel2"; + + let mut model0 = TestModel::default(); + let mut model1 = TestModel::default(); + let mut model2 = TestModel::default(); + let mbox0 = Mailbox::new(); + let mbox1 = Mailbox::new(); + let mbox2 = Mailbox::new(); + let addr0 = mbox0.address(); + let addr1 = mbox1.address(); + let addr2 = mbox2.address(); + + model0 + .requestor + .connect(TestModel::activate_requestor, addr1.clone()); + + model0 + .requestor + .connect(TestModel::activate_requestor, addr2.clone()); + + model1 + .requestor + .connect(TestModel::activate_requestor, addr1); + + model2 + .requestor + .connect(TestModel::activate_requestor, addr2); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::new() + .add_model(model0, mbox0, MODEL0_NAME) + .add_model(model1, mbox1, MODEL1_NAME) + .add_model(model2, mbox2, MODEL2_NAME) + .init(t0) + .unwrap(); + + match simu.process_query(TestModel::activate_requestor, (), addr0) { + Err(ExecutionError::Deadlock(deadlock_info)) => { + // We expect 2 deadlocked models. + assert_eq!(deadlock_info.len(), 2); + // We expect the mailbox of each deadlocked model to have a single + // query. + assert_eq!( + deadlock_info[0], + DeadlockInfo { + model_name: MODEL1_NAME.into(), + mailbox_size: 1, + } + ); + assert_eq!( + deadlock_info[1], + DeadlockInfo { + model_name: MODEL2_NAME.into(), + mailbox_size: 1, + } + ); + } + _ => panic!("deadlock not detected"), + } }