diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index 7b2c805..e739386 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -13,7 +13,6 @@ on: - 'asynchronix/src/model/ports/broadcaster.rs' - 'asynchronix/src/model/ports/broadcaster/**' - 'asynchronix/src/util/slot.rs' - - 'asynchronix/src/util/spsc_queue.rs' - 'asynchronix/src/util/sync_cell.rs' jobs: diff --git a/Cargo.toml b/Cargo.toml index 94974a1..c24bd10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,3 @@ [workspace] members = ["asynchronix"] +resolver = "2" diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 2d6b306..e3e7c59 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -28,6 +28,7 @@ dev-logs = [] [dependencies] async-event = "0.1" crossbeam-utils = "0.8" +crossbeam-queue = "0.3" diatomic-waker = "0.1" futures-task = "0.3" multishot = "0.3" diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index 8167717..dd482e8 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -36,7 +36,7 @@ use std::pin::Pin; use std::time::Duration; use asynchronix::model::{InitializedModel, Model, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; /// Water pump. @@ -364,7 +364,8 @@ fn main() { pump.flow_rate.connect(Tank::set_flow_rate, &tank_mbox); // Model handles for simulation. - let mut flow_rate = pump.flow_rate.connect_slot().0; + let mut flow_rate = EventSlot::new(); + pump.flow_rate.connect_sink(&flow_rate); let controller_addr = controller_mbox.address(); let tank_addr = tank_mbox.address(); diff --git a/asynchronix/examples/power_supply.rs b/asynchronix/examples/power_supply.rs index a477912..233913b 100644 --- a/asynchronix/examples/power_supply.rs +++ b/asynchronix/examples/power_supply.rs @@ -27,7 +27,7 @@ //! └──────────┘ //! ``` use asynchronix::model::{Model, Output, Requestor}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; use asynchronix::time::MonotonicTime; /// Power supply. @@ -124,10 +124,14 @@ fn main() { psu.pwr_out.connect(Load::pwr_in, &load3_mbox); // Model handles for simulation. - let mut psu_power = psu.power.connect_slot().0; - let mut load1_power = load1.power.connect_slot().0; - let mut load2_power = load2.power.connect_slot().0; - let mut load3_power = load3.power.connect_slot().0; + let mut psu_power = EventSlot::new(); + let mut load1_power = EventSlot::new(); + let mut load2_power = EventSlot::new(); + let mut load3_power = EventSlot::new(); + psu.power.connect_sink(&psu_power); + load1.power.connect_sink(&load1_power); + load2.power.connect_sink(&load2_power); + load3.power.connect_sink(&load3_power); let psu_addr = psu_mbox.address(); // Start time (arbitrary since models do not depend on absolute time). diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index c9937db..409365a 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -19,7 +19,7 @@ use std::pin::Pin; use std::time::Duration; use asynchronix::model::{InitializedModel, Model, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{EventQueue, Mailbox, SimInit}; use asynchronix::time::{MonotonicTime, Scheduler}; /// Stepper motor. @@ -200,7 +200,8 @@ fn main() { driver.current_out.connect(Motor::current_in, &motor_mbox); // Model handles for simulation. - let mut position = motor.position.connect_stream().0; + let mut position = EventQueue::new(); + motor.position.connect_sink(&position); let motor_addr = motor_mbox.address(); let driver_addr = driver_mbox.address(); diff --git a/asynchronix/src/channel.rs b/asynchronix/src/channel.rs index c732477..f4f8b79 100644 --- a/asynchronix/src/channel.rs +++ b/asynchronix/src/channel.rs @@ -255,8 +255,8 @@ impl Sender { /// All channels are guaranteed to have different identifiers at any given /// time, but an identifier may be reused after all handles to a channel /// have been dropped. - pub(crate) fn channel_id(&self) -> ChannelId { - ChannelId(NonZeroUsize::new(&*self.inner as *const Inner as usize).unwrap()) + pub(crate) fn channel_id(&self) -> NonZeroUsize { + NonZeroUsize::new(Arc::as_ptr(&self.inner) as usize).unwrap() } } diff --git a/asynchronix/src/executor/task/tests/general.rs b/asynchronix/src/executor/task/tests/general.rs index ba4e2e3..beee857 100644 --- a/asynchronix/src/executor/task/tests/general.rs +++ b/asynchronix/src/executor/task/tests/general.rs @@ -1,7 +1,6 @@ -use std::future::Future; use std::ops::Deref; use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::thread; diff --git a/asynchronix/src/executor/tests.rs b/asynchronix/src/executor/tests.rs index 9e21f8d..7f63b46 100644 --- a/asynchronix/src/executor/tests.rs +++ b/asynchronix/src/executor/tests.rs @@ -1,5 +1,3 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; - use futures_channel::{mpsc, oneshot}; use futures_util::StreamExt; diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index c27bd70..c7ede82 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -193,7 +193,7 @@ //! # impl Model for Delay {} //! # } //! use std::time::Duration; -//! use asynchronix::simulation::{Mailbox, SimInit}; +//! use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; //! use asynchronix::time::MonotonicTime; //! //! use models::{Delay, Multiplier}; @@ -217,7 +217,8 @@ //! delay1.output.connect(Delay::input, &delay2_mbox); //! //! // Keep handles to the system input and output for the simulation. -//! let mut output_slot = delay2.output.connect_slot().0; +//! let mut output_slot = EventSlot::new(); +//! delay2.output.connect_sink(&output_slot); //! let input_address = multiplier1_mbox.address(); //! //! // Pick an arbitrary simulation start time and build the simulation. @@ -255,7 +256,7 @@ //! //! Simulation outputs can be monitored using //! [`EventSlot`](simulation::EventSlot)s and -//! [`EventStream`](simulation::EventStream)s, which can be connected to any +//! [`EventQueue`](simulation::EventQueue)s, which can be connected to any //! model's output port. While an event slot only gives access to the last value //! sent from a port, an event stream is an iterator that yields all events that //! were sent in first-in-first-out order. @@ -293,7 +294,7 @@ //! # impl Model for Delay {} //! # } //! # use std::time::Duration; -//! # use asynchronix::simulation::{Mailbox, SimInit}; +//! # use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; //! # use asynchronix::time::MonotonicTime; //! # use models::{Delay, Multiplier}; //! # let mut multiplier1 = Multiplier::default(); @@ -308,7 +309,8 @@ //! # multiplier1.output.connect(Multiplier::input, &multiplier2_mbox); //! # multiplier2.output.connect(Delay::input, &delay2_mbox); //! # delay1.output.connect(Delay::input, &delay2_mbox); -//! # let mut output_slot = delay2.output.connect_slot().0; +//! # let mut output_slot = EventSlot::new(); +//! # delay2.output.connect_sink(&output_slot); //! # let input_address = multiplier1_mbox.address(); //! # let t0 = MonotonicTime::EPOCH; //! # let mut simu = SimInit::new() diff --git a/asynchronix/src/model.rs b/asynchronix/src/model.rs index e5ea5d1..2414717 100644 --- a/asynchronix/src/model.rs +++ b/asynchronix/src/model.rs @@ -132,7 +132,7 @@ //! can be connected to input and requestor ports when assembling the simulation //! bench. However, input ports may instead be defined as private methods if //! they are only used by the model itself to schedule future actions (see the -//! [`Scheduler`](crate::time::Scheduler) examples). +//! [`Scheduler`] examples). //! //! Changing the signature of an input or replier port is not considered to //! alter the public interface of a model provided that the event, request and diff --git a/asynchronix/src/model/ports.rs b/asynchronix/src/model/ports.rs index f296f16..6b79539 100644 --- a/asynchronix/src/model/ports.rs +++ b/asynchronix/src/model/ports.rs @@ -14,18 +14,17 @@ //! ports should generally be preferred over requestor ports when possible. use std::fmt; -use std::sync::{Arc, Mutex}; mod broadcaster; mod sender; +use crate::model::ports::sender::EventSinkSender; use crate::model::{InputFn, Model, ReplierFn}; -use crate::simulation::{Address, EventSlot, EventStream}; -use crate::util::spsc_queue; +use crate::simulation::{Address, EventSink}; -use broadcaster::Broadcaster; +use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use self::sender::{EventSender, EventSlotSender, EventStreamSender, QuerySender}; +use self::sender::{InputSender, ReplierSender}; #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] /// Unique identifier for a connection between two ports. @@ -37,7 +36,7 @@ pub struct LineId(u64); /// methods that return no value. They broadcast events to all connected input /// ports. pub struct Output { - broadcaster: Broadcaster, + broadcaster: EventBroadcaster, next_line_id: u64, } @@ -62,40 +61,23 @@ impl Output { assert!(self.next_line_id != u64::MAX); let line_id = LineId(self.next_line_id); self.next_line_id += 1; - let sender = Box::new(EventSender::new(input, address.into().0)); + let sender = Box::new(InputSender::new(input, address.into().0)); self.broadcaster.add(sender, line_id); line_id } - /// Adds a connection to an event stream iterator. - pub fn connect_stream(&mut self) -> (EventStream, LineId) { + /// Adds a connection to an event sink such as an + /// [`EventSlot`](crate::simulation::EventSlot) or + /// [`EventQueue`](crate::simulation::EventQueue). + pub fn connect_sink>(&mut self, sink: &S) -> LineId { assert!(self.next_line_id != u64::MAX); let line_id = LineId(self.next_line_id); self.next_line_id += 1; - - let (producer, consumer) = spsc_queue::spsc_queue(); - let sender = Box::new(EventStreamSender::new(producer)); - let event_stream = EventStream::new(consumer); - + let sender = Box::new(EventSinkSender::new(sink.writer())); self.broadcaster.add(sender, line_id); - (event_stream, line_id) - } - - /// Adds a connection to an event slot. - pub fn connect_slot(&mut self) -> (EventSlot, LineId) { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; - - let slot = Arc::new(Mutex::new(None)); - let sender = Box::new(EventSlotSender::new(slot.clone())); - let event_slot = EventSlot::new(slot); - - self.broadcaster.add(sender, line_id); - - (event_slot, line_id) + line_id } /// Removes the connection specified by the `LineId` parameter. @@ -118,14 +100,14 @@ impl Output { /// Broadcasts an event to all connected input ports. pub async fn send(&mut self, arg: T) { - self.broadcaster.broadcast_event(arg).await.unwrap(); + self.broadcaster.broadcast(arg).await.unwrap(); } } impl Default for Output { fn default() -> Self { Self { - broadcaster: Broadcaster::default(), + broadcaster: EventBroadcaster::default(), next_line_id: 0, } } @@ -143,7 +125,7 @@ impl fmt::Debug for Output { /// model methods that return a value. They broadcast queries to all connected /// replier ports. pub struct Requestor { - broadcaster: Broadcaster, + broadcaster: QueryBroadcaster, next_line_id: u64, } @@ -168,7 +150,7 @@ impl Requestor { assert!(self.next_line_id != u64::MAX); let line_id = LineId(self.next_line_id); self.next_line_id += 1; - let sender = Box::new(QuerySender::new(replier, address.into().0)); + let sender = Box::new(ReplierSender::new(replier, address.into().0)); self.broadcaster.add(sender, line_id); line_id @@ -194,14 +176,14 @@ impl Requestor { /// Broadcasts a query to all connected replier ports. pub async fn send(&mut self, arg: T) -> impl Iterator + '_ { - self.broadcaster.broadcast_query(arg).await.unwrap() + self.broadcaster.broadcast(arg).await.unwrap() } } impl Default for Requestor { fn default() -> Self { Self { - broadcaster: Broadcaster::default(), + broadcaster: QueryBroadcaster::default(), next_line_id: 0, } } diff --git a/asynchronix/src/model/ports/broadcaster.rs b/asynchronix/src/model/ports/broadcaster.rs index ff347cc..22f525d 100644 --- a/asynchronix/src/model/ports/broadcaster.rs +++ b/asynchronix/src/model/ports/broadcaster.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; use std::task::{Context, Poll}; @@ -26,28 +27,17 @@ mod task_set; /// exploits this behavior by waking the main broadcast future only when all /// sender futures have been awaken, which strongly reduces overhead since /// waking a sender task does not actually schedule it on the executor. -pub(super) struct Broadcaster { +pub(super) struct BroadcasterInner { /// The list of senders with their associated line identifier. senders: Vec<(LineId, Box>)>, /// Fields explicitly borrowed by the `BroadcastFuture`. shared: Shared, + /// Phantom types. + _phantom_event: PhantomData, + _phantom_reply: PhantomData, } -impl Broadcaster { - /// Broadcasts an event to all addresses. - pub(super) async fn broadcast_event(&mut self, arg: T) -> Result<(), BroadcastError> { - match self.senders.as_mut_slice() { - // No sender. - [] => Ok(()), - // One sender. - [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), - // Multiple senders. - _ => self.broadcast(arg).await, - } - } -} - -impl Broadcaster { +impl BroadcasterInner { /// Adds a new sender associated to the specified identifier. /// /// # Panics @@ -93,55 +83,25 @@ impl Broadcaster { self.senders.len() } - /// Broadcasts a query to all addresses and collect all responses. - pub(super) async fn broadcast_query( - &mut self, - arg: T, - ) -> Result + '_, BroadcastError> { - match self.senders.as_mut_slice() { - // No sender. - [] => {} - // One sender. - [sender] => { - let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.shared.futures_env[0].output = Some(output); - } - // Multiple senders. - _ => self.broadcast(arg).await?, - }; - - // At this point all outputs should be available so `unwrap` can be - // called on the output of each future. - let outputs = self - .shared - .futures_env - .iter_mut() - .map(|t| t.output.take().unwrap()); - - Ok(outputs) - } - /// Efficiently broadcasts a message or a query to multiple addresses. /// /// This method does not collect the responses from queries. fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> { - let futures_count = self.senders.len(); let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. - for (i, (sender, futures_env)) in self + let mut iter = self .senders .iter_mut() - .zip(self.shared.futures_env.iter_mut()) - .enumerate() - { + .zip(self.shared.futures_env.iter_mut()); + while let Some((sender, futures_env)) = iter.next() { let future_cache = futures_env .storage .take() .unwrap_or_else(|| RecycleBox::new(())); // Move the argument rather than clone it for the last future. - if i + 1 == futures_count { + if iter.len() == 0 { let future: RecycleBox> + Send + '_> = coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg))); @@ -161,7 +121,7 @@ impl Broadcaster { } } -impl Default for Broadcaster { +impl Default for BroadcasterInner { /// Creates an empty `Broadcaster` object. fn default() -> Self { let wake_sink = WakeSink::new(); @@ -175,6 +135,141 @@ impl Default for Broadcaster { futures_env: Vec::new(), storage: None, }, + _phantom_event: PhantomData, + _phantom_reply: PhantomData, + } + } +} + +/// An object that can efficiently broadcast events to several input ports. +/// +/// See `BroadcasterInner` for implementation details. +pub(super) struct EventBroadcaster { + /// The broadcaster core object. + inner: BroadcasterInner, +} + +impl EventBroadcaster { + /// Adds a new sender associated to the specified identifier. + /// + /// # Panics + /// + /// This method will panic if the total count of senders would reach + /// `u32::MAX - 1`. + pub(super) fn add(&mut self, sender: Box>, id: LineId) { + self.inner.add(sender, id); + } + + /// Removes the first sender with the specified identifier, if any. + /// + /// Returns `true` if there was indeed a sender associated to the specified + /// identifier. + pub(super) fn remove(&mut self, id: LineId) -> bool { + self.inner.remove(id) + } + + /// Removes all senders. + pub(super) fn clear(&mut self) { + self.inner.clear(); + } + + /// Returns the number of connected senders. + pub(super) fn len(&self) -> usize { + self.inner.len() + } + + /// Broadcasts an event to all addresses. + pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), BroadcastError> { + match self.inner.senders.as_mut_slice() { + // No sender. + [] => Ok(()), + // One sender. + [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), + // Multiple senders. + _ => self.inner.broadcast(arg).await, + } + } +} + +impl Default for EventBroadcaster { + fn default() -> Self { + Self { + inner: BroadcasterInner::default(), + } + } +} + +/// An object that can efficiently broadcast queries to several replier ports. +/// +/// See `BroadcasterInner` for implementation details. +pub(super) struct QueryBroadcaster { + /// The broadcaster core object. + inner: BroadcasterInner, +} + +impl QueryBroadcaster { + /// Adds a new sender associated to the specified identifier. + /// + /// # Panics + /// + /// This method will panic if the total count of senders would reach + /// `u32::MAX - 1`. + pub(super) fn add(&mut self, sender: Box>, id: LineId) { + self.inner.add(sender, id); + } + + /// Removes the first sender with the specified identifier, if any. + /// + /// Returns `true` if there was indeed a sender associated to the specified + /// identifier. + pub(super) fn remove(&mut self, id: LineId) -> bool { + self.inner.remove(id) + } + + /// Removes all senders. + pub(super) fn clear(&mut self) { + self.inner.clear(); + } + + /// Returns the number of connected senders. + pub(super) fn len(&self) -> usize { + self.inner.len() + } + + /// Broadcasts a query to all addresses and collect all responses. + pub(super) async fn broadcast( + &mut self, + arg: T, + ) -> Result + '_, BroadcastError> { + match self.inner.senders.as_mut_slice() { + // No sender. + [] => {} + // One sender. + [sender] => { + let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; + self.inner.shared.futures_env[0].output = Some(output); + } + // Multiple senders. + _ => self.inner.broadcast(arg).await?, + }; + + // At this point all outputs should be available so `unwrap` can be + // called on the output of each future. + let outputs = self + .inner + .shared + .futures_env + .iter_mut() + .map(|t| t.output.take().unwrap()); + + Ok(outputs) + } +} + +impl Default for QueryBroadcaster { + fn default() -> Self { + Self { + inner: BroadcasterInner::default(), } } } @@ -323,7 +418,7 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { let scheduled_tasks = match this .shared .task_set - .steal_scheduled(this.pending_futures_count) + .take_scheduled(this.pending_futures_count) { Some(st) => st, None => return Poll::Pending, @@ -408,9 +503,7 @@ mod tests { use futures_executor::block_on; - use super::super::sender::QuerySender; use crate::channel::Receiver; - use crate::model::Model; use crate::time::Scheduler; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; @@ -441,18 +534,18 @@ mod tests { const N_RECV: usize = 4; let mut mailboxes = Vec::new(); - let mut broadcaster = Broadcaster::default(); + let mut broadcaster = EventBroadcaster::default(); for id in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(EventSender::new(Counter::inc, address)); + let sender = Box::new(InputSender::new(Counter::inc, address)); broadcaster.add(sender, LineId(id as u64)); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - block_on(broadcaster.broadcast_event(1)).unwrap(); + block_on(broadcaster.broadcast(1)).unwrap(); }); let counter = Arc::new(AtomicUsize::new(0)); @@ -489,18 +582,18 @@ mod tests { const N_RECV: usize = 4; let mut mailboxes = Vec::new(); - let mut broadcaster = Broadcaster::default(); + let mut broadcaster = QueryBroadcaster::default(); for id in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(QuerySender::new(Counter::fetch_inc, address)); + let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); broadcaster.add(sender, LineId(id as u64)); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - let iter = block_on(broadcaster.broadcast_query(1)).unwrap(); + let iter = block_on(broadcaster.broadcast(1)).unwrap(); let sum = iter.fold(0, |acc, val| acc + val); assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)} @@ -609,12 +702,12 @@ mod tests { let (test_event2, waker2) = test_event::(); let (test_event3, waker3) = test_event::(); - let mut broadcaster = Broadcaster::default(); + let mut broadcaster = QueryBroadcaster::default(); broadcaster.add(Box::new(test_event1), LineId(1)); broadcaster.add(Box::new(test_event2), LineId(2)); broadcaster.add(Box::new(test_event3), LineId(3)); - let mut fut = Box::pin(broadcaster.broadcast_query(())); + let mut fut = Box::pin(broadcaster.broadcast(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); let is_scheduled_waker = is_scheduled.clone(); @@ -684,11 +777,11 @@ mod tests { let (test_event1, waker1) = test_event::(); let (test_event2, waker2) = test_event::(); - let mut broadcaster = Broadcaster::default(); + let mut broadcaster = QueryBroadcaster::default(); broadcaster.add(Box::new(test_event1), LineId(1)); broadcaster.add(Box::new(test_event2), LineId(2)); - let mut fut = Box::pin(broadcaster.broadcast_query(())); + let mut fut = Box::pin(broadcaster.broadcast(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); let is_scheduled_waker = is_scheduled.clone(); diff --git a/asynchronix/src/model/ports/broadcaster/task_set.rs b/asynchronix/src/model/ports/broadcaster/task_set.rs index 6538ee4..9a978f1 100644 --- a/asynchronix/src/model/ports/broadcaster/task_set.rs +++ b/asynchronix/src/model/ports/broadcaster/task_set.rs @@ -21,31 +21,37 @@ const COUNTDOWN_MASK: u64 = !INDEX_MASK; /// scheduled tasks. const COUNTDOWN_ONE: u64 = 1 << 32; -/// A set of tasks that may be scheduled cheaply and can be requested to wake a -/// parent task only when a given amount of tasks have been scheduled. +/// An object for the efficient management of a set of tasks scheduled +/// concurrently. /// -/// This object maintains both a list of all active tasks and a list of the -/// subset of active tasks currently scheduled. The latter is stored in a -/// Treiber stack which links tasks through indices rather than pointers. Using -/// indices has two advantages: (i) it enables a fully safe implementation and -/// (ii) it makes it possible to use a single CAS to simultaneously move the -/// head and decrement the outstanding amount of tasks to be scheduled before -/// the parent task is notified. +/// The algorithm used by `TaskSet` is designed to wake up the parent task as +/// seldom as possible, ideally only when all non-completed sub-tasks have been +/// scheduled (awaken). +/// +/// A `TaskSet` maintains both a vector-based list of tasks (or more accurately, +/// task waker handles) and a linked list of the subset of tasks that are +/// currently scheduled. The latter is stored in a vector-based Treiber stack +/// which links tasks through indices rather than pointers. Using indices has +/// two advantages: (i) it makes a fully safe implementation possible and (ii) +/// it can take advantage of a single CAS to simultaneously move the head and +/// decrement the outstanding amount of tasks to be scheduled before the parent +/// task is notified. pub(super) struct TaskSet { - /// Set of all active tasks, scheduled or not. + /// Set of all tasks, scheduled or not. /// - /// In some rare cases, the back of the vector can also contain inactive - /// (retired) tasks. + /// In some cases, the use of `resize()` to shrink the task set may leave + /// inactive tasks at the back of the vector, in which case the length of + /// the vector will exceed `task_count`. tasks: Vec>, /// Head of the Treiber stack for scheduled tasks. /// - /// The lower bits specify the index of the last scheduled task, if any, - /// whereas the upper bits specify the countdown of tasks still to be - /// scheduled before the parent task is notified. + /// The lower 32 bits specify the index of the last scheduled task (head), + /// if any, whereas the upper 32 bits specify the countdown of tasks still + /// to be scheduled before the parent task is notified. head: Arc, /// A notifier used to wake the parent task. notifier: WakeSource, - /// Count of all active tasks, scheduled or not. + /// Count of all tasks, scheduled or not. task_count: usize, } @@ -65,21 +71,25 @@ impl TaskSet { } } - /// Steals scheduled tasks if any and returns an iterator over their - /// indices, otherwise returns `None` and requests a notification to be sent - /// after `notify_count` tasks have been scheduled. + /// Take all scheduled tasks and returns an iterator over their indices, or + /// if there are no currently scheduled tasks returns `None` and requests a + /// notification to be sent after `pending_task_count` tasks have been + /// scheduled. /// - /// In all cases, the list of scheduled tasks is guaranteed to be empty - /// after this call. + /// In all cases, the list of scheduled tasks will be empty right after this + /// call. /// - /// If some tasks were stolen, no notification is requested. + /// If there were scheduled tasks, no notification is requested because this + /// method is expected to be called repeatedly until it returns `None`. + /// Failure to do so will result in missed notifications. /// - /// If no tasks were stolen, the notification is guaranteed to be triggered - /// no later than after `notify_count` tasks have been scheduled, though it - /// may in some cases be triggered earlier. If the specified `notify_count` - /// is zero then no notification is requested. - pub(super) fn steal_scheduled(&self, notify_count: usize) -> Option> { - let countdown = u32::try_from(notify_count).unwrap(); + /// If no tasks were scheduled, the notification is guaranteed to be + /// triggered no later than after `pending_task_count` tasks have been + /// scheduled, though it may in some cases be triggered earlier. If the + /// specified `pending_task_count` is zero then no notification is + /// requested. + pub(super) fn take_scheduled(&self, pending_task_count: usize) -> Option> { + let countdown = u32::try_from(pending_task_count).unwrap(); let mut head = self.head.load(Ordering::Relaxed); loop { @@ -126,13 +136,13 @@ impl TaskSet { if self.head.load(Ordering::Relaxed) != EMPTY as u64 { // Dropping the iterator ensures that all tasks are put in the // sleeping state. - let _ = self.steal_scheduled(0); + let _ = self.take_scheduled(0); } } - /// Modify the number of active tasks. + /// Set the number of active tasks. /// - /// Note that this method may discard all scheduled tasks. + /// Note that this method may discard already scheduled tasks. /// /// # Panic /// @@ -200,7 +210,7 @@ impl TaskSet { } } - /// Returns `true` if one or more tasks are currently scheduled. + /// Returns `true` if one or more sub-tasks are currently scheduled. pub(super) fn has_scheduled(&self) -> bool { // Ordering: the content of the head is only used as an advisory flag so // Relaxed ordering is sufficient. diff --git a/asynchronix/src/model/ports/sender.rs b/asynchronix/src/model/ports/sender.rs index d5dc2a2..e63d055 100644 --- a/asynchronix/src/model/ports/sender.rs +++ b/asynchronix/src/model/ports/sender.rs @@ -4,22 +4,23 @@ use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use recycle_box::{coerce_box, RecycleBox}; use crate::channel; use crate::model::{InputFn, Model, ReplierFn}; -use crate::util::spsc_queue; +use crate::simulation::EventSinkWriter; -/// Abstraction over `EventSender` and `QuerySender`. +/// An event or query sender abstracting over the target model and input or +/// replier method. pub(super) trait Sender: Send { + /// Asynchronously send the event or request. fn send(&mut self, arg: T) -> RecycledFuture<'_, Result>; } -/// An object that can send a payload to a model. -pub(super) struct EventSender { +/// An object that can send events to an input port. +pub(super) struct InputSender { func: F, sender: channel::Sender, fut_storage: Option>, @@ -27,7 +28,7 @@ pub(super) struct EventSender { _phantom_closure_marker: PhantomData, } -impl EventSender +impl InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S>, @@ -44,15 +45,15 @@ where } } -impl Sender for EventSender +impl Sender for InputSender where M: Model, - F: for<'a> InputFn<'a, M, T, S> + Copy, + F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + 'static, - S: Send, + S: Send + 'static, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let func = self.func; + let func = self.func.clone(); let fut = self.sender.send(move |model, scheduler, recycle_box| { let fut = func.call(model, arg, scheduler); @@ -66,8 +67,8 @@ where } } -/// An object that can send a payload to a model and retrieve a response. -pub(super) struct QuerySender { +/// An object that can send a request to a replier port and retrieve a response. +pub(super) struct ReplierSender { func: F, sender: channel::Sender, receiver: multishot::Receiver, @@ -76,7 +77,7 @@ pub(super) struct QuerySender { _phantom_closure_marker: PhantomData, } -impl QuerySender +impl ReplierSender where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S>, @@ -95,16 +96,16 @@ where } } -impl Sender for QuerySender +impl Sender for ReplierSender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S> + Copy, + F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, T: Send + 'static, R: Send + 'static, S: Send, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { - let func = self.func; + let func = self.func.clone(); let sender = &mut self.sender; let reply_receiver = &mut self.receiver; let fut_storage = &mut self.fut_storage; @@ -134,59 +135,32 @@ where } } -/// An object that can send a payload to an unbounded queue. -pub(super) struct EventStreamSender { - producer: spsc_queue::Producer, - fut_storage: Option>, -} - -impl EventStreamSender { - pub(super) fn new(producer: spsc_queue::Producer) -> Self { - Self { - producer, - fut_storage: None, - } - } -} - -impl Sender for EventStreamSender -where - T: Send + 'static, -{ - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let producer = &mut self.producer; - - RecycledFuture::new(&mut self.fut_storage, async move { - producer.push(arg).map_err(|_| SendError {}) - }) - } -} - /// An object that can send a payload to a mutex-protected slot. -pub(super) struct EventSlotSender { - slot: Arc>>, +pub(super) struct EventSinkSender> { + writer: W, fut_storage: Option>, + _phantom_event: PhantomData, } -impl EventSlotSender { - pub(super) fn new(slot: Arc>>) -> Self { +impl> EventSinkSender { + pub(super) fn new(writer: W) -> Self { Self { - slot, + writer, fut_storage: None, + _phantom_event: PhantomData, } } } -impl Sender for EventSlotSender +impl> Sender for EventSinkSender where T: Send + 'static, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let slot = &*self.slot; + let writer = &self.writer; RecycledFuture::new(&mut self.fut_storage, async move { - let mut slot = slot.lock().unwrap(); - *slot = Some(arg); + writer.write(arg); Ok(()) }) diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 1fba49f..a99e2e7 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -87,13 +87,12 @@ //! Although uncommon, there is sometimes a need for connecting and/or //! disconnecting models after they have been migrated to the simulation. //! Likewise, one may want to connect or disconnect an [`EventSlot`] or -//! [`EventStream`] after the simulation has been instantiated. +//! [`EventQueue`] after the simulation has been instantiated. //! //! There is actually a very simple solution to this problem: since the -//! [`InputFn`](crate::model::InputFn) trait also matches closures of type -//! `FnOnce(&mut impl Model)`, it is enough to invoke -//! [`Simulation::send_event()`] with a closure that connects or disconnects a -//! port, such as: +//! [`InputFn`] trait also matches closures of type `FnOnce(&mut impl Model)`, +//! it is enough to invoke [`Simulation::send_event()`] with a closure that +//! connects or disconnects a port, such as: //! //! ``` //! # use asynchronix::model::{Model, Output}; @@ -119,13 +118,13 @@ //! &modelA_addr //! ); //! ``` -mod endpoints; mod mailbox; mod sim_init; +mod sink; -pub use endpoints::{EventSlot, EventStream}; pub use mailbox::{Address, Mailbox}; pub use sim_init::SimInit; +pub use sink::{EventQueue, EventSink, EventSinkWriter, EventSlot}; use std::error::Error; use std::fmt; @@ -151,8 +150,7 @@ use crate::util::sync_cell::SyncCell; /// [`SimInit::init()`](crate::simulation::SimInit::init) or /// [`SimInit::init_with_clock()`](crate::simulation::SimInit::init_with_clock) /// method on a simulation initializer. It contains an asynchronous executor -/// that runs all simulation models added beforehand to -/// [`SimInit`](crate::simulation::SimInit). +/// that runs all simulation models added beforehand to [`SimInit`]. /// /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation diff --git a/asynchronix/src/simulation/endpoints.rs b/asynchronix/src/simulation/endpoints.rs deleted file mode 100644 index eed6c10..0000000 --- a/asynchronix/src/simulation/endpoints.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::fmt; -use std::sync::{Arc, Mutex, TryLockError, TryLockResult}; - -use crate::util::spsc_queue; - -/// An iterator that returns all events that were broadcast by an output port. -/// -/// Events are returned in first-in-first-out order. Note that even if the -/// iterator returns `None`, it may still produce more items after simulation -/// time is incremented. -pub struct EventStream { - consumer: spsc_queue::Consumer, -} - -impl EventStream { - /// Creates a new `EventStream`. - pub(crate) fn new(consumer: spsc_queue::Consumer) -> Self { - Self { consumer } - } -} - -impl Iterator for EventStream { - type Item = T; - - fn next(&mut self) -> Option { - self.consumer.pop() - } -} - -impl fmt::Debug for EventStream { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("EventStream").finish_non_exhaustive() - } -} - -/// A single-value slot that holds the last event that was broadcast by an -/// output port. -pub struct EventSlot { - slot: Arc>>, -} - -impl EventSlot { - /// Creates a new `EventSlot`. - pub(crate) fn new(slot: Arc>>) -> Self { - Self { slot } - } - - /// Take the last event, if any, leaving the slot empty. - /// - /// Note that even after the event is taken, it may become populated anew - /// after simulation time is incremented. - pub fn take(&mut self) -> Option { - // We don't actually need to take self by mutable reference, but this - // signature is probably less surprising for the user and more - // consistent with `EventStream`. It also prevents multi-threaded - // access, which would be likely to be misused. - match self.slot.try_lock() { - TryLockResult::Ok(mut v) => v.take(), - TryLockResult::Err(TryLockError::WouldBlock) => None, - TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(), - } - } -} - -impl fmt::Debug for EventSlot { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("EventSlot").finish_non_exhaustive() - } -} diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index dbfa7ff..8a57b35 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -69,9 +69,8 @@ impl SimInit { Simulation::new(self.executor, self.scheduler_queue, self.time) } - /// Builds a simulation synchronized with the provided - /// [`Clock`](crate::time::Clock) and initialized at the specified - /// simulation time, executing the + /// Builds a simulation synchronized with the provided [`Clock`] and + /// initialized at the specified simulation time, executing the /// [`Model::init()`](crate::model::Model::init) method on all model /// initializers. pub fn init_with_clock( diff --git a/asynchronix/src/simulation/sink.rs b/asynchronix/src/simulation/sink.rs new file mode 100644 index 0000000..f1050b4 --- /dev/null +++ b/asynchronix/src/simulation/sink.rs @@ -0,0 +1,171 @@ +use std::fmt; +use std::sync::{Arc, Mutex, TryLockError, TryLockResult}; + +use crossbeam_queue::SegQueue; + +/// A simulation endpoint that can receive events sent by model outputs. +/// +/// An `EventSink` can be thought of as a self-standing input meant to +/// externally monitor the simulated system. +pub trait EventSink { + /// Writer handle to an event sink. + type Writer: EventSinkWriter; + + /// Returns the writer handle associated to this sink. + fn writer(&self) -> Self::Writer; +} + +/// A writer handle to an event sink. +pub trait EventSinkWriter: Send + Sync + 'static { + /// Writes a value to the associated sink. + fn write(&self, event: T); +} + +/// An event sink iterator that returns all events that were broadcast by +/// connected output ports. +/// +/// Events are returned in first-in-first-out order. Note that even if the +/// iterator returns `None`, it may still produce more items in the future (in +/// other words, it is not a [`FusedIterator`](std::iter::FusedIterator)). +pub struct EventQueue { + queue: Arc>, +} + +impl EventQueue { + /// Creates a new `EventStream`. + pub fn new() -> Self { + Self { + queue: Arc::new(SegQueue::new()), + } + } +} + +impl EventSink for EventQueue { + type Writer = EventQueueWriter; + + fn writer(&self) -> Self::Writer { + EventQueueWriter { + queue: self.queue.clone(), + } + } +} + +impl Iterator for EventQueue { + type Item = T; + + fn next(&mut self) -> Option { + self.queue.pop() + } +} + +impl Default for EventQueue { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for EventQueue { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EventQueue").finish_non_exhaustive() + } +} + +/// A producer handle of an `EventStream`. +pub struct EventQueueWriter { + queue: Arc>, +} + +impl EventSinkWriter for EventQueueWriter { + /// Pushes an event onto the queue. + fn write(&self, event: T) { + self.queue.push(event); + } +} + +impl fmt::Debug for EventQueueWriter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EventQueueWriter").finish_non_exhaustive() + } +} + +/// A single-value event sink that holds the last event that was broadcast by +/// any of the connected output ports. +pub struct EventSlot { + slot: Arc>>, +} + +impl EventSlot { + /// Creates a new `EventSlot`. + pub fn new() -> Self { + Self { + slot: Arc::new(Mutex::new(None)), + } + } + + /// Take the last event, if any, leaving the slot empty. + /// + /// Note that even after the event is taken, it may become populated anew. + pub fn take(&mut self) -> Option { + // We don't actually need to take self by mutable reference, but this + // signature is probably less surprising for the user and more + // consistent with `EventStream`. It also prevents multi-threaded + // access, which would be likely to be misused. + match self.slot.try_lock() { + TryLockResult::Ok(mut v) => v.take(), + TryLockResult::Err(TryLockError::WouldBlock) => None, + TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(), + } + } +} + +impl EventSink for EventSlot { + type Writer = EventSlotWriter; + + /// Returns a writer handle. + fn writer(&self) -> EventSlotWriter { + EventSlotWriter { + slot: self.slot.clone(), + } + } +} + +impl Default for EventSlot { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for EventSlot { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EventSlot").finish_non_exhaustive() + } +} + +/// A writer handle of an `EventSlot`. +pub struct EventSlotWriter { + slot: Arc>>, +} + +impl EventSinkWriter for EventSlotWriter { + /// Write an event into the slot. + fn write(&self, event: T) { + // Why do we just use `try_lock` and abandon if the lock is taken? The + // reason is that (i) the reader is never supposed to access the slot + // when the simulation runs and (ii) as a rule the simulator does not + // warrant fairness when concurrently writing to an input. Therefore, if + // the mutex is already locked when this writer attempts to lock it, it + // means another writer is concurrently writing an event, and that event + // is just as legitimate as ours so there is not need to overwrite it. + match self.slot.try_lock() { + TryLockResult::Ok(mut v) => *v = Some(event), + TryLockResult::Err(TryLockError::WouldBlock) => {} + TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(), + } + } +} + +impl fmt::Debug for EventSlotWriter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EventStreamWriter").finish_non_exhaustive() + } +} diff --git a/asynchronix/src/time/scheduler.rs b/asynchronix/src/time/scheduler.rs index 3ab2ead..24332af 100644 --- a/asynchronix/src/time/scheduler.rs +++ b/asynchronix/src/time/scheduler.rs @@ -3,7 +3,7 @@ use std::error::Error; use std::fmt; use std::future::Future; -use std::marker::PhantomData; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -13,7 +13,7 @@ use std::time::Duration; use pin_project_lite::pin_project; use recycle_box::{coerce_box, RecycleBox}; -use crate::channel::{ChannelId, Sender}; +use crate::channel::Sender; use crate::executor::Executor; use crate::model::{InputFn, Model}; use crate::time::{MonotonicTime, TearableAtomicTime}; @@ -21,7 +21,15 @@ use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCellReader; /// Shorthand for the scheduler queue type. -pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, ChannelId), Box>; + +// Why use both time and channel ID as the key? The short answer is that this +// ensures that events targeting the same channel are sent in the order they +// were scheduled. More precisely, this ensures that events targeting the same +// channel are ordered contiguously in the priority queue, which in turns allows +// the event loop to easily aggregate such events into single futures and thus +// control their relative order of execution. +pub(crate) type SchedulerQueue = + PriorityQueue<(MonotonicTime, NonZeroUsize), Box>; /// Trait abstracting over time-absolute and time-relative deadlines. /// @@ -479,7 +487,7 @@ impl Error for SchedulingError {} /// Schedules an event at a future time. /// -/// This method does not check whether the specified time lies in the future +/// This function does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_event_at_unchecked( time: MonotonicTime, @@ -495,7 +503,7 @@ pub(crate) fn schedule_event_at_unchecked( { let channel_id = sender.channel_id(); - let event_dispatcher = Box::new(new_event_dispatcher(func, arg, sender)); + let event_dispatcher = Box::new(EventDispatcher::new(dispatch_event(func, arg, sender))); let mut scheduler_queue = scheduler_queue.lock().unwrap(); scheduler_queue.insert((time, channel_id), event_dispatcher); @@ -503,7 +511,7 @@ pub(crate) fn schedule_event_at_unchecked( /// Schedules an event at a future time, returning an event key. /// -/// This method does not check whether the specified time lies in the future +/// This function does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_keyed_event_at_unchecked( time: MonotonicTime, @@ -521,10 +529,8 @@ where let event_key = EventKey::new(); let channel_id = sender.channel_id(); let event_dispatcher = Box::new(KeyedEventDispatcher::new( + |ek| dispatch_keyed_event(ek, func, arg, sender), event_key.clone(), - func, - arg, - sender, )); let mut scheduler_queue = scheduler_queue.lock().unwrap(); @@ -535,7 +541,7 @@ where /// Schedules a periodic event at a future time. /// -/// This method does not check whether the specified time lies in the future +/// This function does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_periodic_event_at_unchecked( time: MonotonicTime, @@ -552,7 +558,10 @@ pub(crate) fn schedule_periodic_event_at_unchecked( { let channel_id = sender.channel_id(); - let event_dispatcher = Box::new(PeriodicEventDispatcher::new(func, arg, sender, period)); + let event_dispatcher = Box::new(PeriodicEventDispatcher::new( + || dispatch_event(func, arg, sender), + period, + )); let mut scheduler_queue = scheduler_queue.lock().unwrap(); scheduler_queue.insert((time, channel_id), event_dispatcher); @@ -560,7 +569,7 @@ pub(crate) fn schedule_periodic_event_at_unchecked( /// Schedules an event at a future time, returning an event key. /// -/// This method does not check whether the specified time lies in the future +/// This function does not check whether the specified time lies in the future /// of the current simulation time. pub(crate) fn schedule_periodic_keyed_event_at_unchecked( time: MonotonicTime, @@ -579,11 +588,9 @@ where let event_key = EventKey::new(); let channel_id = sender.channel_id(); let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new( - event_key.clone(), - func, - arg, - sender, + |ek| dispatch_keyed_event(ek, func, arg, sender), period, + event_key.clone(), )); let mut scheduler_queue = scheduler_queue.lock().unwrap(); @@ -614,8 +621,8 @@ pub(crate) trait ScheduledEvent: Send { } pin_project! { - /// Object that can be converted to a future dispatching a non-cancellable - /// event. + /// An object that can be converted to a future dispatching a + /// non-cancellable event. /// /// Note that this particular event dispatcher is in fact already a future: /// since the future cannot be cancelled and the dispatcher does not need to @@ -627,23 +634,14 @@ pin_project! { } } -/// Constructs a new `EventDispatcher`. -/// -/// Due to some limitations of type inference or of my understanding of it, the -/// constructor for this event dispatchers is a freestanding function. -fn new_event_dispatcher( - func: F, - arg: T, - sender: Sender, -) -> EventDispatcher> +impl EventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + F: Future + Send + 'static, { - let fut = dispatch_event(func, arg, sender); - - EventDispatcher { fut } + /// Constructs a new `EventDispatcher`. + pub(crate) fn new(fut: F) -> Self { + EventDispatcher { fut } + } } impl Future for EventDispatcher @@ -677,112 +675,78 @@ where } } -/// Object that can be converted to a future dispatching a non-cancellable periodic -/// event. -pub(crate) struct PeriodicEventDispatcher +/// An object that can be converted to a future dispatching a non-cancellable, +/// periodic event. +pub(crate) struct PeriodicEventDispatcher where - M: Model, + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, { - func: F, - arg: T, - sender: Sender, + /// A clonable generator for the dispatching future. + gen: G, + /// The event repetition period. period: Duration, - _input_kind: PhantomData, } -impl PeriodicEventDispatcher +impl PeriodicEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, { /// Constructs a new `PeriodicEventDispatcher`. - fn new(func: F, arg: T, sender: Sender, period: Duration) -> Self { - Self { - func, - arg, - sender, - period, - _input_kind: PhantomData, - } + fn new(gen: G, period: Duration) -> Self { + Self { gen, period } } } -impl ScheduledEvent for PeriodicEventDispatcher +impl ScheduledEvent for PeriodicEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, + G: (FnOnce() -> F) + Clone + Send + 'static, + F: Future + Send + 'static, { fn is_cancelled(&self) -> bool { false } fn next(&self) -> Option<(Box, Duration)> { - let event = Box::new(Self::new( - self.func.clone(), - self.arg.clone(), - self.sender.clone(), - self.period, - )); + let event = Box::new(Self::new(self.gen.clone(), self.period)); Some((event, self.period)) } fn into_future(self: Box) -> Pin + Send>> { - let Self { - func, arg, sender, .. - } = *self; - - Box::pin(dispatch_event(func, arg, sender)) + Box::pin((self.gen)()) } fn spawn_and_forget(self: Box, executor: &Executor) { - let Self { - func, arg, sender, .. - } = *self; - - let fut = dispatch_event(func, arg, sender); - executor.spawn_and_forget(fut); + executor.spawn_and_forget((self.gen)()); } } -/// Object that can be converted to a future dispatching a cancellable event. -pub(crate) struct KeyedEventDispatcher +/// An object that can be converted to a future dispatching a cancellable event. +pub(crate) struct KeyedEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + G: (FnOnce(EventKey) -> F) + Send + 'static, + F: Future + Send + 'static, { + /// A generator for the dispatching future. + gen: G, + /// The event cancellation key. event_key: EventKey, - func: F, - arg: T, - sender: Sender, - _input_kind: PhantomData, } -impl KeyedEventDispatcher +impl KeyedEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + G: (FnOnce(EventKey) -> F) + Send + 'static, + F: Future + Send + 'static, { /// Constructs a new `KeyedEventDispatcher`. - fn new(event_key: EventKey, func: F, arg: T, sender: Sender) -> Self { - Self { - event_key, - func, - arg, - sender, - _input_kind: PhantomData, - } + fn new(gen: G, event_key: EventKey) -> Self { + Self { gen, event_key } } } -impl ScheduledEvent for KeyedEventDispatcher +impl ScheduledEvent for KeyedEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, + G: (FnOnce(EventKey) -> F) + Send + 'static, + F: Future + Send + 'static, { fn is_cancelled(&self) -> bool { self.event_key.is_cancelled() @@ -791,116 +755,74 @@ where None } fn into_future(self: Box) -> Pin + Send>> { - let Self { - event_key, - func, - arg, - sender, - .. - } = *self; - - Box::pin(dispatch_keyed_event(event_key, func, arg, sender)) + Box::pin((self.gen)(self.event_key)) } fn spawn_and_forget(self: Box, executor: &Executor) { - let Self { - event_key, - func, - arg, - sender, - .. - } = *self; - - let fut = dispatch_keyed_event(event_key, func, arg, sender); - executor.spawn_and_forget(fut); + executor.spawn_and_forget((self.gen)(self.event_key)); } } -/// Object that can be converted to a future dispatching a cancellable event. -pub(crate) struct PeriodicKeyedEventDispatcher +/// An object that can be converted to a future dispatching a periodic, +/// cancellable event. +pub(crate) struct PeriodicKeyedEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, { - event_key: EventKey, - func: F, - arg: T, - sender: Sender, + /// A clonable generator for the dispatching future. + gen: G, + /// The repetition period. period: Duration, - _input_kind: PhantomData, + /// The event cancellation key. + event_key: EventKey, } -impl PeriodicKeyedEventDispatcher +impl PeriodicKeyedEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, { /// Constructs a new `KeyedEventDispatcher`. - fn new(event_key: EventKey, func: F, arg: T, sender: Sender, period: Duration) -> Self { + fn new(gen: G, period: Duration, event_key: EventKey) -> Self { Self { - event_key, - func, - arg, - sender, + gen, period, - _input_kind: PhantomData, + event_key, } } } -impl ScheduledEvent for PeriodicKeyedEventDispatcher +impl ScheduledEvent for PeriodicKeyedEventDispatcher where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, + G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, + F: Future + Send + 'static, { fn is_cancelled(&self) -> bool { self.event_key.is_cancelled() } fn next(&self) -> Option<(Box, Duration)> { let event = Box::new(Self::new( - self.event_key.clone(), - self.func.clone(), - self.arg.clone(), - self.sender.clone(), + self.gen.clone(), self.period, + self.event_key.clone(), )); Some((event, self.period)) } fn into_future(self: Box) -> Pin + Send>> { - let Self { - event_key, - func, - arg, - sender, - .. - } = *self; - - Box::pin(dispatch_keyed_event(event_key, func, arg, sender)) + Box::pin((self.gen)(self.event_key)) } fn spawn_and_forget(self: Box, executor: &Executor) { - let Self { - event_key, - func, - arg, - sender, - .. - } = *self; - - let fut = dispatch_keyed_event(event_key, func, arg, sender); - executor.spawn_and_forget(fut); + executor.spawn_and_forget((self.gen)(self.event_key)); } } -/// Asynchronously dispatch a regular, non-cancellable event. -async fn dispatch_event(func: F, arg: T, sender: Sender) +/// Asynchronously dispatches a non-cancellable event to a model input. +pub(crate) async fn dispatch_event(func: F, arg: T, sender: Sender) where M: Model, F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, + T: Send + 'static, { let _ = sender .send( @@ -916,9 +838,13 @@ where .await; } -/// Asynchronously dispatch a cancellable event. -async fn dispatch_keyed_event(event_key: EventKey, func: F, arg: T, sender: Sender) -where +/// Asynchronously dispatches a cancellable event to a model input. +pub(crate) async fn dispatch_keyed_event( + event_key: EventKey, + func: F, + arg: T, + sender: Sender, +) where M: Model, F: for<'a> InputFn<'a, M, T, S>, T: Send + Clone + 'static, diff --git a/asynchronix/src/util.rs b/asynchronix/src/util.rs index 3b8e9c0..12c5ccf 100644 --- a/asynchronix/src/util.rs +++ b/asynchronix/src/util.rs @@ -3,5 +3,4 @@ pub(crate) mod futures; pub(crate) mod priority_queue; pub(crate) mod rng; pub(crate) mod slot; -pub(crate) mod spsc_queue; pub(crate) mod sync_cell; diff --git a/asynchronix/src/util/priority_queue.rs b/asynchronix/src/util/priority_queue.rs index b57dca1..f8cf5cf 100644 --- a/asynchronix/src/util/priority_queue.rs +++ b/asynchronix/src/util/priority_queue.rs @@ -1,6 +1,6 @@ //! Associative priority queue. -use std::cmp::{Eq, Ord, Ordering, PartialOrd}; +use std::cmp::Ordering; use std::collections::BinaryHeap; /// A key-value pair ordered by keys in inverse order, with epoch-based ordering @@ -111,7 +111,7 @@ impl PriorityQueue { #[cfg(all(test, not(asynchronix_loom)))] mod tests { - use super::*; + use super::PriorityQueue; #[test] fn priority_smoke() { diff --git a/asynchronix/src/util/spsc_queue.rs b/asynchronix/src/util/spsc_queue.rs deleted file mode 100644 index 9a81b2c..0000000 --- a/asynchronix/src/util/spsc_queue.rs +++ /dev/null @@ -1,393 +0,0 @@ -//! Single-producer single-consumer unbounded FIFO queue that stores values in -//! fixed-size memory segments. - -#![allow(unused)] - -use std::cell::Cell; -use std::error::Error; -use std::fmt; -use std::marker::PhantomData; -use std::mem::MaybeUninit; -use std::panic::{RefUnwindSafe, UnwindSafe}; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering; - -use crossbeam_utils::CachePadded; - -use crate::loom_exports::cell::UnsafeCell; -use crate::loom_exports::sync::atomic::{AtomicBool, AtomicPtr}; -use crate::loom_exports::sync::Arc; - -/// The number of slots in a single segment. -const SEGMENT_LEN: usize = 32; - -/// A slot containing a single value. -struct Slot { - has_value: AtomicBool, - value: UnsafeCell>, -} - -impl Default for Slot { - fn default() -> Self { - Slot { - has_value: AtomicBool::new(false), - value: UnsafeCell::new(MaybeUninit::uninit()), - } - } -} - -/// A memory segment containing `SEGMENT_LEN` slots. -struct Segment { - /// Address of the next segment. - /// - /// A null pointer means that the next segment is not allocated yet. - next_segment: AtomicPtr>, - data: [Slot; SEGMENT_LEN], -} - -impl Segment { - /// Allocates a new segment. - fn allocate_new() -> NonNull { - let segment = Self { - next_segment: AtomicPtr::new(ptr::null_mut()), - data: Default::default(), - }; - - // Safety: the pointer is non-null since it comes from a box. - unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(segment))) } - } -} - -/// The head of the queue from which values are popped. -struct Head { - /// Pointer to the segment at the head of the queue. - segment: NonNull>, - /// Index of the next value to be read. - /// - /// If the index is equal to the segment length, it is necessary to move to - /// the next segment before the next value can be read. - next_read_idx: usize, -} - -/// The tail of the queue to which values are pushed. -struct Tail { - /// Pointer to the segment at the tail of the queue. - segment: NonNull>, - /// Index of the next value to be written. - /// - /// If the index is equal to the segment length, a new segment must be - /// allocated before a new value can be written. - next_write_idx: usize, -} - -/// A single-producer, single-consumer unbounded FIFO queue. -struct Queue { - head: CachePadded>>, - tail: CachePadded>>, -} - -impl Queue { - /// Creates a new queue. - fn new() -> Self { - let segment = Segment::allocate_new(); - - let head = Head { - segment, - next_read_idx: 0, - }; - let tail = Tail { - segment, - next_write_idx: 0, - }; - - Self { - head: CachePadded::new(UnsafeCell::new(head)), - tail: CachePadded::new(UnsafeCell::new(tail)), - } - } - - /// Pushes a new value. - /// - /// # Safety - /// - /// The method cannot be called from multiple threads concurrently. - unsafe fn push(&self, value: T) { - // Safety: this is the only thread accessing the tail. - let tail = self.tail.with_mut(|p| &mut *p); - - // If the whole segment has been written, allocate a new segment. - if tail.next_write_idx == SEGMENT_LEN { - let old_segment = tail.segment; - tail.segment = Segment::allocate_new(); - - // Safety: the old segment is still allocated since the consumer - // cannot deallocate it before `next_segment` is set to a non-null - // value. - old_segment - .as_ref() - .next_segment - .store(tail.segment.as_ptr(), Ordering::Release); - - tail.next_write_idx = 0; - } - - // Safety: the tail segment is allocated since the consumer cannot - // deallocate it before `next_segment` is set to a non-null value. - let data = &tail.segment.as_ref().data[tail.next_write_idx]; - - // Safety: we have exclusive access to the slot value since the consumer - // cannot access it before `has_value` is set to true. - data.value.with_mut(|p| (*p).write(value)); - - // Ordering: this Release store synchronizes with the Acquire load in - // `pop` and ensures that the value is visible to the consumer once - // `has_value` reads `true`. - data.has_value.store(true, Ordering::Release); - - tail.next_write_idx += 1; - } - - /// Pops a new value. - /// - /// # Safety - /// - /// The method cannot be called from multiple threads concurrently. - unsafe fn pop(&self) -> Option { - // Safety: this is the only thread accessing the head. - let head = self.head.with_mut(|p| &mut *p); - - // If the whole segment has been read, try to move to the next segment. - if head.next_read_idx == SEGMENT_LEN { - // Read the next segment or return `None` if it is not ready yet. - // - // Safety: the head segment is still allocated since we are the only - // thread that can deallocate it. - let next_segment = head.segment.as_ref().next_segment.load(Ordering::Acquire); - let next_segment = NonNull::new(next_segment)?; - - // Deallocate the old segment. - // - // Safety: the pointer was initialized from a box and the segment is - // still allocated since we are the only thread that can deallocate - // it. - let _ = Box::from_raw(head.segment.as_ptr()); - - // Update the segment and the next index. - head.segment = next_segment; - head.next_read_idx = 0; - } - - let data = &head.segment.as_ref().data[head.next_read_idx]; - - // Ordering: this Acquire load synchronizes with the Release store in - // `push` and ensures that the value is visible once `has_value` reads - // `true`. - if !data.has_value.load(Ordering::Acquire) { - return None; - } - - // Safety: since `has_value` is `true` then we have exclusive ownership - // of the value and we know that it was initialized. - let value = data.value.with(|p| (*p).assume_init_read()); - - head.next_read_idx += 1; - - Some(value) - } -} - -impl Drop for Queue { - fn drop(&mut self) { - unsafe { - // Drop all values. - while self.pop().is_some() {} - - // All values have been dropped: the last segment can be freed. - - // Safety: this is the only thread accessing the head since both the - // consumer and producer have been dropped. - let head = self.head.with_mut(|p| &mut *p); - - // Safety: the pointer was initialized from a box and the segment is - // still allocated since we are the only thread that can deallocate - // it. - let _ = Box::from_raw(head.segment.as_ptr()); - } - } -} - -unsafe impl Send for Queue {} -unsafe impl Sync for Queue {} - -impl UnwindSafe for Queue {} -impl RefUnwindSafe for Queue {} - -/// A handle to a single-producer, single-consumer queue that can push values. -pub(crate) struct Producer { - queue: Arc>, - _non_sync_phantom: PhantomData>, -} -impl Producer { - /// Pushes a value to the queue. - pub(crate) fn push(&self, value: T) -> Result<(), PushError> { - if Arc::strong_count(&self.queue) == 1 { - return Err(PushError {}); - } - - unsafe { self.queue.push(value) }; - - Ok(()) - } -} - -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -/// Error returned when a push failed due to the consumer being dropped. -pub(crate) struct PushError {} - -impl fmt::Display for PushError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "sending message into a closed mailbox") - } -} - -impl Error for PushError {} - -/// A handle to a single-producer, single-consumer queue that can pop values. -pub(crate) struct Consumer { - queue: Arc>, - _non_sync_phantom: PhantomData>, -} -impl Consumer { - /// Pops a value from the queue. - pub(crate) fn pop(&self) -> Option { - unsafe { self.queue.pop() } - } -} - -/// Creates the producer and consumer handles of a single-producer, -/// single-consumer queue. -pub(crate) fn spsc_queue() -> (Producer, Consumer) { - let queue = Arc::new(Queue::new()); - - let producer = Producer { - queue: queue.clone(), - _non_sync_phantom: PhantomData, - }; - let consumer = Consumer { - queue, - _non_sync_phantom: PhantomData, - }; - - (producer, consumer) -} - -/// Loom tests. -#[cfg(all(test, not(asynchronix_loom)))] -mod tests { - use super::*; - - use std::thread; - - #[test] - fn spsc_queue_basic() { - const VALUE_COUNT: usize = if cfg!(miri) { 1000 } else { 100_000 }; - - let (producer, consumer) = spsc_queue(); - - let th = thread::spawn(move || { - for i in 0..VALUE_COUNT { - let value = loop { - if let Some(v) = consumer.pop() { - break v; - } - }; - - assert_eq!(value, i); - } - }); - - for i in 0..VALUE_COUNT { - producer.push(i).unwrap(); - } - - th.join().unwrap(); - } -} - -/// Loom tests. -#[cfg(all(test, asynchronix_loom))] -mod tests { - use super::*; - - use loom::model::Builder; - use loom::thread; - - #[test] - fn loom_spsc_queue_basic() { - const DEFAULT_PREEMPTION_BOUND: usize = 4; - const VALUE_COUNT: usize = 10; - - let mut builder = Builder::new(); - if builder.preemption_bound.is_none() { - builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND); - } - - builder.check(move || { - let (producer, consumer) = spsc_queue(); - - let th = thread::spawn(move || { - let mut value = 0; - for _ in 0..VALUE_COUNT { - if let Some(v) = consumer.pop() { - assert_eq!(v, value); - value += 1; - } - } - }); - - for i in 0..VALUE_COUNT { - let _ = producer.push(i); - } - - th.join().unwrap(); - }); - } - - #[test] - fn loom_spsc_queue_new_segment() { - const DEFAULT_PREEMPTION_BOUND: usize = 4; - const VALUE_COUNT_BEFORE: usize = 5; - const VALUE_COUNT_AFTER: usize = 5; - - let mut builder = Builder::new(); - if builder.preemption_bound.is_none() { - builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND); - } - - builder.check(move || { - let (producer, consumer) = spsc_queue(); - - // Fill up the first segment except for the last `VALUE_COUNT_BEFORE` slots. - for i in 0..(SEGMENT_LEN - VALUE_COUNT_BEFORE) { - producer.push(i).unwrap(); - consumer.pop(); - } - - let th = thread::spawn(move || { - let mut value = SEGMENT_LEN - VALUE_COUNT_BEFORE; - for _ in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) { - if let Some(v) = consumer.pop() { - assert_eq!(v, value); - value += 1; - } - } - }); - - for i in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) { - let _ = producer.push(i); - } - - th.join().unwrap(); - }); - } -} diff --git a/asynchronix/tests/model_scheduling.rs b/asynchronix/tests/model_scheduling.rs index 6ff9b44..94f3ec1 100644 --- a/asynchronix/tests/model_scheduling.rs +++ b/asynchronix/tests/model_scheduling.rs @@ -3,7 +3,7 @@ use std::time::Duration; use asynchronix::model::{Model, Output}; -use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::simulation::{EventQueue, Mailbox, SimInit}; use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; #[test] @@ -27,7 +27,8 @@ fn model_schedule_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = model.output.connect_stream().0; + let mut output = EventQueue::new(); + model.output.connect_sink(&output); let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -71,7 +72,8 @@ fn model_cancel_future_keyed_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = model.output.connect_stream().0; + let mut output = EventQueue::new(); + model.output.connect_sink(&output); let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -116,7 +118,8 @@ fn model_cancel_same_time_keyed_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = model.output.connect_stream().0; + let mut output = EventQueue::new(); + model.output.connect_sink(&output); let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -157,7 +160,8 @@ fn model_schedule_periodic_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = model.output.connect_stream().0; + let mut output = EventQueue::new(); + model.output.connect_sink(&output); let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; @@ -206,7 +210,8 @@ fn model_cancel_periodic_event() { let mut model = TestModel::default(); let mbox = Mailbox::new(); - let mut output = model.output.connect_stream().0; + let mut output = EventQueue::new(); + model.output.connect_sink(&output); let addr = mbox.address(); let t0 = MonotonicTime::EPOCH; diff --git a/asynchronix/tests/simulation_scheduling.rs b/asynchronix/tests/simulation_scheduling.rs index 858f81e..b8bd0d8 100644 --- a/asynchronix/tests/simulation_scheduling.rs +++ b/asynchronix/tests/simulation_scheduling.rs @@ -3,7 +3,7 @@ use std::time::Duration; use asynchronix::model::{Model, Output}; -use asynchronix::simulation::{Address, EventStream, Mailbox, SimInit, Simulation}; +use asynchronix::simulation::{Address, EventQueue, Mailbox, SimInit, Simulation}; use asynchronix::time::MonotonicTime; // Input-to-output pass-through model. @@ -26,12 +26,13 @@ impl Model for PassThroughModel {} /// output) running as fast as possible. fn passthrough_bench( t0: MonotonicTime, -) -> (Simulation, Address>, EventStream) { +) -> (Simulation, Address>, EventQueue) { // Bench assembly. let mut model = PassThroughModel::new(); let mbox = Mailbox::new(); - let out_stream = model.output.connect_stream().0; + let out_stream = EventQueue::new(); + model.output.connect_sink(&out_stream); let addr = mbox.address(); let simu = SimInit::new().add_model(model, mbox).init(t0); @@ -243,13 +244,14 @@ fn timestamp_bench( ) -> ( Simulation, Address, - EventStream<(Instant, SystemTime)>, + EventQueue<(Instant, SystemTime)>, ) { // Bench assembly. let mut model = TimestampModel::default(); let mbox = Mailbox::new(); - let stamp_stream = model.stamp.connect_stream().0; + let stamp_stream = EventQueue::new(); + model.stamp.connect_sink(&stamp_stream); let addr = mbox.address(); let simu = SimInit::new()