From 1be2f48a00b8ebbf74ff0c590ec7b35d095c0dd2 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 6 Mar 2024 16:16:55 +0100 Subject: [PATCH] Revert "Merge pull request #12 from asynchronics/feature/event-sinks" This reverts commit 7e881afb638ccc0dbcfc7b539fc152dc923d63e1, reversing changes made to 9d78e4f72a4c6ff459fc386b2f25beae40b94429. --- .github/workflows/loom.yml | 1 + Cargo.toml | 1 - asynchronix/Cargo.toml | 1 - asynchronix/examples/espresso_machine.rs | 5 +- asynchronix/examples/power_supply.rs | 14 +- asynchronix/examples/stepper_motor.rs | 5 +- asynchronix/src/channel.rs | 4 +- .../src/executor/task/tests/general.rs | 3 +- asynchronix/src/executor/tests.rs | 2 + asynchronix/src/lib.rs | 12 +- asynchronix/src/model.rs | 2 +- asynchronix/src/model/ports.rs | 54 ++- asynchronix/src/model/ports/broadcaster.rs | 221 +++------- .../src/model/ports/broadcaster/task_set.rs | 74 ++-- asynchronix/src/model/ports/sender.rs | 82 ++-- asynchronix/src/simulation.rs | 16 +- asynchronix/src/simulation/endpoints.rs | 69 +++ asynchronix/src/simulation/sim_init.rs | 5 +- asynchronix/src/simulation/sink.rs | 171 -------- asynchronix/src/time/scheduler.rs | 274 +++++++----- asynchronix/src/util.rs | 1 + asynchronix/src/util/priority_queue.rs | 4 +- asynchronix/src/util/spsc_queue.rs | 393 ++++++++++++++++++ asynchronix/tests/model_scheduling.rs | 17 +- asynchronix/tests/simulation_scheduling.rs | 12 +- 25 files changed, 870 insertions(+), 573 deletions(-) create mode 100644 asynchronix/src/simulation/endpoints.rs delete mode 100644 asynchronix/src/simulation/sink.rs create mode 100644 asynchronix/src/util/spsc_queue.rs diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index e739386..7b2c805 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -13,6 +13,7 @@ on: - 'asynchronix/src/model/ports/broadcaster.rs' - 'asynchronix/src/model/ports/broadcaster/**' - 'asynchronix/src/util/slot.rs' + - 'asynchronix/src/util/spsc_queue.rs' - 'asynchronix/src/util/sync_cell.rs' jobs: diff --git a/Cargo.toml b/Cargo.toml index c24bd10..94974a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,2 @@ [workspace] members = ["asynchronix"] -resolver = "2" diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index e3e7c59..2d6b306 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -28,7 +28,6 @@ dev-logs = [] [dependencies] async-event = "0.1" crossbeam-utils = "0.8" -crossbeam-queue = "0.3" diatomic-waker = "0.1" futures-task = "0.3" multishot = "0.3" diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index dd482e8..8167717 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -36,7 +36,7 @@ use std::pin::Pin; use std::time::Duration; use asynchronix::model::{InitializedModel, Model, Output}; -use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; /// Water pump. @@ -364,8 +364,7 @@ fn main() { pump.flow_rate.connect(Tank::set_flow_rate, &tank_mbox); // Model handles for simulation. - let mut flow_rate = EventSlot::new(); - pump.flow_rate.connect_sink(&flow_rate); + let mut flow_rate = pump.flow_rate.connect_slot().0; let controller_addr = controller_mbox.address(); let tank_addr = tank_mbox.address(); diff --git a/asynchronix/examples/power_supply.rs b/asynchronix/examples/power_supply.rs index 233913b..a477912 100644 --- a/asynchronix/examples/power_supply.rs +++ b/asynchronix/examples/power_supply.rs @@ -27,7 +27,7 @@ //! └──────────┘ //! ``` use asynchronix::model::{Model, Output, Requestor}; -use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::time::MonotonicTime; /// Power supply. @@ -124,14 +124,10 @@ fn main() { psu.pwr_out.connect(Load::pwr_in, &load3_mbox); // Model handles for simulation. - let mut psu_power = EventSlot::new(); - let mut load1_power = EventSlot::new(); - let mut load2_power = EventSlot::new(); - let mut load3_power = EventSlot::new(); - psu.power.connect_sink(&psu_power); - load1.power.connect_sink(&load1_power); - load2.power.connect_sink(&load2_power); - load3.power.connect_sink(&load3_power); + let mut psu_power = psu.power.connect_slot().0; + let mut load1_power = load1.power.connect_slot().0; + let mut load2_power = load2.power.connect_slot().0; + let mut load3_power = load3.power.connect_slot().0; let psu_addr = psu_mbox.address(); // Start time (arbitrary since models do not depend on absolute time). diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 409365a..c9937db 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -19,7 +19,7 @@ use std::pin::Pin; use std::time::Duration; use asynchronix::model::{InitializedModel, Model, Output}; -use asynchronix::simulation::{EventQueue, Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::time::{MonotonicTime, Scheduler}; /// Stepper motor. @@ -200,8 +200,7 @@ fn main() { driver.current_out.connect(Motor::current_in, &motor_mbox); // Model handles for simulation. - let mut position = EventQueue::new(); - motor.position.connect_sink(&position); + let mut position = motor.position.connect_stream().0; let motor_addr = motor_mbox.address(); let driver_addr = driver_mbox.address(); diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index f4f8b79..c732477 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -255,8 +255,8 @@ impl Sender { /// All channels are guaranteed to have different identifiers at any given /// time, but an identifier may be reused after all handles to a channel /// have been dropped. - pub(crate) fn channel_id(&self) -> NonZeroUsize { - NonZeroUsize::new(Arc::as_ptr(&self.inner) as usize).unwrap() + pub(crate) fn channel_id(&self) -> ChannelId { + ChannelId(NonZeroUsize::new(&*self.inner as *const Inner as usize).unwrap()) } } diff --git a/asynchronix/src/executor/task/tests/general.rs b/asynchronix/src/executor/task/tests/general.rs index beee857..ba4e2e3 100644 --- a/asynchronix/src/executor/task/tests/general.rs +++ b/asynchronix/src/executor/task/tests/general.rs @@ -1,6 +1,7 @@ +use std::future::Future; use std::ops::Deref; use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::thread; diff --git a/asynchronix/src/executor/tests.rs b/asynchronix/src/executor/tests.rs index 7f63b46..9e21f8d 100644 --- a/asynchronix/src/executor/tests.rs +++ b/asynchronix/src/executor/tests.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + use futures_channel::{mpsc, oneshot}; use futures_util::StreamExt; diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index c7ede82..c27bd70 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -193,7 +193,7 @@ //! # impl Model for Delay {} //! # } //! use std::time::Duration; -//! use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; +//! use asynchronix::simulation::{Mailbox, SimInit}; //! use asynchronix::time::MonotonicTime; //! //! use models::{Delay, Multiplier}; @@ -217,8 +217,7 @@ //! delay1.output.connect(Delay::input, &delay2_mbox); //! //! // Keep handles to the system input and output for the simulation. -//! let mut output_slot = EventSlot::new(); -//! delay2.output.connect_sink(&output_slot); +//! let mut output_slot = delay2.output.connect_slot().0; //! let input_address = multiplier1_mbox.address(); //! //! // Pick an arbitrary simulation start time and build the simulation. @@ -256,7 +255,7 @@ //! //! Simulation outputs can be monitored using //! [`EventSlot`](simulation::EventSlot)s and -//! [`EventQueue`](simulation::EventQueue)s, which can be connected to any +//! [`EventStream`](simulation::EventStream)s, which can be connected to any //! model's output port. While an event slot only gives access to the last value //! sent from a port, an event stream is an iterator that yields all events that //! were sent in first-in-first-out order. @@ -294,7 +293,7 @@ //! # impl Model for Delay {} //! # } //! # use std::time::Duration; -//! # use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; +//! # use asynchronix::simulation::{Mailbox, SimInit}; //! # use asynchronix::time::MonotonicTime; //! # use models::{Delay, Multiplier}; //! # let mut multiplier1 = Multiplier::default(); @@ -309,8 +308,7 @@ //! # multiplier1.output.connect(Multiplier::input, &multiplier2_mbox); //! # multiplier2.output.connect(Delay::input, &delay2_mbox); //! # delay1.output.connect(Delay::input, &delay2_mbox); -//! # let mut output_slot = EventSlot::new(); -//! # delay2.output.connect_sink(&output_slot); +//! # let mut output_slot = delay2.output.connect_slot().0; //! # let input_address = multiplier1_mbox.address(); //! # let t0 = MonotonicTime::EPOCH; //! # let mut simu = SimInit::new() diff --git a/asynchronix/src/model.rs b/asynchronix/src/model.rs index 2414717..e5ea5d1 100644 --- a/asynchronix/src/model.rs +++ b/asynchronix/src/model.rs @@ -132,7 +132,7 @@ //! can be connected to input and requestor ports when assembling the simulation //! bench. However, input ports may instead be defined as private methods if //! they are only used by the model itself to schedule future actions (see the -//! [`Scheduler`] examples). +//! [`Scheduler`](crate::time::Scheduler) examples). //! //! Changing the signature of an input or replier port is not considered to //! alter the public interface of a model provided that the event, request and diff --git a/asynchronix/src/model/ports.rs b/asynchronix/src/model/ports.rs index 6b79539..f296f16 100644 --- a/asynchronix/src/model/ports.rs +++ b/asynchronix/src/model/ports.rs @@ -14,17 +14,18 @@ //! ports should generally be preferred over requestor ports when possible. use std::fmt; +use std::sync::{Arc, Mutex}; mod broadcaster; mod sender; -use crate::model::ports::sender::EventSinkSender; use crate::model::{InputFn, Model, ReplierFn}; -use crate::simulation::{Address, EventSink}; +use crate::simulation::{Address, EventSlot, EventStream}; +use crate::util::spsc_queue; -use broadcaster::{EventBroadcaster, QueryBroadcaster}; +use broadcaster::Broadcaster; -use self::sender::{InputSender, ReplierSender}; +use self::sender::{EventSender, EventSlotSender, EventStreamSender, QuerySender}; #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] /// Unique identifier for a connection between two ports. @@ -36,7 +37,7 @@ pub struct LineId(u64); /// methods that return no value. They broadcast events to all connected input /// ports. pub struct Output { - broadcaster: EventBroadcaster, + broadcaster: Broadcaster, next_line_id: u64, } @@ -61,23 +62,40 @@ impl Output { assert!(self.next_line_id != u64::MAX); let line_id = LineId(self.next_line_id); self.next_line_id += 1; - let sender = Box::new(InputSender::new(input, address.into().0)); + let sender = Box::new(EventSender::new(input, address.into().0)); self.broadcaster.add(sender, line_id); line_id } - /// Adds a connection to an event sink such as an - /// [`EventSlot`](crate::simulation::EventSlot) or - /// [`EventQueue`](crate::simulation::EventQueue). - pub fn connect_sink>(&mut self, sink: &S) -> LineId { + /// Adds a connection to an event stream iterator. + pub fn connect_stream(&mut self) -> (EventStream, LineId) { assert!(self.next_line_id != u64::MAX); let line_id = LineId(self.next_line_id); self.next_line_id += 1; - let sender = Box::new(EventSinkSender::new(sink.writer())); + + let (producer, consumer) = spsc_queue::spsc_queue(); + let sender = Box::new(EventStreamSender::new(producer)); + let event_stream = EventStream::new(consumer); + self.broadcaster.add(sender, line_id); - line_id + (event_stream, line_id) + } + + /// Adds a connection to an event slot. + pub fn connect_slot(&mut self) -> (EventSlot, LineId) { + assert!(self.next_line_id != u64::MAX); + let line_id = LineId(self.next_line_id); + self.next_line_id += 1; + + let slot = Arc::new(Mutex::new(None)); + let sender = Box::new(EventSlotSender::new(slot.clone())); + let event_slot = EventSlot::new(slot); + + self.broadcaster.add(sender, line_id); + + (event_slot, line_id) } /// Removes the connection specified by the `LineId` parameter. @@ -100,14 +118,14 @@ impl Output { /// Broadcasts an event to all connected input ports. pub async fn send(&mut self, arg: T) { - self.broadcaster.broadcast(arg).await.unwrap(); + self.broadcaster.broadcast_event(arg).await.unwrap(); } } impl Default for Output { fn default() -> Self { Self { - broadcaster: EventBroadcaster::default(), + broadcaster: Broadcaster::default(), next_line_id: 0, } } @@ -125,7 +143,7 @@ impl fmt::Debug for Output { /// model methods that return a value. They broadcast queries to all connected /// replier ports. pub struct Requestor { - broadcaster: QueryBroadcaster, + broadcaster: Broadcaster, next_line_id: u64, } @@ -150,7 +168,7 @@ impl Requestor { assert!(self.next_line_id != u64::MAX); let line_id = LineId(self.next_line_id); self.next_line_id += 1; - let sender = Box::new(ReplierSender::new(replier, address.into().0)); + let sender = Box::new(QuerySender::new(replier, address.into().0)); self.broadcaster.add(sender, line_id); line_id @@ -176,14 +194,14 @@ impl Requestor { /// Broadcasts a query to all connected replier ports. pub async fn send(&mut self, arg: T) -> impl Iterator + '_ { - self.broadcaster.broadcast(arg).await.unwrap() + self.broadcaster.broadcast_query(arg).await.unwrap() } } impl Default for Requestor { fn default() -> Self { Self { - broadcaster: QueryBroadcaster::default(), + broadcaster: Broadcaster::default(), next_line_id: 0, } } diff --git a/asynchronix/src/model/ports/broadcaster.rs b/asynchronix/src/model/ports/broadcaster.rs index 22f525d..ff347cc 100644 --- a/asynchronix/src/model/ports/broadcaster.rs +++ b/asynchronix/src/model/ports/broadcaster.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; use std::task::{Context, Poll}; @@ -27,17 +26,28 @@ mod task_set; /// exploits this behavior by waking the main broadcast future only when all /// sender futures have been awaken, which strongly reduces overhead since /// waking a sender task does not actually schedule it on the executor. -pub(super) struct BroadcasterInner { +pub(super) struct Broadcaster { /// The list of senders with their associated line identifier. senders: Vec<(LineId, Box>)>, /// Fields explicitly borrowed by the `BroadcastFuture`. shared: Shared, - /// Phantom types. - _phantom_event: PhantomData, - _phantom_reply: PhantomData, } -impl BroadcasterInner { +impl Broadcaster { + /// Broadcasts an event to all addresses. + pub(super) async fn broadcast_event(&mut self, arg: T) -> Result<(), BroadcastError> { + match self.senders.as_mut_slice() { + // No sender. + [] => Ok(()), + // One sender. + [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), + // Multiple senders. + _ => self.broadcast(arg).await, + } + } +} + +impl Broadcaster { /// Adds a new sender associated to the specified identifier. /// /// # Panics @@ -83,25 +93,55 @@ impl BroadcasterInner { self.senders.len() } + /// Broadcasts a query to all addresses and collect all responses. + pub(super) async fn broadcast_query( + &mut self, + arg: T, + ) -> Result + '_, BroadcastError> { + match self.senders.as_mut_slice() { + // No sender. + [] => {} + // One sender. + [sender] => { + let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; + self.shared.futures_env[0].output = Some(output); + } + // Multiple senders. + _ => self.broadcast(arg).await?, + }; + + // At this point all outputs should be available so `unwrap` can be + // called on the output of each future. + let outputs = self + .shared + .futures_env + .iter_mut() + .map(|t| t.output.take().unwrap()); + + Ok(outputs) + } + /// Efficiently broadcasts a message or a query to multiple addresses. /// /// This method does not collect the responses from queries. fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> { + let futures_count = self.senders.len(); let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. - let mut iter = self + for (i, (sender, futures_env)) in self .senders .iter_mut() - .zip(self.shared.futures_env.iter_mut()); - while let Some((sender, futures_env)) = iter.next() { + .zip(self.shared.futures_env.iter_mut()) + .enumerate() + { let future_cache = futures_env .storage .take() .unwrap_or_else(|| RecycleBox::new(())); // Move the argument rather than clone it for the last future. - if iter.len() == 0 { + if i + 1 == futures_count { let future: RecycleBox> + Send + '_> = coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg))); @@ -121,7 +161,7 @@ impl BroadcasterInner { } } -impl Default for BroadcasterInner { +impl Default for Broadcaster { /// Creates an empty `Broadcaster` object. fn default() -> Self { let wake_sink = WakeSink::new(); @@ -135,141 +175,6 @@ impl Default for BroadcasterInner { futures_env: Vec::new(), storage: None, }, - _phantom_event: PhantomData, - _phantom_reply: PhantomData, - } - } -} - -/// An object that can efficiently broadcast events to several input ports. -/// -/// See `BroadcasterInner` for implementation details. -pub(super) struct EventBroadcaster { - /// The broadcaster core object. - inner: BroadcasterInner, -} - -impl EventBroadcaster { - /// Adds a new sender associated to the specified identifier. - /// - /// # Panics - /// - /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.inner.add(sender, id); - } - - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - self.inner.remove(id) - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.inner.clear(); - } - - /// Returns the number of connected senders. - pub(super) fn len(&self) -> usize { - self.inner.len() - } - - /// Broadcasts an event to all addresses. - pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), BroadcastError> { - match self.inner.senders.as_mut_slice() { - // No sender. - [] => Ok(()), - // One sender. - [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), - // Multiple senders. - _ => self.inner.broadcast(arg).await, - } - } -} - -impl Default for EventBroadcaster { - fn default() -> Self { - Self { - inner: BroadcasterInner::default(), - } - } -} - -/// An object that can efficiently broadcast queries to several replier ports. -/// -/// See `BroadcasterInner` for implementation details. -pub(super) struct QueryBroadcaster { - /// The broadcaster core object. - inner: BroadcasterInner, -} - -impl QueryBroadcaster { - /// Adds a new sender associated to the specified identifier. - /// - /// # Panics - /// - /// This method will panic if the total count of senders would reach - /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.inner.add(sender, id); - } - - /// Removes the first sender with the specified identifier, if any. - /// - /// Returns `true` if there was indeed a sender associated to the specified - /// identifier. - pub(super) fn remove(&mut self, id: LineId) -> bool { - self.inner.remove(id) - } - - /// Removes all senders. - pub(super) fn clear(&mut self) { - self.inner.clear(); - } - - /// Returns the number of connected senders. - pub(super) fn len(&self) -> usize { - self.inner.len() - } - - /// Broadcasts a query to all addresses and collect all responses. - pub(super) async fn broadcast( - &mut self, - arg: T, - ) -> Result + '_, BroadcastError> { - match self.inner.senders.as_mut_slice() { - // No sender. - [] => {} - // One sender. - [sender] => { - let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.inner.shared.futures_env[0].output = Some(output); - } - // Multiple senders. - _ => self.inner.broadcast(arg).await?, - }; - - // At this point all outputs should be available so `unwrap` can be - // called on the output of each future. - let outputs = self - .inner - .shared - .futures_env - .iter_mut() - .map(|t| t.output.take().unwrap()); - - Ok(outputs) - } -} - -impl Default for QueryBroadcaster { - fn default() -> Self { - Self { - inner: BroadcasterInner::default(), } } } @@ -418,7 +323,7 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { let scheduled_tasks = match this .shared .task_set - .take_scheduled(this.pending_futures_count) + .steal_scheduled(this.pending_futures_count) { Some(st) => st, None => return Poll::Pending, @@ -503,7 +408,9 @@ mod tests { use futures_executor::block_on; + use super::super::sender::QuerySender; use crate::channel::Receiver; + use crate::model::Model; use crate::time::Scheduler; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; @@ -534,18 +441,18 @@ mod tests { const N_RECV: usize = 4; let mut mailboxes = Vec::new(); - let mut broadcaster = EventBroadcaster::default(); + let mut broadcaster = Broadcaster::default(); for id in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(InputSender::new(Counter::inc, address)); + let sender = Box::new(EventSender::new(Counter::inc, address)); broadcaster.add(sender, LineId(id as u64)); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - block_on(broadcaster.broadcast(1)).unwrap(); + block_on(broadcaster.broadcast_event(1)).unwrap(); }); let counter = Arc::new(AtomicUsize::new(0)); @@ -582,18 +489,18 @@ mod tests { const N_RECV: usize = 4; let mut mailboxes = Vec::new(); - let mut broadcaster = QueryBroadcaster::default(); + let mut broadcaster = Broadcaster::default(); for id in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); + let sender = Box::new(QuerySender::new(Counter::fetch_inc, address)); broadcaster.add(sender, LineId(id as u64)); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - let iter = block_on(broadcaster.broadcast(1)).unwrap(); + let iter = block_on(broadcaster.broadcast_query(1)).unwrap(); let sum = iter.fold(0, |acc, val| acc + val); assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)} @@ -702,12 +609,12 @@ mod tests { let (test_event2, waker2) = test_event::(); let (test_event3, waker3) = test_event::(); - let mut broadcaster = QueryBroadcaster::default(); + let mut broadcaster = Broadcaster::default(); broadcaster.add(Box::new(test_event1), LineId(1)); broadcaster.add(Box::new(test_event2), LineId(2)); broadcaster.add(Box::new(test_event3), LineId(3)); - let mut fut = Box::pin(broadcaster.broadcast(())); + let mut fut = Box::pin(broadcaster.broadcast_query(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); let is_scheduled_waker = is_scheduled.clone(); @@ -777,11 +684,11 @@ mod tests { let (test_event1, waker1) = test_event::(); let (test_event2, waker2) = test_event::(); - let mut broadcaster = QueryBroadcaster::default(); + let mut broadcaster = Broadcaster::default(); broadcaster.add(Box::new(test_event1), LineId(1)); broadcaster.add(Box::new(test_event2), LineId(2)); - let mut fut = Box::pin(broadcaster.broadcast(())); + let mut fut = Box::pin(broadcaster.broadcast_query(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); let is_scheduled_waker = is_scheduled.clone(); diff --git a/asynchronix/src/model/ports/broadcaster/task_set.rs b/asynchronix/src/model/ports/broadcaster/task_set.rs index 9a978f1..6538ee4 100644 --- a/asynchronix/src/model/ports/broadcaster/task_set.rs +++ b/asynchronix/src/model/ports/broadcaster/task_set.rs @@ -21,37 +21,31 @@ const COUNTDOWN_MASK: u64 = !INDEX_MASK; /// scheduled tasks. const COUNTDOWN_ONE: u64 = 1 << 32; -/// An object for the efficient management of a set of tasks scheduled -/// concurrently. +/// A set of tasks that may be scheduled cheaply and can be requested to wake a +/// parent task only when a given amount of tasks have been scheduled. /// -/// The algorithm used by `TaskSet` is designed to wake up the parent task as -/// seldom as possible, ideally only when all non-completed sub-tasks have been -/// scheduled (awaken). -/// -/// A `TaskSet` maintains both a vector-based list of tasks (or more accurately, -/// task waker handles) and a linked list of the subset of tasks that are -/// currently scheduled. The latter is stored in a vector-based Treiber stack -/// which links tasks through indices rather than pointers. Using indices has -/// two advantages: (i) it makes a fully safe implementation possible and (ii) -/// it can take advantage of a single CAS to simultaneously move the head and -/// decrement the outstanding amount of tasks to be scheduled before the parent -/// task is notified. +/// This object maintains both a list of all active tasks and a list of the +/// subset of active tasks currently scheduled. The latter is stored in a +/// Treiber stack which links tasks through indices rather than pointers. Using +/// indices has two advantages: (i) it enables a fully safe implementation and +/// (ii) it makes it possible to use a single CAS to simultaneously move the +/// head and decrement the outstanding amount of tasks to be scheduled before +/// the parent task is notified. pub(super) struct TaskSet { - /// Set of all tasks, scheduled or not. + /// Set of all active tasks, scheduled or not. /// - /// In some cases, the use of `resize()` to shrink the task set may leave - /// inactive tasks at the back of the vector, in which case the length of - /// the vector will exceed `task_count`. + /// In some rare cases, the back of the vector can also contain inactive + /// (retired) tasks. tasks: Vec>, /// Head of the Treiber stack for scheduled tasks. /// - /// The lower 32 bits specify the index of the last scheduled task (head), - /// if any, whereas the upper 32 bits specify the countdown of tasks still - /// to be scheduled before the parent task is notified. + /// The lower bits specify the index of the last scheduled task, if any, + /// whereas the upper bits specify the countdown of tasks still to be + /// scheduled before the parent task is notified. head: Arc, /// A notifier used to wake the parent task. notifier: WakeSource, - /// Count of all tasks, scheduled or not. + /// Count of all active tasks, scheduled or not. task_count: usize, } @@ -71,25 +65,21 @@ impl TaskSet { } } - /// Take all scheduled tasks and returns an iterator over their indices, or - /// if there are no currently scheduled tasks returns `None` and requests a - /// notification to be sent after `pending_task_count` tasks have been - /// scheduled. + /// Steals scheduled tasks if any and returns an iterator over their + /// indices, otherwise returns `None` and requests a notification to be sent + /// after `notify_count` tasks have been scheduled. /// - /// In all cases, the list of scheduled tasks will be empty right after this - /// call. + /// In all cases, the list of scheduled tasks is guaranteed to be empty + /// after this call. /// - /// If there were scheduled tasks, no notification is requested because this - /// method is expected to be called repeatedly until it returns `None`. - /// Failure to do so will result in missed notifications. + /// If some tasks were stolen, no notification is requested. /// - /// If no tasks were scheduled, the notification is guaranteed to be - /// triggered no later than after `pending_task_count` tasks have been - /// scheduled, though it may in some cases be triggered earlier. If the - /// specified `pending_task_count` is zero then no notification is - /// requested. - pub(super) fn take_scheduled(&self, pending_task_count: usize) -> Option> { - let countdown = u32::try_from(pending_task_count).unwrap(); + /// If no tasks were stolen, the notification is guaranteed to be triggered + /// no later than after `notify_count` tasks have been scheduled, though it + /// may in some cases be triggered earlier. If the specified `notify_count` + /// is zero then no notification is requested. + pub(super) fn steal_scheduled(&self, notify_count: usize) -> Option> { + let countdown = u32::try_from(notify_count).unwrap(); let mut head = self.head.load(Ordering::Relaxed); loop { @@ -136,13 +126,13 @@ impl TaskSet { if self.head.load(Ordering::Relaxed) != EMPTY as u64 { // Dropping the iterator ensures that all tasks are put in the // sleeping state. - let _ = self.take_scheduled(0); + let _ = self.steal_scheduled(0); } } - /// Set the number of active tasks. + /// Modify the number of active tasks. /// - /// Note that this method may discard already scheduled tasks. + /// Note that this method may discard all scheduled tasks. /// /// # Panic /// @@ -210,7 +200,7 @@ impl TaskSet { } } - /// Returns `true` if one or more sub-tasks are currently scheduled. + /// Returns `true` if one or more tasks are currently scheduled. pub(super) fn has_scheduled(&self) -> bool { // Ordering: the content of the head is only used as an advisory flag so // Relaxed ordering is sufficient. diff --git a/asynchronix/src/model/ports/sender.rs b/asynchronix/src/model/ports/sender.rs index e63d055..d5dc2a2 100644 --- a/asynchronix/src/model/ports/sender.rs +++ b/asynchronix/src/model/ports/sender.rs @@ -4,23 +4,22 @@ use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use recycle_box::{coerce_box, RecycleBox}; use crate::channel; use crate::model::{InputFn, Model, ReplierFn}; -use crate::simulation::EventSinkWriter; +use crate::util::spsc_queue; -/// An event or query sender abstracting over the target model and input or -/// replier method. +/// Abstraction over `EventSender` and `QuerySender`. pub(super) trait Sender: Send { - /// Asynchronously send the event or request. fn send(&mut self, arg: T) -> RecycledFuture<'_, Result>; } -/// An object that can send events to an input port. -pub(super) struct InputSender { +/// An object that can send a payload to a model. +pub(super) struct EventSender { func: F, sender: channel::Sender, fut_storage: Option>, @@ -28,7 +27,7 @@ pub(super) struct InputSender { _phantom_closure_marker: PhantomData, } -impl InputSender +impl EventSender where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -45,15 +44,15 @@ where } } -impl Sender for InputSender +impl Sender for EventSender where M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, + F: for<'a> InputFn<'a, M, T, S> + Copy, T: Send + 'static, - S: Send + 'static, + S: Send, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let func = self.func.clone(); + let func = self.func; let fut = self.sender.send(move |model, scheduler, recycle_box| { let fut = func.call(model, arg, scheduler); @@ -67,8 +66,8 @@ where } } -/// An object that can send a request to a replier port and retrieve a response. -pub(super) struct ReplierSender { +/// An object that can send a payload to a model and retrieve a response. +pub(super) struct QuerySender { func: F, sender: channel::Sender, receiver: multishot::Receiver, @@ -77,7 +76,7 @@ pub(super) struct ReplierSender { _phantom_closure_marker: PhantomData, } -impl ReplierSender +impl QuerySender where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S>, @@ -96,16 +95,16 @@ where } } -impl Sender for ReplierSender +impl Sender for QuerySender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, + F: for<'a> ReplierFn<'a, M, T, R, S> + Copy, T: Send + 'static, R: Send + 'static, S: Send, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { - let func = self.func.clone(); + let func = self.func; let sender = &mut self.sender; let reply_receiver = &mut self.receiver; let fut_storage = &mut self.fut_storage; @@ -135,32 +134,59 @@ where } } -/// An object that can send a payload to a mutex-protected slot. -pub(super) struct EventSinkSender> { - writer: W, +/// An object that can send a payload to an unbounded queue. +pub(super) struct EventStreamSender { + producer: spsc_queue::Producer, fut_storage: Option>, - _phantom_event: PhantomData, } -impl> EventSinkSender { - pub(super) fn new(writer: W) -> Self { +impl EventStreamSender { + pub(super) fn new(producer: spsc_queue::Producer) -> Self { Self { - writer, + producer, fut_storage: None, - _phantom_event: PhantomData, } } } -impl> Sender for EventSinkSender +impl Sender for EventStreamSender where T: Send + 'static, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let writer = &self.writer; + let producer = &mut self.producer; RecycledFuture::new(&mut self.fut_storage, async move { - writer.write(arg); + producer.push(arg).map_err(|_| SendError {}) + }) + } +} + +/// An object that can send a payload to a mutex-protected slot. +pub(super) struct EventSlotSender { + slot: Arc>>, + fut_storage: Option>, +} + +impl EventSlotSender { + pub(super) fn new(slot: Arc>>) -> Self { + Self { + slot, + fut_storage: None, + } + } +} + +impl Sender for EventSlotSender +where + T: Send + 'static, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let slot = &*self.slot; + + RecycledFuture::new(&mut self.fut_storage, async move { + let mut slot = slot.lock().unwrap(); + *slot = Some(arg); Ok(()) }) diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index a99e2e7..1fba49f 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -87,12 +87,13 @@ //! Although uncommon, there is sometimes a need for connecting and/or //! disconnecting models after they have been migrated to the simulation. //! Likewise, one may want to connect or disconnect an [`EventSlot`] or -//! [`EventQueue`] after the simulation has been instantiated. +//! [`EventStream`] after the simulation has been instantiated. //! //! There is actually a very simple solution to this problem: since the -//! [`InputFn`] trait also matches closures of type `FnOnce(&mut impl Model)`, -//! it is enough to invoke [`Simulation::send_event()`] with a closure that -//! connects or disconnects a port, such as: +//! [`InputFn`](crate::model::InputFn) trait also matches closures of type +//! `FnOnce(&mut impl Model)`, it is enough to invoke +//! [`Simulation::send_event()`] with a closure that connects or disconnects a +//! port, such as: //! //! ``` //! # use asynchronix::model::{Model, Output}; @@ -118,13 +119,13 @@ //! &modelA_addr //! ); //! ``` +mod endpoints; mod mailbox; mod sim_init; -mod sink; +pub use endpoints::{EventSlot, EventStream}; pub use mailbox::{Address, Mailbox}; pub use sim_init::SimInit; -pub use sink::{EventQueue, EventSink, EventSinkWriter, EventSlot}; use std::error::Error; use std::fmt; @@ -150,7 +151,8 @@ use crate::util::sync_cell::SyncCell; /// [`SimInit::init()`](crate::simulation::SimInit::init) or /// [`SimInit::init_with_clock()`](crate::simulation::SimInit::init_with_clock) /// method on a simulation initializer. It contains an asynchronous executor -/// that runs all simulation models added beforehand to [`SimInit`]. +/// that runs all simulation models added beforehand to +/// [`SimInit`](crate::simulation::SimInit). /// /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation diff --git a/asynchronix/src/simulation/endpoints.rs b/asynchronix/src/simulation/endpoints.rs new file mode 100644 index 0000000..eed6c10 --- /dev/null +++ b/asynchronix/src/simulation/endpoints.rs @@ -0,0 +1,69 @@ +use std::fmt; +use std::sync::{Arc, Mutex, TryLockError, TryLockResult}; + +use crate::util::spsc_queue; + +/// An iterator that returns all events that were broadcast by an output port. +/// +/// Events are returned in first-in-first-out order. Note that even if the +/// iterator returns `None`, it may still produce more items after simulation +/// time is incremented. +pub struct EventStream { + consumer: spsc_queue::Consumer, +} + +impl EventStream { + /// Creates a new `EventStream`. + pub(crate) fn new(consumer: spsc_queue::Consumer) -> Self { + Self { consumer } + } +} + +impl Iterator for EventStream { + type Item = T; + + fn next(&mut self) -> Option { + self.consumer.pop() + } +} + +impl fmt::Debug for EventStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EventStream").finish_non_exhaustive() + } +} + +/// A single-value slot that holds the last event that was broadcast by an +/// output port. +pub struct EventSlot { + slot: Arc>>, +} + +impl EventSlot { + /// Creates a new `EventSlot`. + pub(crate) fn new(slot: Arc>>) -> Self { + Self { slot } + } + + /// Take the last event, if any, leaving the slot empty. + /// + /// Note that even after the event is taken, it may become populated anew + /// after simulation time is incremented. + pub fn take(&mut self) -> Option { + // We don't actually need to take self by mutable reference, but this + // signature is probably less surprising for the user and more + // consistent with `EventStream`. It also prevents multi-threaded + // access, which would be likely to be misused. + match self.slot.try_lock() { + TryLockResult::Ok(mut v) => v.take(), + TryLockResult::Err(TryLockError::WouldBlock) => None, + TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(), + } + } +} + +impl fmt::Debug for EventSlot { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EventSlot").finish_non_exhaustive() + } +} diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 8a57b35..dbfa7ff 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -69,8 +69,9 @@ impl SimInit { Simulation::new(self.executor, self.scheduler_queue, self.time) } - /// Builds a simulation synchronized with the provided [`Clock`] and - /// initialized at the specified simulation time, executing the + /// Builds a simulation synchronized with the provided + /// [`Clock`](crate::time::Clock) and initialized at the specified + /// simulation time, executing the /// [`Model::init()`](crate::model::Model::init) method on all model /// initializers. pub fn init_with_clock( diff --git a/asynchronix/src/simulation/sink.rs b/asynchronix/src/simulation/sink.rs deleted file mode 100644 index f1050b4..0000000 --- a/asynchronix/src/simulation/sink.rs +++ /dev/null @@ -1,171 +0,0 @@ -use std::fmt; -use std::sync::{Arc, Mutex, TryLockError, TryLockResult}; - -use crossbeam_queue::SegQueue; - -/// A simulation endpoint that can receive events sent by model outputs. -/// -/// An `EventSink` can be thought of as a self-standing input meant to -/// externally monitor the simulated system. -pub trait EventSink { - /// Writer handle to an event sink. - type Writer: EventSinkWriter; - - /// Returns the writer handle associated to this sink. - fn writer(&self) -> Self::Writer; -} - -/// A writer handle to an event sink. -pub trait EventSinkWriter: Send + Sync + 'static { - /// Writes a value to the associated sink. - fn write(&self, event: T); -} - -/// An event sink iterator that returns all events that were broadcast by -/// connected output ports. -/// -/// Events are returned in first-in-first-out order. Note that even if the -/// iterator returns `None`, it may still produce more items in the future (in -/// other words, it is not a [`FusedIterator`](std::iter::FusedIterator)). -pub struct EventQueue { - queue: Arc>, -} - -impl EventQueue { - /// Creates a new `EventStream`. - pub fn new() -> Self { - Self { - queue: Arc::new(SegQueue::new()), - } - } -} - -impl EventSink for EventQueue { - type Writer = EventQueueWriter; - - fn writer(&self) -> Self::Writer { - EventQueueWriter { - queue: self.queue.clone(), - } - } -} - -impl Iterator for EventQueue { - type Item = T; - - fn next(&mut self) -> Option { - self.queue.pop() - } -} - -impl Default for EventQueue { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for EventQueue { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("EventQueue").finish_non_exhaustive() - } -} - -/// A producer handle of an `EventStream`. -pub struct EventQueueWriter { - queue: Arc>, -} - -impl EventSinkWriter for EventQueueWriter { - /// Pushes an event onto the queue. - fn write(&self, event: T) { - self.queue.push(event); - } -} - -impl fmt::Debug for EventQueueWriter { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("EventQueueWriter").finish_non_exhaustive() - } -} - -/// A single-value event sink that holds the last event that was broadcast by -/// any of the connected output ports. -pub struct EventSlot { - slot: Arc>>, -} - -impl EventSlot { - /// Creates a new `EventSlot`. - pub fn new() -> Self { - Self { - slot: Arc::new(Mutex::new(None)), - } - } - - /// Take the last event, if any, leaving the slot empty. - /// - /// Note that even after the event is taken, it may become populated anew. - pub fn take(&mut self) -> Option { - // We don't actually need to take self by mutable reference, but this - // signature is probably less surprising for the user and more - // consistent with `EventStream`. It also prevents multi-threaded - // access, which would be likely to be misused. - match self.slot.try_lock() { - TryLockResult::Ok(mut v) => v.take(), - TryLockResult::Err(TryLockError::WouldBlock) => None, - TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(), - } - } -} - -impl EventSink for EventSlot { - type Writer = EventSlotWriter; - - /// Returns a writer handle. - fn writer(&self) -> EventSlotWriter { - EventSlotWriter { - slot: self.slot.clone(), - } - } -} - -impl Default for EventSlot { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for EventSlot { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("EventSlot").finish_non_exhaustive() - } -} - -/// A writer handle of an `EventSlot`. -pub struct EventSlotWriter { - slot: Arc>>, -} - -impl EventSinkWriter for EventSlotWriter { - /// Write an event into the slot. - fn write(&self, event: T) { - // Why do we just use `try_lock` and abandon if the lock is taken? The - // reason is that (i) the reader is never supposed to access the slot - // when the simulation runs and (ii) as a rule the simulator does not - // warrant fairness when concurrently writing to an input. Therefore, if - // the mutex is already locked when this writer attempts to lock it, it - // means another writer is concurrently writing an event, and that event - // is just as legitimate as ours so there is not need to overwrite it. - match self.slot.try_lock() { - TryLockResult::Ok(mut v) => *v = Some(event), - TryLockResult::Err(TryLockError::WouldBlock) => {} - TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(), - } - } -} - -impl fmt::Debug for EventSlotWriter { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("EventStreamWriter").finish_non_exhaustive() - } -} diff --git a/asynchronix/src/time/scheduler.rs b/asynchronix/src/time/scheduler.rs index 24332af..3ab2ead 100644 --- a/asynchronix/src/time/scheduler.rs +++ b/asynchronix/src/time/scheduler.rs @@ -3,7 +3,7 @@ use std::error::Error; use std::fmt; use std::future::Future; -use std::num::NonZeroUsize; +use std::marker::PhantomData; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -13,7 +13,7 @@ use std::time::Duration; use pin_project_lite::pin_project; use recycle_box::{coerce_box, RecycleBox}; -use crate::channel::Sender; +use crate::channel::{ChannelId, Sender}; use crate::executor::Executor; use crate::model::{InputFn, Model}; use crate::time::{MonotonicTime, TearableAtomicTime}; @@ -21,15 +21,7 @@ use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCellReader; /// Shorthand for the scheduler queue type. - -// Why use both time and channel ID as the key? The short answer is that this -// ensures that events targeting the same channel are sent in the order they -// were scheduled. More precisely, this ensures that events targeting the same -// channel are ordered contiguously in the priority queue, which in turns allows -// the event loop to easily aggregate such events into single futures and thus -// control their relative order of execution. -pub(crate) type SchedulerQueue = - PriorityQueue<(MonotonicTime, NonZeroUsize), Box>; +pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, ChannelId), Box>; /// Trait abstracting over time-absolute and time-relative deadlines. /// @@ -487,7 +479,7 @@ impl Error for SchedulingError {} /// Schedules an event at a future time. /// -/// This function does not check whether the specified time lies in the future +/// This method does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_event_at_unchecked( time: MonotonicTime, @@ -503,7 +495,7 @@ pub(crate) fn schedule_event_at_unchecked( { let channel_id = sender.channel_id(); - let event_dispatcher = Box::new(EventDispatcher::new(dispatch_event(func, arg, sender))); + let event_dispatcher = Box::new(new_event_dispatcher(func, arg, sender)); let mut scheduler_queue = scheduler_queue.lock().unwrap(); scheduler_queue.insert((time, channel_id), event_dispatcher); @@ -511,7 +503,7 @@ pub(crate) fn schedule_event_at_unchecked( /// Schedules an event at a future time, returning an event key. /// -/// This function does not check whether the specified time lies in the future +/// This method does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_keyed_event_at_unchecked( time: MonotonicTime, @@ -529,8 +521,10 @@ where let event_key = EventKey::new(); let channel_id = sender.channel_id(); let event_dispatcher = Box::new(KeyedEventDispatcher::new( - |ek| dispatch_keyed_event(ek, func, arg, sender), event_key.clone(), + func, + arg, + sender, )); let mut scheduler_queue = scheduler_queue.lock().unwrap(); @@ -541,7 +535,7 @@ where /// Schedules a periodic event at a future time. /// -/// This function does not check whether the specified time lies in the future +/// This method does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_periodic_event_at_unchecked( time: MonotonicTime, @@ -558,10 +552,7 @@ pub(crate) fn schedule_periodic_event_at_unchecked( { let channel_id = sender.channel_id(); - let event_dispatcher = Box::new(PeriodicEventDispatcher::new( - || dispatch_event(func, arg, sender), - period, - )); + let event_dispatcher = Box::new(PeriodicEventDispatcher::new(func, arg, sender, period)); let mut scheduler_queue = scheduler_queue.lock().unwrap(); scheduler_queue.insert((time, channel_id), event_dispatcher); @@ -569,7 +560,7 @@ pub(crate) fn schedule_periodic_event_at_unchecked( /// Schedules an event at a future time, returning an event key. /// -/// This function does not check whether the specified time lies in the future +/// This method does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_periodic_keyed_event_at_unchecked( time: MonotonicTime, @@ -588,9 +579,11 @@ where let event_key = EventKey::new(); let channel_id = sender.channel_id(); let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new( - |ek| dispatch_keyed_event(ek, func, arg, sender), - period, event_key.clone(), + func, + arg, + sender, + period, )); let mut scheduler_queue = scheduler_queue.lock().unwrap(); @@ -621,8 +614,8 @@ pub(crate) trait ScheduledEvent: Send { } pin_project! { - /// An object that can be converted to a future dispatching a - /// non-cancellable event. + /// Object that can be converted to a future dispatching a non-cancellable + /// event. /// /// Note that this particular event dispatcher is in fact already a future: /// since the future cannot be cancelled and the dispatcher does not need to @@ -634,14 +627,23 @@ pin_project! { } } -impl EventDispatcher +/// Constructs a new `EventDispatcher`. +/// +/// Due to some limitations of type inference or of my understanding of it, the +/// constructor for this event dispatchers is a freestanding function. +fn new_event_dispatcher( + func: F, + arg: T, + sender: Sender, +) -> EventDispatcher> where - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, { - /// Constructs a new `EventDispatcher`. - pub(crate) fn new(fut: F) -> Self { - EventDispatcher { fut } - } + let fut = dispatch_event(func, arg, sender); + + EventDispatcher { fut } } impl Future for EventDispatcher @@ -675,78 +677,112 @@ where } } -/// An object that can be converted to a future dispatching a non-cancellable, -/// periodic event. -pub(crate) struct PeriodicEventDispatcher +/// Object that can be converted to a future dispatching a non-cancellable periodic +/// event. +pub(crate) struct PeriodicEventDispatcher where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, + M: Model, { - /// A clonable generator for the dispatching future. - gen: G, - /// The event repetition period. + func: F, + arg: T, + sender: Sender, period: Duration, + _input_kind: PhantomData, } -impl PeriodicEventDispatcher +impl PeriodicEventDispatcher where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, { /// Constructs a new `PeriodicEventDispatcher`. - fn new(gen: G, period: Duration) -> Self { - Self { gen, period } + fn new(func: F, arg: T, sender: Sender, period: Duration) -> Self { + Self { + func, + arg, + sender, + period, + _input_kind: PhantomData, + } } } -impl ScheduledEvent for PeriodicEventDispatcher +impl ScheduledEvent for PeriodicEventDispatcher where - G: (FnOnce() -> F) + Clone + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, { fn is_cancelled(&self) -> bool { false } fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new(self.gen.clone(), self.period)); + let event = Box::new(Self::new( + self.func.clone(), + self.arg.clone(), + self.sender.clone(), + self.period, + )); Some((event, self.period)) } fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)()) + let Self { + func, arg, sender, .. + } = *self; + + Box::pin(dispatch_event(func, arg, sender)) } fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)()); + let Self { + func, arg, sender, .. + } = *self; + + let fut = dispatch_event(func, arg, sender); + executor.spawn_and_forget(fut); } } -/// An object that can be converted to a future dispatching a cancellable event. -pub(crate) struct KeyedEventDispatcher +/// Object that can be converted to a future dispatching a cancellable event. +pub(crate) struct KeyedEventDispatcher where - G: (FnOnce(EventKey) -> F) + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, { - /// A generator for the dispatching future. - gen: G, - /// The event cancellation key. event_key: EventKey, + func: F, + arg: T, + sender: Sender, + _input_kind: PhantomData, } -impl KeyedEventDispatcher +impl KeyedEventDispatcher where - G: (FnOnce(EventKey) -> F) + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, { /// Constructs a new `KeyedEventDispatcher`. - fn new(gen: G, event_key: EventKey) -> Self { - Self { gen, event_key } + fn new(event_key: EventKey, func: F, arg: T, sender: Sender) -> Self { + Self { + event_key, + func, + arg, + sender, + _input_kind: PhantomData, + } } } -impl ScheduledEvent for KeyedEventDispatcher +impl ScheduledEvent for KeyedEventDispatcher where - G: (FnOnce(EventKey) -> F) + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, { fn is_cancelled(&self) -> bool { self.event_key.is_cancelled() @@ -755,74 +791,116 @@ where None } fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + Box::pin(dispatch_keyed_event(event_key, func, arg, sender)) } fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + let fut = dispatch_keyed_event(event_key, func, arg, sender); + executor.spawn_and_forget(fut); } } -/// An object that can be converted to a future dispatching a periodic, -/// cancellable event. -pub(crate) struct PeriodicKeyedEventDispatcher +/// Object that can be converted to a future dispatching a cancellable event. +pub(crate) struct PeriodicKeyedEventDispatcher where - G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, { - /// A clonable generator for the dispatching future. - gen: G, - /// The repetition period. - period: Duration, - /// The event cancellation key. event_key: EventKey, + func: F, + arg: T, + sender: Sender, + period: Duration, + _input_kind: PhantomData, } -impl PeriodicKeyedEventDispatcher +impl PeriodicKeyedEventDispatcher where - G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, { /// Constructs a new `KeyedEventDispatcher`. - fn new(gen: G, period: Duration, event_key: EventKey) -> Self { + fn new(event_key: EventKey, func: F, arg: T, sender: Sender, period: Duration) -> Self { Self { - gen, - period, event_key, + func, + arg, + sender, + period, + _input_kind: PhantomData, } } } -impl ScheduledEvent for PeriodicKeyedEventDispatcher +impl ScheduledEvent for PeriodicKeyedEventDispatcher where - G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, - F: Future + Send + 'static, + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, { fn is_cancelled(&self) -> bool { self.event_key.is_cancelled() } fn next(&self) -> Option<(Box, Duration)> { let event = Box::new(Self::new( - self.gen.clone(), - self.period, self.event_key.clone(), + self.func.clone(), + self.arg.clone(), + self.sender.clone(), + self.period, )); Some((event, self.period)) } fn into_future(self: Box) -> Pin + Send>> { - Box::pin((self.gen)(self.event_key)) + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + Box::pin(dispatch_keyed_event(event_key, func, arg, sender)) } fn spawn_and_forget(self: Box, executor: &Executor) { - executor.spawn_and_forget((self.gen)(self.event_key)); + let Self { + event_key, + func, + arg, + sender, + .. + } = *self; + + let fut = dispatch_keyed_event(event_key, func, arg, sender); + executor.spawn_and_forget(fut); } } -/// Asynchronously dispatches a non-cancellable event to a model input. -pub(crate) async fn dispatch_event(func: F, arg: T, sender: Sender) +/// Asynchronously dispatch a regular, non-cancellable event. +async fn dispatch_event(func: F, arg: T, sender: Sender) where M: Model, F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + T: Send + Clone + 'static, { let _ = sender .send( @@ -838,13 +916,9 @@ where .await; } -/// Asynchronously dispatches a cancellable event to a model input. -pub(crate) async fn dispatch_keyed_event( - event_key: EventKey, - func: F, - arg: T, - sender: Sender, -) where +/// Asynchronously dispatch a cancellable event. +async fn dispatch_keyed_event(event_key: EventKey, func: F, arg: T, sender: Sender) +where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, diff --git a/asynchronix/src/util.rs b/asynchronix/src/util.rs index 12c5ccf..3b8e9c0 100644 --- a/asynchronix/src/util.rs +++ b/asynchronix/src/util.rs @@ -3,4 +3,5 @@ pub(crate) mod futures; pub(crate) mod priority_queue; pub(crate) mod rng; pub(crate) mod slot; +pub(crate) mod spsc_queue; pub(crate) mod sync_cell; diff --git a/asynchronix/src/util/priority_queue.rs b/asynchronix/src/util/priority_queue.rs index f8cf5cf..b57dca1 100644 --- a/asynchronix/src/util/priority_queue.rs +++ b/asynchronix/src/util/priority_queue.rs @@ -1,6 +1,6 @@ //! Associative priority queue. -use std::cmp::Ordering; +use std::cmp::{Eq, Ord, Ordering, PartialOrd}; use std::collections::BinaryHeap; /// A key-value pair ordered by keys in inverse order, with epoch-based ordering @@ -111,7 +111,7 @@ impl PriorityQueue { #[cfg(all(test, not(asynchronix_loom)))] mod tests { - use super::PriorityQueue; + use super::*; #[test] fn priority_smoke() { diff --git a/asynchronix/src/util/spsc_queue.rs b/asynchronix/src/util/spsc_queue.rs new file mode 100644 index 0000000..9a81b2c --- /dev/null +++ b/asynchronix/src/util/spsc_queue.rs @@ -0,0 +1,393 @@ +//! Single-producer single-consumer unbounded FIFO queue that stores values in +//! fixed-size memory segments. + +#![allow(unused)] + +use std::cell::Cell; +use std::error::Error; +use std::fmt; +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering; + +use crossbeam_utils::CachePadded; + +use crate::loom_exports::cell::UnsafeCell; +use crate::loom_exports::sync::atomic::{AtomicBool, AtomicPtr}; +use crate::loom_exports::sync::Arc; + +/// The number of slots in a single segment. +const SEGMENT_LEN: usize = 32; + +/// A slot containing a single value. +struct Slot { + has_value: AtomicBool, + value: UnsafeCell>, +} + +impl Default for Slot { + fn default() -> Self { + Slot { + has_value: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } +} + +/// A memory segment containing `SEGMENT_LEN` slots. +struct Segment { + /// Address of the next segment. + /// + /// A null pointer means that the next segment is not allocated yet. + next_segment: AtomicPtr>, + data: [Slot; SEGMENT_LEN], +} + +impl Segment { + /// Allocates a new segment. + fn allocate_new() -> NonNull { + let segment = Self { + next_segment: AtomicPtr::new(ptr::null_mut()), + data: Default::default(), + }; + + // Safety: the pointer is non-null since it comes from a box. + unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(segment))) } + } +} + +/// The head of the queue from which values are popped. +struct Head { + /// Pointer to the segment at the head of the queue. + segment: NonNull>, + /// Index of the next value to be read. + /// + /// If the index is equal to the segment length, it is necessary to move to + /// the next segment before the next value can be read. + next_read_idx: usize, +} + +/// The tail of the queue to which values are pushed. +struct Tail { + /// Pointer to the segment at the tail of the queue. + segment: NonNull>, + /// Index of the next value to be written. + /// + /// If the index is equal to the segment length, a new segment must be + /// allocated before a new value can be written. + next_write_idx: usize, +} + +/// A single-producer, single-consumer unbounded FIFO queue. +struct Queue { + head: CachePadded>>, + tail: CachePadded>>, +} + +impl Queue { + /// Creates a new queue. + fn new() -> Self { + let segment = Segment::allocate_new(); + + let head = Head { + segment, + next_read_idx: 0, + }; + let tail = Tail { + segment, + next_write_idx: 0, + }; + + Self { + head: CachePadded::new(UnsafeCell::new(head)), + tail: CachePadded::new(UnsafeCell::new(tail)), + } + } + + /// Pushes a new value. + /// + /// # Safety + /// + /// The method cannot be called from multiple threads concurrently. + unsafe fn push(&self, value: T) { + // Safety: this is the only thread accessing the tail. + let tail = self.tail.with_mut(|p| &mut *p); + + // If the whole segment has been written, allocate a new segment. + if tail.next_write_idx == SEGMENT_LEN { + let old_segment = tail.segment; + tail.segment = Segment::allocate_new(); + + // Safety: the old segment is still allocated since the consumer + // cannot deallocate it before `next_segment` is set to a non-null + // value. + old_segment + .as_ref() + .next_segment + .store(tail.segment.as_ptr(), Ordering::Release); + + tail.next_write_idx = 0; + } + + // Safety: the tail segment is allocated since the consumer cannot + // deallocate it before `next_segment` is set to a non-null value. + let data = &tail.segment.as_ref().data[tail.next_write_idx]; + + // Safety: we have exclusive access to the slot value since the consumer + // cannot access it before `has_value` is set to true. + data.value.with_mut(|p| (*p).write(value)); + + // Ordering: this Release store synchronizes with the Acquire load in + // `pop` and ensures that the value is visible to the consumer once + // `has_value` reads `true`. + data.has_value.store(true, Ordering::Release); + + tail.next_write_idx += 1; + } + + /// Pops a new value. + /// + /// # Safety + /// + /// The method cannot be called from multiple threads concurrently. + unsafe fn pop(&self) -> Option { + // Safety: this is the only thread accessing the head. + let head = self.head.with_mut(|p| &mut *p); + + // If the whole segment has been read, try to move to the next segment. + if head.next_read_idx == SEGMENT_LEN { + // Read the next segment or return `None` if it is not ready yet. + // + // Safety: the head segment is still allocated since we are the only + // thread that can deallocate it. + let next_segment = head.segment.as_ref().next_segment.load(Ordering::Acquire); + let next_segment = NonNull::new(next_segment)?; + + // Deallocate the old segment. + // + // Safety: the pointer was initialized from a box and the segment is + // still allocated since we are the only thread that can deallocate + // it. + let _ = Box::from_raw(head.segment.as_ptr()); + + // Update the segment and the next index. + head.segment = next_segment; + head.next_read_idx = 0; + } + + let data = &head.segment.as_ref().data[head.next_read_idx]; + + // Ordering: this Acquire load synchronizes with the Release store in + // `push` and ensures that the value is visible once `has_value` reads + // `true`. + if !data.has_value.load(Ordering::Acquire) { + return None; + } + + // Safety: since `has_value` is `true` then we have exclusive ownership + // of the value and we know that it was initialized. + let value = data.value.with(|p| (*p).assume_init_read()); + + head.next_read_idx += 1; + + Some(value) + } +} + +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + // Drop all values. + while self.pop().is_some() {} + + // All values have been dropped: the last segment can be freed. + + // Safety: this is the only thread accessing the head since both the + // consumer and producer have been dropped. + let head = self.head.with_mut(|p| &mut *p); + + // Safety: the pointer was initialized from a box and the segment is + // still allocated since we are the only thread that can deallocate + // it. + let _ = Box::from_raw(head.segment.as_ptr()); + } + } +} + +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + +impl UnwindSafe for Queue {} +impl RefUnwindSafe for Queue {} + +/// A handle to a single-producer, single-consumer queue that can push values. +pub(crate) struct Producer { + queue: Arc>, + _non_sync_phantom: PhantomData>, +} +impl Producer { + /// Pushes a value to the queue. + pub(crate) fn push(&self, value: T) -> Result<(), PushError> { + if Arc::strong_count(&self.queue) == 1 { + return Err(PushError {}); + } + + unsafe { self.queue.push(value) }; + + Ok(()) + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +/// Error returned when a push failed due to the consumer being dropped. +pub(crate) struct PushError {} + +impl fmt::Display for PushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "sending message into a closed mailbox") + } +} + +impl Error for PushError {} + +/// A handle to a single-producer, single-consumer queue that can pop values. +pub(crate) struct Consumer { + queue: Arc>, + _non_sync_phantom: PhantomData>, +} +impl Consumer { + /// Pops a value from the queue. + pub(crate) fn pop(&self) -> Option { + unsafe { self.queue.pop() } + } +} + +/// Creates the producer and consumer handles of a single-producer, +/// single-consumer queue. +pub(crate) fn spsc_queue() -> (Producer, Consumer) { + let queue = Arc::new(Queue::new()); + + let producer = Producer { + queue: queue.clone(), + _non_sync_phantom: PhantomData, + }; + let consumer = Consumer { + queue, + _non_sync_phantom: PhantomData, + }; + + (producer, consumer) +} + +/// Loom tests. +#[cfg(all(test, not(asynchronix_loom)))] +mod tests { + use super::*; + + use std::thread; + + #[test] + fn spsc_queue_basic() { + const VALUE_COUNT: usize = if cfg!(miri) { 1000 } else { 100_000 }; + + let (producer, consumer) = spsc_queue(); + + let th = thread::spawn(move || { + for i in 0..VALUE_COUNT { + let value = loop { + if let Some(v) = consumer.pop() { + break v; + } + }; + + assert_eq!(value, i); + } + }); + + for i in 0..VALUE_COUNT { + producer.push(i).unwrap(); + } + + th.join().unwrap(); + } +} + +/// Loom tests. +#[cfg(all(test, asynchronix_loom))] +mod tests { + use super::*; + + use loom::model::Builder; + use loom::thread; + + #[test] + fn loom_spsc_queue_basic() { + const DEFAULT_PREEMPTION_BOUND: usize = 4; + const VALUE_COUNT: usize = 10; + + let mut builder = Builder::new(); + if builder.preemption_bound.is_none() { + builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND); + } + + builder.check(move || { + let (producer, consumer) = spsc_queue(); + + let th = thread::spawn(move || { + let mut value = 0; + for _ in 0..VALUE_COUNT { + if let Some(v) = consumer.pop() { + assert_eq!(v, value); + value += 1; + } + } + }); + + for i in 0..VALUE_COUNT { + let _ = producer.push(i); + } + + th.join().unwrap(); + }); + } + + #[test] + fn loom_spsc_queue_new_segment() { + const DEFAULT_PREEMPTION_BOUND: usize = 4; + const VALUE_COUNT_BEFORE: usize = 5; + const VALUE_COUNT_AFTER: usize = 5; + + let mut builder = Builder::new(); + if builder.preemption_bound.is_none() { + builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND); + } + + builder.check(move || { + let (producer, consumer) = spsc_queue(); + + // Fill up the first segment except for the last `VALUE_COUNT_BEFORE` slots. + for i in 0..(SEGMENT_LEN - VALUE_COUNT_BEFORE) { + producer.push(i).unwrap(); + consumer.pop(); + } + + let th = thread::spawn(move || { + let mut value = SEGMENT_LEN - VALUE_COUNT_BEFORE; + for _ in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) { + if let Some(v) = consumer.pop() { + assert_eq!(v, value); + value += 1; + } + } + }); + + for i in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) { + let _ = producer.push(i); + } + + th.join().unwrap(); + }); + } +} diff --git a/asynchronix/tests/model_scheduling.rs b/asynchronix/tests/model_scheduling.rs index 94f3ec1..6ff9b44 100644 --- a/asynchronix/tests/model_scheduling.rs +++ b/asynchronix/tests/model_scheduling.rs @@ -3,7 +3,7 @@ use std::time::Duration; use asynchronix::model::{Model, Output}; -use asynchronix::simulation::{EventQueue, Mailbox, SimInit}; +use asynchronix::simulation::{Mailbox, SimInit}; use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; #[test] @@ -27,8 +27,7 @@ fn model_schedule_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = EventQueue::new(); - model.output.connect_sink(&output); + let mut output = model.output.connect_stream().0; let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -72,8 +71,7 @@ fn model_cancel_future_keyed_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = EventQueue::new(); - model.output.connect_sink(&output); + let mut output = model.output.connect_stream().0; let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -118,8 +116,7 @@ fn model_cancel_same_time_keyed_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = EventQueue::new(); - model.output.connect_sink(&output); + let mut output = model.output.connect_stream().0; let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -160,8 +157,7 @@ fn model_schedule_periodic_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = EventQueue::new(); - model.output.connect_sink(&output); + let mut output = model.output.connect_stream().0; let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -210,8 +206,7 @@ fn model_cancel_periodic_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = EventQueue::new(); - model.output.connect_sink(&output); + let mut output = model.output.connect_stream().0; let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; diff --git a/asynchronix/tests/simulation_scheduling.rs b/asynchronix/tests/simulation_scheduling.rs index b8bd0d8..858f81e 100644 --- a/asynchronix/tests/simulation_scheduling.rs +++ b/asynchronix/tests/simulation_scheduling.rs @@ -3,7 +3,7 @@ use std::time::Duration; use asynchronix::model::{Model, Output}; -use asynchronix::simulation::{Address, EventQueue, Mailbox, SimInit, Simulation}; +use asynchronix::simulation::{Address, EventStream, Mailbox, SimInit, Simulation}; use asynchronix::time::MonotonicTime; // Input-to-output pass-through model. @@ -26,13 +26,12 @@ impl Model for PassThroughModel {} /// output) running as fast as possible. fn passthrough_bench( t0: MonotonicTime, -) -> (Simulation, Address>, EventQueue) { +) -> (Simulation, Address>, EventStream) { // Bench assembly. let mut model = PassThroughModel::new(); let mbox = Mailbox::new(); - let out_stream = EventQueue::new(); - model.output.connect_sink(&out_stream); + let out_stream = model.output.connect_stream().0; let addr = mbox.address(); let simu = SimInit::new().add_model(model, mbox).init(t0); @@ -244,14 +243,13 @@ fn timestamp_bench( ) -> ( Simulation, Address, - EventQueue<(Instant, SystemTime)>, + EventStream<(Instant, SystemTime)>, ) { // Bench assembly. let mut model = TimestampModel::default(); let mbox = Mailbox::new(); - let stamp_stream = EventQueue::new(); - model.stamp.connect_sink(&stamp_stream); + let stamp_stream = model.stamp.connect_stream().0; let addr = mbox.address(); let simu = SimInit::new()