1
0
forked from ROMEO/nexosim

Revert "Merge pull request #12 from asynchronics/feature/event-sinks"

This reverts commit 7e881afb638ccc0dbcfc7b539fc152dc923d63e1, reversing
changes made to 9d78e4f72a4c6ff459fc386b2f25beae40b94429.
This commit is contained in:
Serge Barral 2024-03-06 16:16:55 +01:00
parent 43e41012d2
commit 1be2f48a00
25 changed files with 870 additions and 573 deletions

View File

@ -13,6 +13,7 @@ on:
- 'asynchronix/src/model/ports/broadcaster.rs' - 'asynchronix/src/model/ports/broadcaster.rs'
- 'asynchronix/src/model/ports/broadcaster/**' - 'asynchronix/src/model/ports/broadcaster/**'
- 'asynchronix/src/util/slot.rs' - 'asynchronix/src/util/slot.rs'
- 'asynchronix/src/util/spsc_queue.rs'
- 'asynchronix/src/util/sync_cell.rs' - 'asynchronix/src/util/sync_cell.rs'
jobs: jobs:

View File

@ -1,3 +1,2 @@
[workspace] [workspace]
members = ["asynchronix"] members = ["asynchronix"]
resolver = "2"

View File

@ -28,7 +28,6 @@ dev-logs = []
[dependencies] [dependencies]
async-event = "0.1" async-event = "0.1"
crossbeam-utils = "0.8" crossbeam-utils = "0.8"
crossbeam-queue = "0.3"
diatomic-waker = "0.1" diatomic-waker = "0.1"
futures-task = "0.3" futures-task = "0.3"
multishot = "0.3" multishot = "0.3"

View File

@ -36,7 +36,7 @@ use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use asynchronix::model::{InitializedModel, Model, Output}; use asynchronix::model::{InitializedModel, Model, Output};
use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
/// Water pump. /// Water pump.
@ -364,8 +364,7 @@ fn main() {
pump.flow_rate.connect(Tank::set_flow_rate, &tank_mbox); pump.flow_rate.connect(Tank::set_flow_rate, &tank_mbox);
// Model handles for simulation. // Model handles for simulation.
let mut flow_rate = EventSlot::new(); let mut flow_rate = pump.flow_rate.connect_slot().0;
pump.flow_rate.connect_sink(&flow_rate);
let controller_addr = controller_mbox.address(); let controller_addr = controller_mbox.address();
let tank_addr = tank_mbox.address(); let tank_addr = tank_mbox.address();

View File

@ -27,7 +27,7 @@
//! └──────────┘ //! └──────────┘
//! ``` //! ```
use asynchronix::model::{Model, Output, Requestor}; use asynchronix::model::{Model, Output, Requestor};
use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
/// Power supply. /// Power supply.
@ -124,14 +124,10 @@ fn main() {
psu.pwr_out.connect(Load::pwr_in, &load3_mbox); psu.pwr_out.connect(Load::pwr_in, &load3_mbox);
// Model handles for simulation. // Model handles for simulation.
let mut psu_power = EventSlot::new(); let mut psu_power = psu.power.connect_slot().0;
let mut load1_power = EventSlot::new(); let mut load1_power = load1.power.connect_slot().0;
let mut load2_power = EventSlot::new(); let mut load2_power = load2.power.connect_slot().0;
let mut load3_power = EventSlot::new(); let mut load3_power = load3.power.connect_slot().0;
psu.power.connect_sink(&psu_power);
load1.power.connect_sink(&load1_power);
load2.power.connect_sink(&load2_power);
load3.power.connect_sink(&load3_power);
let psu_addr = psu_mbox.address(); let psu_addr = psu_mbox.address();
// Start time (arbitrary since models do not depend on absolute time). // Start time (arbitrary since models do not depend on absolute time).

View File

@ -19,7 +19,7 @@ use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use asynchronix::model::{InitializedModel, Model, Output}; use asynchronix::model::{InitializedModel, Model, Output};
use asynchronix::simulation::{EventQueue, Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{MonotonicTime, Scheduler}; use asynchronix::time::{MonotonicTime, Scheduler};
/// Stepper motor. /// Stepper motor.
@ -200,8 +200,7 @@ fn main() {
driver.current_out.connect(Motor::current_in, &motor_mbox); driver.current_out.connect(Motor::current_in, &motor_mbox);
// Model handles for simulation. // Model handles for simulation.
let mut position = EventQueue::new(); let mut position = motor.position.connect_stream().0;
motor.position.connect_sink(&position);
let motor_addr = motor_mbox.address(); let motor_addr = motor_mbox.address();
let driver_addr = driver_mbox.address(); let driver_addr = driver_mbox.address();

View File

@ -255,8 +255,8 @@ impl<M: Model> Sender<M> {
/// All channels are guaranteed to have different identifiers at any given /// All channels are guaranteed to have different identifiers at any given
/// time, but an identifier may be reused after all handles to a channel /// time, but an identifier may be reused after all handles to a channel
/// have been dropped. /// have been dropped.
pub(crate) fn channel_id(&self) -> NonZeroUsize { pub(crate) fn channel_id(&self) -> ChannelId {
NonZeroUsize::new(Arc::as_ptr(&self.inner) as usize).unwrap() ChannelId(NonZeroUsize::new(&*self.inner as *const Inner<M> as usize).unwrap())
} }
} }

View File

@ -1,6 +1,7 @@
use std::future::Future;
use std::ops::Deref; use std::ops::Deref;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::thread; use std::thread;

View File

@ -1,3 +1,5 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_channel::{mpsc, oneshot}; use futures_channel::{mpsc, oneshot};
use futures_util::StreamExt; use futures_util::StreamExt;

View File

@ -193,7 +193,7 @@
//! # impl Model for Delay {} //! # impl Model for Delay {}
//! # } //! # }
//! use std::time::Duration; //! use std::time::Duration;
//! use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; //! use asynchronix::simulation::{Mailbox, SimInit};
//! use asynchronix::time::MonotonicTime; //! use asynchronix::time::MonotonicTime;
//! //!
//! use models::{Delay, Multiplier}; //! use models::{Delay, Multiplier};
@ -217,8 +217,7 @@
//! delay1.output.connect(Delay::input, &delay2_mbox); //! delay1.output.connect(Delay::input, &delay2_mbox);
//! //!
//! // Keep handles to the system input and output for the simulation. //! // Keep handles to the system input and output for the simulation.
//! let mut output_slot = EventSlot::new(); //! let mut output_slot = delay2.output.connect_slot().0;
//! delay2.output.connect_sink(&output_slot);
//! let input_address = multiplier1_mbox.address(); //! let input_address = multiplier1_mbox.address();
//! //!
//! // Pick an arbitrary simulation start time and build the simulation. //! // Pick an arbitrary simulation start time and build the simulation.
@ -256,7 +255,7 @@
//! //!
//! Simulation outputs can be monitored using //! Simulation outputs can be monitored using
//! [`EventSlot`](simulation::EventSlot)s and //! [`EventSlot`](simulation::EventSlot)s and
//! [`EventQueue`](simulation::EventQueue)s, which can be connected to any //! [`EventStream`](simulation::EventStream)s, which can be connected to any
//! model's output port. While an event slot only gives access to the last value //! model's output port. While an event slot only gives access to the last value
//! sent from a port, an event stream is an iterator that yields all events that //! sent from a port, an event stream is an iterator that yields all events that
//! were sent in first-in-first-out order. //! were sent in first-in-first-out order.
@ -294,7 +293,7 @@
//! # impl Model for Delay {} //! # impl Model for Delay {}
//! # } //! # }
//! # use std::time::Duration; //! # use std::time::Duration;
//! # use asynchronix::simulation::{EventSlot, Mailbox, SimInit}; //! # use asynchronix::simulation::{Mailbox, SimInit};
//! # use asynchronix::time::MonotonicTime; //! # use asynchronix::time::MonotonicTime;
//! # use models::{Delay, Multiplier}; //! # use models::{Delay, Multiplier};
//! # let mut multiplier1 = Multiplier::default(); //! # let mut multiplier1 = Multiplier::default();
@ -309,8 +308,7 @@
//! # multiplier1.output.connect(Multiplier::input, &multiplier2_mbox); //! # multiplier1.output.connect(Multiplier::input, &multiplier2_mbox);
//! # multiplier2.output.connect(Delay::input, &delay2_mbox); //! # multiplier2.output.connect(Delay::input, &delay2_mbox);
//! # delay1.output.connect(Delay::input, &delay2_mbox); //! # delay1.output.connect(Delay::input, &delay2_mbox);
//! # let mut output_slot = EventSlot::new(); //! # let mut output_slot = delay2.output.connect_slot().0;
//! # delay2.output.connect_sink(&output_slot);
//! # let input_address = multiplier1_mbox.address(); //! # let input_address = multiplier1_mbox.address();
//! # let t0 = MonotonicTime::EPOCH; //! # let t0 = MonotonicTime::EPOCH;
//! # let mut simu = SimInit::new() //! # let mut simu = SimInit::new()

View File

@ -132,7 +132,7 @@
//! can be connected to input and requestor ports when assembling the simulation //! can be connected to input and requestor ports when assembling the simulation
//! bench. However, input ports may instead be defined as private methods if //! bench. However, input ports may instead be defined as private methods if
//! they are only used by the model itself to schedule future actions (see the //! they are only used by the model itself to schedule future actions (see the
//! [`Scheduler`] examples). //! [`Scheduler`](crate::time::Scheduler) examples).
//! //!
//! Changing the signature of an input or replier port is not considered to //! Changing the signature of an input or replier port is not considered to
//! alter the public interface of a model provided that the event, request and //! alter the public interface of a model provided that the event, request and

View File

@ -14,17 +14,18 @@
//! ports should generally be preferred over requestor ports when possible. //! ports should generally be preferred over requestor ports when possible.
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex};
mod broadcaster; mod broadcaster;
mod sender; mod sender;
use crate::model::ports::sender::EventSinkSender;
use crate::model::{InputFn, Model, ReplierFn}; use crate::model::{InputFn, Model, ReplierFn};
use crate::simulation::{Address, EventSink}; use crate::simulation::{Address, EventSlot, EventStream};
use crate::util::spsc_queue;
use broadcaster::{EventBroadcaster, QueryBroadcaster}; use broadcaster::Broadcaster;
use self::sender::{InputSender, ReplierSender}; use self::sender::{EventSender, EventSlotSender, EventStreamSender, QuerySender};
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
/// Unique identifier for a connection between two ports. /// Unique identifier for a connection between two ports.
@ -36,7 +37,7 @@ pub struct LineId(u64);
/// methods that return no value. They broadcast events to all connected input /// methods that return no value. They broadcast events to all connected input
/// ports. /// ports.
pub struct Output<T: Clone + Send + 'static> { pub struct Output<T: Clone + Send + 'static> {
broadcaster: EventBroadcaster<T>, broadcaster: Broadcaster<T, ()>,
next_line_id: u64, next_line_id: u64,
} }
@ -61,23 +62,40 @@ impl<T: Clone + Send + 'static> Output<T> {
assert!(self.next_line_id != u64::MAX); assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id); let line_id = LineId(self.next_line_id);
self.next_line_id += 1; self.next_line_id += 1;
let sender = Box::new(InputSender::new(input, address.into().0)); let sender = Box::new(EventSender::new(input, address.into().0));
self.broadcaster.add(sender, line_id); self.broadcaster.add(sender, line_id);
line_id line_id
} }
/// Adds a connection to an event sink such as an /// Adds a connection to an event stream iterator.
/// [`EventSlot`](crate::simulation::EventSlot) or pub fn connect_stream(&mut self) -> (EventStream<T>, LineId) {
/// [`EventQueue`](crate::simulation::EventQueue).
pub fn connect_sink<S: EventSink<T>>(&mut self, sink: &S) -> LineId {
assert!(self.next_line_id != u64::MAX); assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id); let line_id = LineId(self.next_line_id);
self.next_line_id += 1; self.next_line_id += 1;
let sender = Box::new(EventSinkSender::new(sink.writer()));
let (producer, consumer) = spsc_queue::spsc_queue();
let sender = Box::new(EventStreamSender::new(producer));
let event_stream = EventStream::new(consumer);
self.broadcaster.add(sender, line_id); self.broadcaster.add(sender, line_id);
line_id (event_stream, line_id)
}
/// Adds a connection to an event slot.
pub fn connect_slot(&mut self) -> (EventSlot<T>, LineId) {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let slot = Arc::new(Mutex::new(None));
let sender = Box::new(EventSlotSender::new(slot.clone()));
let event_slot = EventSlot::new(slot);
self.broadcaster.add(sender, line_id);
(event_slot, line_id)
} }
/// Removes the connection specified by the `LineId` parameter. /// Removes the connection specified by the `LineId` parameter.
@ -100,14 +118,14 @@ impl<T: Clone + Send + 'static> Output<T> {
/// Broadcasts an event to all connected input ports. /// Broadcasts an event to all connected input ports.
pub async fn send(&mut self, arg: T) { pub async fn send(&mut self, arg: T) {
self.broadcaster.broadcast(arg).await.unwrap(); self.broadcaster.broadcast_event(arg).await.unwrap();
} }
} }
impl<T: Clone + Send + 'static> Default for Output<T> { impl<T: Clone + Send + 'static> Default for Output<T> {
fn default() -> Self { fn default() -> Self {
Self { Self {
broadcaster: EventBroadcaster::default(), broadcaster: Broadcaster::default(),
next_line_id: 0, next_line_id: 0,
} }
} }
@ -125,7 +143,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
/// model methods that return a value. They broadcast queries to all connected /// model methods that return a value. They broadcast queries to all connected
/// replier ports. /// replier ports.
pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> { pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: QueryBroadcaster<T, R>, broadcaster: Broadcaster<T, R>,
next_line_id: u64, next_line_id: u64,
} }
@ -150,7 +168,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
assert!(self.next_line_id != u64::MAX); assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id); let line_id = LineId(self.next_line_id);
self.next_line_id += 1; self.next_line_id += 1;
let sender = Box::new(ReplierSender::new(replier, address.into().0)); let sender = Box::new(QuerySender::new(replier, address.into().0));
self.broadcaster.add(sender, line_id); self.broadcaster.add(sender, line_id);
line_id line_id
@ -176,14 +194,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// Broadcasts a query to all connected replier ports. /// Broadcasts a query to all connected replier ports.
pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ { pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ {
self.broadcaster.broadcast(arg).await.unwrap() self.broadcaster.broadcast_query(arg).await.unwrap()
} }
} }
impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> { impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> {
fn default() -> Self { fn default() -> Self {
Self { Self {
broadcaster: QueryBroadcaster::default(), broadcaster: Broadcaster::default(),
next_line_id: 0, next_line_id: 0,
} }
} }

View File

@ -1,5 +1,4 @@
use std::future::Future; use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -27,17 +26,28 @@ mod task_set;
/// exploits this behavior by waking the main broadcast future only when all /// exploits this behavior by waking the main broadcast future only when all
/// sender futures have been awaken, which strongly reduces overhead since /// sender futures have been awaken, which strongly reduces overhead since
/// waking a sender task does not actually schedule it on the executor. /// waking a sender task does not actually schedule it on the executor.
pub(super) struct BroadcasterInner<T: Clone + 'static, R: 'static> { pub(super) struct Broadcaster<T: Clone + 'static, R: 'static> {
/// The list of senders with their associated line identifier. /// The list of senders with their associated line identifier.
senders: Vec<(LineId, Box<dyn Sender<T, R>>)>, senders: Vec<(LineId, Box<dyn Sender<T, R>>)>,
/// Fields explicitly borrowed by the `BroadcastFuture`. /// Fields explicitly borrowed by the `BroadcastFuture`.
shared: Shared<R>, shared: Shared<R>,
/// Phantom types.
_phantom_event: PhantomData<T>,
_phantom_reply: PhantomData<R>,
} }
impl<T: Clone + 'static, R> BroadcasterInner<T, R> { impl<T: Clone + 'static> Broadcaster<T, ()> {
/// Broadcasts an event to all addresses.
pub(super) async fn broadcast_event(&mut self, arg: T) -> Result<(), BroadcastError> {
match self.senders.as_mut_slice() {
// No sender.
[] => Ok(()),
// One sender.
[sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}),
// Multiple senders.
_ => self.broadcast(arg).await,
}
}
}
impl<T: Clone + 'static, R> Broadcaster<T, R> {
/// Adds a new sender associated to the specified identifier. /// Adds a new sender associated to the specified identifier.
/// ///
/// # Panics /// # Panics
@ -83,25 +93,55 @@ impl<T: Clone + 'static, R> BroadcasterInner<T, R> {
self.senders.len() self.senders.len()
} }
/// Broadcasts a query to all addresses and collect all responses.
pub(super) async fn broadcast_query(
&mut self,
arg: T,
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
match self.senders.as_mut_slice() {
// No sender.
[] => {}
// One sender.
[sender] => {
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
self.shared.futures_env[0].output = Some(output);
}
// Multiple senders.
_ => self.broadcast(arg).await?,
};
// At this point all outputs should be available so `unwrap` can be
// called on the output of each future.
let outputs = self
.shared
.futures_env
.iter_mut()
.map(|t| t.output.take().unwrap());
Ok(outputs)
}
/// Efficiently broadcasts a message or a query to multiple addresses. /// Efficiently broadcasts a message or a query to multiple addresses.
/// ///
/// This method does not collect the responses from queries. /// This method does not collect the responses from queries.
fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> { fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> {
let futures_count = self.senders.len();
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
// Broadcast the message and collect all futures. // Broadcast the message and collect all futures.
let mut iter = self for (i, (sender, futures_env)) in self
.senders .senders
.iter_mut() .iter_mut()
.zip(self.shared.futures_env.iter_mut()); .zip(self.shared.futures_env.iter_mut())
while let Some((sender, futures_env)) = iter.next() { .enumerate()
{
let future_cache = futures_env let future_cache = futures_env
.storage .storage
.take() .take()
.unwrap_or_else(|| RecycleBox::new(())); .unwrap_or_else(|| RecycleBox::new(()));
// Move the argument rather than clone it for the last future. // Move the argument rather than clone it for the last future.
if iter.len() == 0 { if i + 1 == futures_count {
let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> = let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> =
coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg))); coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg)));
@ -121,7 +161,7 @@ impl<T: Clone + 'static, R> BroadcasterInner<T, R> {
} }
} }
impl<T: Clone + 'static, R> Default for BroadcasterInner<T, R> { impl<T: Clone + 'static, R> Default for Broadcaster<T, R> {
/// Creates an empty `Broadcaster` object. /// Creates an empty `Broadcaster` object.
fn default() -> Self { fn default() -> Self {
let wake_sink = WakeSink::new(); let wake_sink = WakeSink::new();
@ -135,141 +175,6 @@ impl<T: Clone + 'static, R> Default for BroadcasterInner<T, R> {
futures_env: Vec::new(), futures_env: Vec::new(),
storage: None, storage: None,
}, },
_phantom_event: PhantomData,
_phantom_reply: PhantomData,
}
}
}
/// An object that can efficiently broadcast events to several input ports.
///
/// See `BroadcasterInner` for implementation details.
pub(super) struct EventBroadcaster<T: Clone + 'static> {
/// The broadcaster core object.
inner: BroadcasterInner<T, ()>,
}
impl<T: Clone + 'static> EventBroadcaster<T> {
/// Adds a new sender associated to the specified identifier.
///
/// # Panics
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>, id: LineId) {
self.inner.add(sender, id);
}
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
}
/// Returns the number of connected senders.
pub(super) fn len(&self) -> usize {
self.inner.len()
}
/// Broadcasts an event to all addresses.
pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), BroadcastError> {
match self.inner.senders.as_mut_slice() {
// No sender.
[] => Ok(()),
// One sender.
[sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}),
// Multiple senders.
_ => self.inner.broadcast(arg).await,
}
}
}
impl<T: Clone + 'static> Default for EventBroadcaster<T> {
fn default() -> Self {
Self {
inner: BroadcasterInner::default(),
}
}
}
/// An object that can efficiently broadcast queries to several replier ports.
///
/// See `BroadcasterInner` for implementation details.
pub(super) struct QueryBroadcaster<T: Clone + 'static, R: 'static> {
/// The broadcaster core object.
inner: BroadcasterInner<T, R>,
}
impl<T: Clone + 'static, R: 'static> QueryBroadcaster<T, R> {
/// Adds a new sender associated to the specified identifier.
///
/// # Panics
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>, id: LineId) {
self.inner.add(sender, id);
}
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
}
/// Returns the number of connected senders.
pub(super) fn len(&self) -> usize {
self.inner.len()
}
/// Broadcasts a query to all addresses and collect all responses.
pub(super) async fn broadcast(
&mut self,
arg: T,
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
match self.inner.senders.as_mut_slice() {
// No sender.
[] => {}
// One sender.
[sender] => {
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
self.inner.shared.futures_env[0].output = Some(output);
}
// Multiple senders.
_ => self.inner.broadcast(arg).await?,
};
// At this point all outputs should be available so `unwrap` can be
// called on the output of each future.
let outputs = self
.inner
.shared
.futures_env
.iter_mut()
.map(|t| t.output.take().unwrap());
Ok(outputs)
}
}
impl<T: Clone + 'static, R: 'static> Default for QueryBroadcaster<T, R> {
fn default() -> Self {
Self {
inner: BroadcasterInner::default(),
} }
} }
} }
@ -418,7 +323,7 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
let scheduled_tasks = match this let scheduled_tasks = match this
.shared .shared
.task_set .task_set
.take_scheduled(this.pending_futures_count) .steal_scheduled(this.pending_futures_count)
{ {
Some(st) => st, Some(st) => st,
None => return Poll::Pending, None => return Poll::Pending,
@ -503,7 +408,9 @@ mod tests {
use futures_executor::block_on; use futures_executor::block_on;
use super::super::sender::QuerySender;
use crate::channel::Receiver; use crate::channel::Receiver;
use crate::model::Model;
use crate::time::Scheduler; use crate::time::Scheduler;
use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue; use crate::util::priority_queue::PriorityQueue;
@ -534,18 +441,18 @@ mod tests {
const N_RECV: usize = 4; const N_RECV: usize = 4;
let mut mailboxes = Vec::new(); let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default(); let mut broadcaster = Broadcaster::default();
for id in 0..N_RECV { for id in 0..N_RECV {
let mailbox = Receiver::new(10); let mailbox = Receiver::new(10);
let address = mailbox.sender(); let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address)); let sender = Box::new(EventSender::new(Counter::inc, address));
broadcaster.add(sender, LineId(id as u64)); broadcaster.add(sender, LineId(id as u64));
mailboxes.push(mailbox); mailboxes.push(mailbox);
} }
let th_broadcast = thread::spawn(move || { let th_broadcast = thread::spawn(move || {
block_on(broadcaster.broadcast(1)).unwrap(); block_on(broadcaster.broadcast_event(1)).unwrap();
}); });
let counter = Arc::new(AtomicUsize::new(0)); let counter = Arc::new(AtomicUsize::new(0));
@ -582,18 +489,18 @@ mod tests {
const N_RECV: usize = 4; const N_RECV: usize = 4;
let mut mailboxes = Vec::new(); let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default(); let mut broadcaster = Broadcaster::default();
for id in 0..N_RECV { for id in 0..N_RECV {
let mailbox = Receiver::new(10); let mailbox = Receiver::new(10);
let address = mailbox.sender(); let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); let sender = Box::new(QuerySender::new(Counter::fetch_inc, address));
broadcaster.add(sender, LineId(id as u64)); broadcaster.add(sender, LineId(id as u64));
mailboxes.push(mailbox); mailboxes.push(mailbox);
} }
let th_broadcast = thread::spawn(move || { let th_broadcast = thread::spawn(move || {
let iter = block_on(broadcaster.broadcast(1)).unwrap(); let iter = block_on(broadcaster.broadcast_query(1)).unwrap();
let sum = iter.fold(0, |acc, val| acc + val); let sum = iter.fold(0, |acc, val| acc + val);
assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)} assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)}
@ -702,12 +609,12 @@ mod tests {
let (test_event2, waker2) = test_event::<usize>(); let (test_event2, waker2) = test_event::<usize>();
let (test_event3, waker3) = test_event::<usize>(); let (test_event3, waker3) = test_event::<usize>();
let mut broadcaster = QueryBroadcaster::default(); let mut broadcaster = Broadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1)); broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2)); broadcaster.add(Box::new(test_event2), LineId(2));
broadcaster.add(Box::new(test_event3), LineId(3)); broadcaster.add(Box::new(test_event3), LineId(3));
let mut fut = Box::pin(broadcaster.broadcast(())); let mut fut = Box::pin(broadcaster.broadcast_query(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));
let is_scheduled_waker = is_scheduled.clone(); let is_scheduled_waker = is_scheduled.clone();
@ -777,11 +684,11 @@ mod tests {
let (test_event1, waker1) = test_event::<usize>(); let (test_event1, waker1) = test_event::<usize>();
let (test_event2, waker2) = test_event::<usize>(); let (test_event2, waker2) = test_event::<usize>();
let mut broadcaster = QueryBroadcaster::default(); let mut broadcaster = Broadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1)); broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2)); broadcaster.add(Box::new(test_event2), LineId(2));
let mut fut = Box::pin(broadcaster.broadcast(())); let mut fut = Box::pin(broadcaster.broadcast_query(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));
let is_scheduled_waker = is_scheduled.clone(); let is_scheduled_waker = is_scheduled.clone();

View File

@ -21,37 +21,31 @@ const COUNTDOWN_MASK: u64 = !INDEX_MASK;
/// scheduled tasks. /// scheduled tasks.
const COUNTDOWN_ONE: u64 = 1 << 32; const COUNTDOWN_ONE: u64 = 1 << 32;
/// An object for the efficient management of a set of tasks scheduled /// A set of tasks that may be scheduled cheaply and can be requested to wake a
/// concurrently. /// parent task only when a given amount of tasks have been scheduled.
/// ///
/// The algorithm used by `TaskSet` is designed to wake up the parent task as /// This object maintains both a list of all active tasks and a list of the
/// seldom as possible, ideally only when all non-completed sub-tasks have been /// subset of active tasks currently scheduled. The latter is stored in a
/// scheduled (awaken). /// Treiber stack which links tasks through indices rather than pointers. Using
/// /// indices has two advantages: (i) it enables a fully safe implementation and
/// A `TaskSet` maintains both a vector-based list of tasks (or more accurately, /// (ii) it makes it possible to use a single CAS to simultaneously move the
/// task waker handles) and a linked list of the subset of tasks that are /// head and decrement the outstanding amount of tasks to be scheduled before
/// currently scheduled. The latter is stored in a vector-based Treiber stack /// the parent task is notified.
/// which links tasks through indices rather than pointers. Using indices has
/// two advantages: (i) it makes a fully safe implementation possible and (ii)
/// it can take advantage of a single CAS to simultaneously move the head and
/// decrement the outstanding amount of tasks to be scheduled before the parent
/// task is notified.
pub(super) struct TaskSet { pub(super) struct TaskSet {
/// Set of all tasks, scheduled or not. /// Set of all active tasks, scheduled or not.
/// ///
/// In some cases, the use of `resize()` to shrink the task set may leave /// In some rare cases, the back of the vector can also contain inactive
/// inactive tasks at the back of the vector, in which case the length of /// (retired) tasks.
/// the vector will exceed `task_count`.
tasks: Vec<Arc<Task>>, tasks: Vec<Arc<Task>>,
/// Head of the Treiber stack for scheduled tasks. /// Head of the Treiber stack for scheduled tasks.
/// ///
/// The lower 32 bits specify the index of the last scheduled task (head), /// The lower bits specify the index of the last scheduled task, if any,
/// if any, whereas the upper 32 bits specify the countdown of tasks still /// whereas the upper bits specify the countdown of tasks still to be
/// to be scheduled before the parent task is notified. /// scheduled before the parent task is notified.
head: Arc<AtomicU64>, head: Arc<AtomicU64>,
/// A notifier used to wake the parent task. /// A notifier used to wake the parent task.
notifier: WakeSource, notifier: WakeSource,
/// Count of all tasks, scheduled or not. /// Count of all active tasks, scheduled or not.
task_count: usize, task_count: usize,
} }
@ -71,25 +65,21 @@ impl TaskSet {
} }
} }
/// Take all scheduled tasks and returns an iterator over their indices, or /// Steals scheduled tasks if any and returns an iterator over their
/// if there are no currently scheduled tasks returns `None` and requests a /// indices, otherwise returns `None` and requests a notification to be sent
/// notification to be sent after `pending_task_count` tasks have been /// after `notify_count` tasks have been scheduled.
/// scheduled.
/// ///
/// In all cases, the list of scheduled tasks will be empty right after this /// In all cases, the list of scheduled tasks is guaranteed to be empty
/// call. /// after this call.
/// ///
/// If there were scheduled tasks, no notification is requested because this /// If some tasks were stolen, no notification is requested.
/// method is expected to be called repeatedly until it returns `None`.
/// Failure to do so will result in missed notifications.
/// ///
/// If no tasks were scheduled, the notification is guaranteed to be /// If no tasks were stolen, the notification is guaranteed to be triggered
/// triggered no later than after `pending_task_count` tasks have been /// no later than after `notify_count` tasks have been scheduled, though it
/// scheduled, though it may in some cases be triggered earlier. If the /// may in some cases be triggered earlier. If the specified `notify_count`
/// specified `pending_task_count` is zero then no notification is /// is zero then no notification is requested.
/// requested. pub(super) fn steal_scheduled(&self, notify_count: usize) -> Option<TaskIterator<'_>> {
pub(super) fn take_scheduled(&self, pending_task_count: usize) -> Option<TaskIterator<'_>> { let countdown = u32::try_from(notify_count).unwrap();
let countdown = u32::try_from(pending_task_count).unwrap();
let mut head = self.head.load(Ordering::Relaxed); let mut head = self.head.load(Ordering::Relaxed);
loop { loop {
@ -136,13 +126,13 @@ impl TaskSet {
if self.head.load(Ordering::Relaxed) != EMPTY as u64 { if self.head.load(Ordering::Relaxed) != EMPTY as u64 {
// Dropping the iterator ensures that all tasks are put in the // Dropping the iterator ensures that all tasks are put in the
// sleeping state. // sleeping state.
let _ = self.take_scheduled(0); let _ = self.steal_scheduled(0);
} }
} }
/// Set the number of active tasks. /// Modify the number of active tasks.
/// ///
/// Note that this method may discard already scheduled tasks. /// Note that this method may discard all scheduled tasks.
/// ///
/// # Panic /// # Panic
/// ///
@ -210,7 +200,7 @@ impl TaskSet {
} }
} }
/// Returns `true` if one or more sub-tasks are currently scheduled. /// Returns `true` if one or more tasks are currently scheduled.
pub(super) fn has_scheduled(&self) -> bool { pub(super) fn has_scheduled(&self) -> bool {
// Ordering: the content of the head is only used as an advisory flag so // Ordering: the content of the head is only used as an advisory flag so
// Relaxed ordering is sufficient. // Relaxed ordering is sufficient.

View File

@ -4,23 +4,22 @@ use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel; use crate::channel;
use crate::model::{InputFn, Model, ReplierFn}; use crate::model::{InputFn, Model, ReplierFn};
use crate::simulation::EventSinkWriter; use crate::util::spsc_queue;
/// An event or query sender abstracting over the target model and input or /// Abstraction over `EventSender` and `QuerySender`.
/// replier method.
pub(super) trait Sender<T, R>: Send { pub(super) trait Sender<T, R>: Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>; fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>;
} }
/// An object that can send events to an input port. /// An object that can send a payload to a model.
pub(super) struct InputSender<M: 'static, F, T, S> { pub(super) struct EventSender<M: 'static, F, T, S> {
func: F, func: F,
sender: channel::Sender<M>, sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>, fut_storage: Option<RecycleBox<()>>,
@ -28,7 +27,7 @@ pub(super) struct InputSender<M: 'static, F, T, S> {
_phantom_closure_marker: PhantomData<S>, _phantom_closure_marker: PhantomData<S>,
} }
impl<M: Send, F, T, S> InputSender<M, F, T, S> impl<M: Send, F, T, S> EventSender<M, F, T, S>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S>,
@ -45,15 +44,15 @@ where
} }
} }
impl<M: Send, F, T, S> Sender<T, ()> for InputSender<M, F, T, S> impl<M: Send, F, T, S> Sender<T, ()> for EventSender<M, F, T, S>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone, F: for<'a> InputFn<'a, M, T, S> + Copy,
T: Send + 'static, T: Send + 'static,
S: Send + 'static, S: Send,
{ {
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let func = self.func.clone(); let func = self.func;
let fut = self.sender.send(move |model, scheduler, recycle_box| { let fut = self.sender.send(move |model, scheduler, recycle_box| {
let fut = func.call(model, arg, scheduler); let fut = func.call(model, arg, scheduler);
@ -67,8 +66,8 @@ where
} }
} }
/// An object that can send a request to a replier port and retrieve a response. /// An object that can send a payload to a model and retrieve a response.
pub(super) struct ReplierSender<M: 'static, F, T, R, S> { pub(super) struct QuerySender<M: 'static, F, T, R, S> {
func: F, func: F,
sender: channel::Sender<M>, sender: channel::Sender<M>,
receiver: multishot::Receiver<R>, receiver: multishot::Receiver<R>,
@ -77,7 +76,7 @@ pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
_phantom_closure_marker: PhantomData<S>, _phantom_closure_marker: PhantomData<S>,
} }
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S> impl<M, F, T, R, S> QuerySender<M, F, T, R, S>
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S>, F: for<'a> ReplierFn<'a, M, T, R, S>,
@ -96,16 +95,16 @@ where
} }
} }
impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S> impl<M, F, T, R, S> Sender<T, R> for QuerySender<M, F, T, R, S>
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, F: for<'a> ReplierFn<'a, M, T, R, S> + Copy,
T: Send + 'static, T: Send + 'static,
R: Send + 'static, R: Send + 'static,
S: Send, S: Send,
{ {
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
let func = self.func.clone(); let func = self.func;
let sender = &mut self.sender; let sender = &mut self.sender;
let reply_receiver = &mut self.receiver; let reply_receiver = &mut self.receiver;
let fut_storage = &mut self.fut_storage; let fut_storage = &mut self.fut_storage;
@ -135,32 +134,59 @@ where
} }
} }
/// An object that can send a payload to a mutex-protected slot. /// An object that can send a payload to an unbounded queue.
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>> { pub(super) struct EventStreamSender<T> {
writer: W, producer: spsc_queue::Producer<T>,
fut_storage: Option<RecycleBox<()>>, fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
} }
impl<T: Send + 'static, W: EventSinkWriter<T>> EventSinkSender<T, W> { impl<T> EventStreamSender<T> {
pub(super) fn new(writer: W) -> Self { pub(super) fn new(producer: spsc_queue::Producer<T>) -> Self {
Self { Self {
writer, producer,
fut_storage: None, fut_storage: None,
_phantom_event: PhantomData,
} }
} }
} }
impl<T, W: EventSinkWriter<T>> Sender<T, ()> for EventSinkSender<T, W> impl<T> Sender<T, ()> for EventStreamSender<T>
where where
T: Send + 'static, T: Send + 'static,
{ {
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &self.writer; let producer = &mut self.producer;
RecycledFuture::new(&mut self.fut_storage, async move { RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg); producer.push(arg).map_err(|_| SendError {})
})
}
}
/// An object that can send a payload to a mutex-protected slot.
pub(super) struct EventSlotSender<T> {
slot: Arc<Mutex<Option<T>>>,
fut_storage: Option<RecycleBox<()>>,
}
impl<T> EventSlotSender<T> {
pub(super) fn new(slot: Arc<Mutex<Option<T>>>) -> Self {
Self {
slot,
fut_storage: None,
}
}
}
impl<T> Sender<T, ()> for EventSlotSender<T>
where
T: Send + 'static,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let slot = &*self.slot;
RecycledFuture::new(&mut self.fut_storage, async move {
let mut slot = slot.lock().unwrap();
*slot = Some(arg);
Ok(()) Ok(())
}) })

View File

@ -87,12 +87,13 @@
//! Although uncommon, there is sometimes a need for connecting and/or //! Although uncommon, there is sometimes a need for connecting and/or
//! disconnecting models after they have been migrated to the simulation. //! disconnecting models after they have been migrated to the simulation.
//! Likewise, one may want to connect or disconnect an [`EventSlot`] or //! Likewise, one may want to connect or disconnect an [`EventSlot`] or
//! [`EventQueue`] after the simulation has been instantiated. //! [`EventStream`] after the simulation has been instantiated.
//! //!
//! There is actually a very simple solution to this problem: since the //! There is actually a very simple solution to this problem: since the
//! [`InputFn`] trait also matches closures of type `FnOnce(&mut impl Model)`, //! [`InputFn`](crate::model::InputFn) trait also matches closures of type
//! it is enough to invoke [`Simulation::send_event()`] with a closure that //! `FnOnce(&mut impl Model)`, it is enough to invoke
//! connects or disconnects a port, such as: //! [`Simulation::send_event()`] with a closure that connects or disconnects a
//! port, such as:
//! //!
//! ``` //! ```
//! # use asynchronix::model::{Model, Output}; //! # use asynchronix::model::{Model, Output};
@ -118,13 +119,13 @@
//! &modelA_addr //! &modelA_addr
//! ); //! );
//! ``` //! ```
mod endpoints;
mod mailbox; mod mailbox;
mod sim_init; mod sim_init;
mod sink;
pub use endpoints::{EventSlot, EventStream};
pub use mailbox::{Address, Mailbox}; pub use mailbox::{Address, Mailbox};
pub use sim_init::SimInit; pub use sim_init::SimInit;
pub use sink::{EventQueue, EventSink, EventSinkWriter, EventSlot};
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
@ -150,7 +151,8 @@ use crate::util::sync_cell::SyncCell;
/// [`SimInit::init()`](crate::simulation::SimInit::init) or /// [`SimInit::init()`](crate::simulation::SimInit::init) or
/// [`SimInit::init_with_clock()`](crate::simulation::SimInit::init_with_clock) /// [`SimInit::init_with_clock()`](crate::simulation::SimInit::init_with_clock)
/// method on a simulation initializer. It contains an asynchronous executor /// method on a simulation initializer. It contains an asynchronous executor
/// that runs all simulation models added beforehand to [`SimInit`]. /// that runs all simulation models added beforehand to
/// [`SimInit`](crate::simulation::SimInit).
/// ///
/// A [`Simulation`] object also manages an event scheduling queue and /// A [`Simulation`] object also manages an event scheduling queue and
/// simulation time. The scheduling queue can be accessed from the simulation /// simulation time. The scheduling queue can be accessed from the simulation

View File

@ -0,0 +1,69 @@
use std::fmt;
use std::sync::{Arc, Mutex, TryLockError, TryLockResult};
use crate::util::spsc_queue;
/// An iterator that returns all events that were broadcast by an output port.
///
/// Events are returned in first-in-first-out order. Note that even if the
/// iterator returns `None`, it may still produce more items after simulation
/// time is incremented.
pub struct EventStream<T> {
consumer: spsc_queue::Consumer<T>,
}
impl<T> EventStream<T> {
/// Creates a new `EventStream`.
pub(crate) fn new(consumer: spsc_queue::Consumer<T>) -> Self {
Self { consumer }
}
}
impl<T> Iterator for EventStream<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.consumer.pop()
}
}
impl<T> fmt::Debug for EventStream<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventStream").finish_non_exhaustive()
}
}
/// A single-value slot that holds the last event that was broadcast by an
/// output port.
pub struct EventSlot<T> {
slot: Arc<Mutex<Option<T>>>,
}
impl<T> EventSlot<T> {
/// Creates a new `EventSlot`.
pub(crate) fn new(slot: Arc<Mutex<Option<T>>>) -> Self {
Self { slot }
}
/// Take the last event, if any, leaving the slot empty.
///
/// Note that even after the event is taken, it may become populated anew
/// after simulation time is incremented.
pub fn take(&mut self) -> Option<T> {
// We don't actually need to take self by mutable reference, but this
// signature is probably less surprising for the user and more
// consistent with `EventStream`. It also prevents multi-threaded
// access, which would be likely to be misused.
match self.slot.try_lock() {
TryLockResult::Ok(mut v) => v.take(),
TryLockResult::Err(TryLockError::WouldBlock) => None,
TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(),
}
}
}
impl<T> fmt::Debug for EventSlot<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventSlot").finish_non_exhaustive()
}
}

View File

@ -69,8 +69,9 @@ impl SimInit {
Simulation::new(self.executor, self.scheduler_queue, self.time) Simulation::new(self.executor, self.scheduler_queue, self.time)
} }
/// Builds a simulation synchronized with the provided [`Clock`] and /// Builds a simulation synchronized with the provided
/// initialized at the specified simulation time, executing the /// [`Clock`](crate::time::Clock) and initialized at the specified
/// simulation time, executing the
/// [`Model::init()`](crate::model::Model::init) method on all model /// [`Model::init()`](crate::model::Model::init) method on all model
/// initializers. /// initializers.
pub fn init_with_clock( pub fn init_with_clock(

View File

@ -1,171 +0,0 @@
use std::fmt;
use std::sync::{Arc, Mutex, TryLockError, TryLockResult};
use crossbeam_queue::SegQueue;
/// A simulation endpoint that can receive events sent by model outputs.
///
/// An `EventSink` can be thought of as a self-standing input meant to
/// externally monitor the simulated system.
pub trait EventSink<T> {
/// Writer handle to an event sink.
type Writer: EventSinkWriter<T>;
/// Returns the writer handle associated to this sink.
fn writer(&self) -> Self::Writer;
}
/// A writer handle to an event sink.
pub trait EventSinkWriter<T>: Send + Sync + 'static {
/// Writes a value to the associated sink.
fn write(&self, event: T);
}
/// An event sink iterator that returns all events that were broadcast by
/// connected output ports.
///
/// Events are returned in first-in-first-out order. Note that even if the
/// iterator returns `None`, it may still produce more items in the future (in
/// other words, it is not a [`FusedIterator`](std::iter::FusedIterator)).
pub struct EventQueue<T> {
queue: Arc<SegQueue<T>>,
}
impl<T> EventQueue<T> {
/// Creates a new `EventStream`.
pub fn new() -> Self {
Self {
queue: Arc::new(SegQueue::new()),
}
}
}
impl<T: Send + 'static> EventSink<T> for EventQueue<T> {
type Writer = EventQueueWriter<T>;
fn writer(&self) -> Self::Writer {
EventQueueWriter {
queue: self.queue.clone(),
}
}
}
impl<T> Iterator for EventQueue<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop()
}
}
impl<T> Default for EventQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> fmt::Debug for EventQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventQueue").finish_non_exhaustive()
}
}
/// A producer handle of an `EventStream`.
pub struct EventQueueWriter<T> {
queue: Arc<SegQueue<T>>,
}
impl<T: Send + 'static> EventSinkWriter<T> for EventQueueWriter<T> {
/// Pushes an event onto the queue.
fn write(&self, event: T) {
self.queue.push(event);
}
}
impl<T> fmt::Debug for EventQueueWriter<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventQueueWriter").finish_non_exhaustive()
}
}
/// A single-value event sink that holds the last event that was broadcast by
/// any of the connected output ports.
pub struct EventSlot<T> {
slot: Arc<Mutex<Option<T>>>,
}
impl<T> EventSlot<T> {
/// Creates a new `EventSlot`.
pub fn new() -> Self {
Self {
slot: Arc::new(Mutex::new(None)),
}
}
/// Take the last event, if any, leaving the slot empty.
///
/// Note that even after the event is taken, it may become populated anew.
pub fn take(&mut self) -> Option<T> {
// We don't actually need to take self by mutable reference, but this
// signature is probably less surprising for the user and more
// consistent with `EventStream`. It also prevents multi-threaded
// access, which would be likely to be misused.
match self.slot.try_lock() {
TryLockResult::Ok(mut v) => v.take(),
TryLockResult::Err(TryLockError::WouldBlock) => None,
TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(),
}
}
}
impl<T: Send + 'static> EventSink<T> for EventSlot<T> {
type Writer = EventSlotWriter<T>;
/// Returns a writer handle.
fn writer(&self) -> EventSlotWriter<T> {
EventSlotWriter {
slot: self.slot.clone(),
}
}
}
impl<T> Default for EventSlot<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> fmt::Debug for EventSlot<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventSlot").finish_non_exhaustive()
}
}
/// A writer handle of an `EventSlot`.
pub struct EventSlotWriter<T> {
slot: Arc<Mutex<Option<T>>>,
}
impl<T: Send + 'static> EventSinkWriter<T> for EventSlotWriter<T> {
/// Write an event into the slot.
fn write(&self, event: T) {
// Why do we just use `try_lock` and abandon if the lock is taken? The
// reason is that (i) the reader is never supposed to access the slot
// when the simulation runs and (ii) as a rule the simulator does not
// warrant fairness when concurrently writing to an input. Therefore, if
// the mutex is already locked when this writer attempts to lock it, it
// means another writer is concurrently writing an event, and that event
// is just as legitimate as ours so there is not need to overwrite it.
match self.slot.try_lock() {
TryLockResult::Ok(mut v) => *v = Some(event),
TryLockResult::Err(TryLockError::WouldBlock) => {}
TryLockResult::Err(TryLockError::Poisoned(_)) => panic!(),
}
}
}
impl<T> fmt::Debug for EventSlotWriter<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventStreamWriter").finish_non_exhaustive()
}
}

View File

@ -3,7 +3,7 @@
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::num::NonZeroUsize; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -13,7 +13,7 @@ use std::time::Duration;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel::Sender; use crate::channel::{ChannelId, Sender};
use crate::executor::Executor; use crate::executor::Executor;
use crate::model::{InputFn, Model}; use crate::model::{InputFn, Model};
use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::time::{MonotonicTime, TearableAtomicTime};
@ -21,15 +21,7 @@ use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCellReader; use crate::util::sync_cell::SyncCellReader;
/// Shorthand for the scheduler queue type. /// Shorthand for the scheduler queue type.
pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, ChannelId), Box<dyn ScheduledEvent>>;
// Why use both time and channel ID as the key? The short answer is that this
// ensures that events targeting the same channel are sent in the order they
// were scheduled. More precisely, this ensures that events targeting the same
// channel are ordered contiguously in the priority queue, which in turns allows
// the event loop to easily aggregate such events into single futures and thus
// control their relative order of execution.
pub(crate) type SchedulerQueue =
PriorityQueue<(MonotonicTime, NonZeroUsize), Box<dyn ScheduledEvent>>;
/// Trait abstracting over time-absolute and time-relative deadlines. /// Trait abstracting over time-absolute and time-relative deadlines.
/// ///
@ -487,7 +479,7 @@ impl Error for SchedulingError {}
/// Schedules an event at a future time. /// Schedules an event at a future time.
/// ///
/// This function does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
pub(crate) fn schedule_event_at_unchecked<M, F, T, S>( pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
time: MonotonicTime, time: MonotonicTime,
@ -503,7 +495,7 @@ pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
{ {
let channel_id = sender.channel_id(); let channel_id = sender.channel_id();
let event_dispatcher = Box::new(EventDispatcher::new(dispatch_event(func, arg, sender))); let event_dispatcher = Box::new(new_event_dispatcher(func, arg, sender));
let mut scheduler_queue = scheduler_queue.lock().unwrap(); let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), event_dispatcher); scheduler_queue.insert((time, channel_id), event_dispatcher);
@ -511,7 +503,7 @@ pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
/// Schedules an event at a future time, returning an event key. /// Schedules an event at a future time, returning an event key.
/// ///
/// This function does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>( pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
time: MonotonicTime, time: MonotonicTime,
@ -529,8 +521,10 @@ where
let event_key = EventKey::new(); let event_key = EventKey::new();
let channel_id = sender.channel_id(); let channel_id = sender.channel_id();
let event_dispatcher = Box::new(KeyedEventDispatcher::new( let event_dispatcher = Box::new(KeyedEventDispatcher::new(
|ek| dispatch_keyed_event(ek, func, arg, sender),
event_key.clone(), event_key.clone(),
func,
arg,
sender,
)); ));
let mut scheduler_queue = scheduler_queue.lock().unwrap(); let mut scheduler_queue = scheduler_queue.lock().unwrap();
@ -541,7 +535,7 @@ where
/// Schedules a periodic event at a future time. /// Schedules a periodic event at a future time.
/// ///
/// This function does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>( pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
time: MonotonicTime, time: MonotonicTime,
@ -558,10 +552,7 @@ pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
{ {
let channel_id = sender.channel_id(); let channel_id = sender.channel_id();
let event_dispatcher = Box::new(PeriodicEventDispatcher::new( let event_dispatcher = Box::new(PeriodicEventDispatcher::new(func, arg, sender, period));
|| dispatch_event(func, arg, sender),
period,
));
let mut scheduler_queue = scheduler_queue.lock().unwrap(); let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), event_dispatcher); scheduler_queue.insert((time, channel_id), event_dispatcher);
@ -569,7 +560,7 @@ pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
/// Schedules an event at a future time, returning an event key. /// Schedules an event at a future time, returning an event key.
/// ///
/// This function does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
pub(crate) fn schedule_periodic_keyed_event_at_unchecked<M, F, T, S>( pub(crate) fn schedule_periodic_keyed_event_at_unchecked<M, F, T, S>(
time: MonotonicTime, time: MonotonicTime,
@ -588,9 +579,11 @@ where
let event_key = EventKey::new(); let event_key = EventKey::new();
let channel_id = sender.channel_id(); let channel_id = sender.channel_id();
let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new( let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new(
|ek| dispatch_keyed_event(ek, func, arg, sender),
period,
event_key.clone(), event_key.clone(),
func,
arg,
sender,
period,
)); ));
let mut scheduler_queue = scheduler_queue.lock().unwrap(); let mut scheduler_queue = scheduler_queue.lock().unwrap();
@ -621,8 +614,8 @@ pub(crate) trait ScheduledEvent: Send {
} }
pin_project! { pin_project! {
/// An object that can be converted to a future dispatching a /// Object that can be converted to a future dispatching a non-cancellable
/// non-cancellable event. /// event.
/// ///
/// Note that this particular event dispatcher is in fact already a future: /// Note that this particular event dispatcher is in fact already a future:
/// since the future cannot be cancelled and the dispatcher does not need to /// since the future cannot be cancelled and the dispatcher does not need to
@ -634,14 +627,23 @@ pin_project! {
} }
} }
impl<F> EventDispatcher<F> /// Constructs a new `EventDispatcher`.
///
/// Due to some limitations of type inference or of my understanding of it, the
/// constructor for this event dispatchers is a freestanding function.
fn new_event_dispatcher<M, F, T, S>(
func: F,
arg: T,
sender: Sender<M>,
) -> EventDispatcher<impl Future<Output = ()>>
where where
F: Future<Output = ()> + Send + 'static, M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{ {
/// Constructs a new `EventDispatcher`. let fut = dispatch_event(func, arg, sender);
pub(crate) fn new(fut: F) -> Self {
EventDispatcher { fut } EventDispatcher { fut }
}
} }
impl<F> Future for EventDispatcher<F> impl<F> Future for EventDispatcher<F>
@ -675,78 +677,112 @@ where
} }
} }
/// An object that can be converted to a future dispatching a non-cancellable, /// Object that can be converted to a future dispatching a non-cancellable periodic
/// periodic event. /// event.
pub(crate) struct PeriodicEventDispatcher<G, F> pub(crate) struct PeriodicEventDispatcher<M, F, T, S>
where where
G: (FnOnce() -> F) + Clone + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static,
{ {
/// A clonable generator for the dispatching future. func: F,
gen: G, arg: T,
/// The event repetition period. sender: Sender<M>,
period: Duration, period: Duration,
_input_kind: PhantomData<S>,
} }
impl<G, F> PeriodicEventDispatcher<G, F> impl<M, F, T, S> PeriodicEventDispatcher<M, F, T, S>
where where
G: (FnOnce() -> F) + Clone + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{ {
/// Constructs a new `PeriodicEventDispatcher`. /// Constructs a new `PeriodicEventDispatcher`.
fn new(gen: G, period: Duration) -> Self { fn new(func: F, arg: T, sender: Sender<M>, period: Duration) -> Self {
Self { gen, period } Self {
func,
arg,
sender,
period,
_input_kind: PhantomData,
}
} }
} }
impl<G, F> ScheduledEvent for PeriodicEventDispatcher<G, F> impl<M, F, T, S> ScheduledEvent for PeriodicEventDispatcher<M, F, T, S>
where where
G: (FnOnce() -> F) + Clone + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{ {
fn is_cancelled(&self) -> bool { fn is_cancelled(&self) -> bool {
false false
} }
fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> { fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
let event = Box::new(Self::new(self.gen.clone(), self.period)); let event = Box::new(Self::new(
self.func.clone(),
self.arg.clone(),
self.sender.clone(),
self.period,
));
Some((event, self.period)) Some((event, self.period))
} }
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> { fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)()) let Self {
func, arg, sender, ..
} = *self;
Box::pin(dispatch_event(func, arg, sender))
} }
fn spawn_and_forget(self: Box<Self>, executor: &Executor) { fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)()); let Self {
func, arg, sender, ..
} = *self;
let fut = dispatch_event(func, arg, sender);
executor.spawn_and_forget(fut);
} }
} }
/// An object that can be converted to a future dispatching a cancellable event. /// Object that can be converted to a future dispatching a cancellable event.
pub(crate) struct KeyedEventDispatcher<G, F> pub(crate) struct KeyedEventDispatcher<M, F, T, S>
where where
G: (FnOnce(EventKey) -> F) + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{ {
/// A generator for the dispatching future.
gen: G,
/// The event cancellation key.
event_key: EventKey, event_key: EventKey,
func: F,
arg: T,
sender: Sender<M>,
_input_kind: PhantomData<S>,
} }
impl<G, F> KeyedEventDispatcher<G, F> impl<M, F, T, S> KeyedEventDispatcher<M, F, T, S>
where where
G: (FnOnce(EventKey) -> F) + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{ {
/// Constructs a new `KeyedEventDispatcher`. /// Constructs a new `KeyedEventDispatcher`.
fn new(gen: G, event_key: EventKey) -> Self { fn new(event_key: EventKey, func: F, arg: T, sender: Sender<M>) -> Self {
Self { gen, event_key } Self {
event_key,
func,
arg,
sender,
_input_kind: PhantomData,
}
} }
} }
impl<G, F> ScheduledEvent for KeyedEventDispatcher<G, F> impl<M, F, T, S> ScheduledEvent for KeyedEventDispatcher<M, F, T, S>
where where
G: (FnOnce(EventKey) -> F) + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{ {
fn is_cancelled(&self) -> bool { fn is_cancelled(&self) -> bool {
self.event_key.is_cancelled() self.event_key.is_cancelled()
@ -755,74 +791,116 @@ where
None None
} }
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> { fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)(self.event_key)) let Self {
event_key,
func,
arg,
sender,
..
} = *self;
Box::pin(dispatch_keyed_event(event_key, func, arg, sender))
} }
fn spawn_and_forget(self: Box<Self>, executor: &Executor) { fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)(self.event_key)); let Self {
event_key,
func,
arg,
sender,
..
} = *self;
let fut = dispatch_keyed_event(event_key, func, arg, sender);
executor.spawn_and_forget(fut);
} }
} }
/// An object that can be converted to a future dispatching a periodic, /// Object that can be converted to a future dispatching a cancellable event.
/// cancellable event. pub(crate) struct PeriodicKeyedEventDispatcher<M, F, T, S>
pub(crate) struct PeriodicKeyedEventDispatcher<G, F>
where where
G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{ {
/// A clonable generator for the dispatching future.
gen: G,
/// The repetition period.
period: Duration,
/// The event cancellation key.
event_key: EventKey, event_key: EventKey,
func: F,
arg: T,
sender: Sender<M>,
period: Duration,
_input_kind: PhantomData<S>,
} }
impl<G, F> PeriodicKeyedEventDispatcher<G, F> impl<M, F, T, S> PeriodicKeyedEventDispatcher<M, F, T, S>
where where
G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{ {
/// Constructs a new `KeyedEventDispatcher`. /// Constructs a new `KeyedEventDispatcher`.
fn new(gen: G, period: Duration, event_key: EventKey) -> Self { fn new(event_key: EventKey, func: F, arg: T, sender: Sender<M>, period: Duration) -> Self {
Self { Self {
gen,
period,
event_key, event_key,
func,
arg,
sender,
period,
_input_kind: PhantomData,
} }
} }
} }
impl<G, F> ScheduledEvent for PeriodicKeyedEventDispatcher<G, F> impl<M, F, T, S> ScheduledEvent for PeriodicKeyedEventDispatcher<M, F, T, S>
where where
G: (FnOnce(EventKey) -> F) + Clone + Send + 'static, M: Model,
F: Future<Output = ()> + Send + 'static, F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{ {
fn is_cancelled(&self) -> bool { fn is_cancelled(&self) -> bool {
self.event_key.is_cancelled() self.event_key.is_cancelled()
} }
fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> { fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
let event = Box::new(Self::new( let event = Box::new(Self::new(
self.gen.clone(),
self.period,
self.event_key.clone(), self.event_key.clone(),
self.func.clone(),
self.arg.clone(),
self.sender.clone(),
self.period,
)); ));
Some((event, self.period)) Some((event, self.period))
} }
fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> { fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin((self.gen)(self.event_key)) let Self {
event_key,
func,
arg,
sender,
..
} = *self;
Box::pin(dispatch_keyed_event(event_key, func, arg, sender))
} }
fn spawn_and_forget(self: Box<Self>, executor: &Executor) { fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
executor.spawn_and_forget((self.gen)(self.event_key)); let Self {
event_key,
func,
arg,
sender,
..
} = *self;
let fut = dispatch_keyed_event(event_key, func, arg, sender);
executor.spawn_and_forget(fut);
} }
} }
/// Asynchronously dispatches a non-cancellable event to a model input. /// Asynchronously dispatch a regular, non-cancellable event.
pub(crate) async fn dispatch_event<M, F, T, S>(func: F, arg: T, sender: Sender<M>) async fn dispatch_event<M, F, T, S>(func: F, arg: T, sender: Sender<M>)
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S>,
T: Send + 'static, T: Send + Clone + 'static,
{ {
let _ = sender let _ = sender
.send( .send(
@ -838,13 +916,9 @@ where
.await; .await;
} }
/// Asynchronously dispatches a cancellable event to a model input. /// Asynchronously dispatch a cancellable event.
pub(crate) async fn dispatch_keyed_event<M, F, T, S>( async fn dispatch_keyed_event<M, F, T, S>(event_key: EventKey, func: F, arg: T, sender: Sender<M>)
event_key: EventKey, where
func: F,
arg: T,
sender: Sender<M>,
) where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S>, F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static, T: Send + Clone + 'static,

View File

@ -3,4 +3,5 @@ pub(crate) mod futures;
pub(crate) mod priority_queue; pub(crate) mod priority_queue;
pub(crate) mod rng; pub(crate) mod rng;
pub(crate) mod slot; pub(crate) mod slot;
pub(crate) mod spsc_queue;
pub(crate) mod sync_cell; pub(crate) mod sync_cell;

View File

@ -1,6 +1,6 @@
//! Associative priority queue. //! Associative priority queue.
use std::cmp::Ordering; use std::cmp::{Eq, Ord, Ordering, PartialOrd};
use std::collections::BinaryHeap; use std::collections::BinaryHeap;
/// A key-value pair ordered by keys in inverse order, with epoch-based ordering /// A key-value pair ordered by keys in inverse order, with epoch-based ordering
@ -111,7 +111,7 @@ impl<K: Copy + Ord, V> PriorityQueue<K, V> {
#[cfg(all(test, not(asynchronix_loom)))] #[cfg(all(test, not(asynchronix_loom)))]
mod tests { mod tests {
use super::PriorityQueue; use super::*;
#[test] #[test]
fn priority_smoke() { fn priority_smoke() {

View File

@ -0,0 +1,393 @@
//! Single-producer single-consumer unbounded FIFO queue that stores values in
//! fixed-size memory segments.
#![allow(unused)]
use std::cell::Cell;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering;
use crossbeam_utils::CachePadded;
use crate::loom_exports::cell::UnsafeCell;
use crate::loom_exports::sync::atomic::{AtomicBool, AtomicPtr};
use crate::loom_exports::sync::Arc;
/// The number of slots in a single segment.
const SEGMENT_LEN: usize = 32;
/// A slot containing a single value.
/// A slot containing a single value.
struct Slot<T> {
    // Set to `true` by the producer once `value` has been initialized; the
    // consumer must observe `true` before reading `value`.
    has_value: AtomicBool,
    // Storage for the value; only initialized when `has_value` is `true`.
    value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Default for Slot<T> {
    /// Returns an empty slot: no value is stored and `has_value` is cleared.
    fn default() -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            has_value: AtomicBool::new(false),
        }
    }
}
/// A memory segment containing `SEGMENT_LEN` slots.
struct Segment<T> {
/// Address of the next segment.
///
/// A null pointer means that the next segment is not allocated yet.
next_segment: AtomicPtr<Segment<T>>,
data: [Slot<T>; SEGMENT_LEN],
}
impl<T> Segment<T> {
    /// Heap-allocates a fresh segment with all slots empty and no successor
    /// segment.
    fn allocate_new() -> NonNull<Self> {
        let boxed = Box::new(Self {
            next_segment: AtomicPtr::new(ptr::null_mut()),
            data: Default::default(),
        });
        // Safety: `Box::into_raw` always returns a non-null, well-aligned
        // pointer.
        unsafe { NonNull::new_unchecked(Box::into_raw(boxed)) }
    }
}
/// The head of the queue from which values are popped.
struct Head<T> {
/// Pointer to the segment at the head of the queue.
segment: NonNull<Segment<T>>,
/// Index of the next value to be read.
///
/// If the index is equal to the segment length, it is necessary to move to
/// the next segment before the next value can be read.
next_read_idx: usize,
}
/// The tail of the queue to which values are pushed.
struct Tail<T> {
/// Pointer to the segment at the tail of the queue.
segment: NonNull<Segment<T>>,
/// Index of the next value to be written.
///
/// If the index is equal to the segment length, a new segment must be
/// allocated before a new value can be written.
next_write_idx: usize,
}
/// A single-producer, single-consumer unbounded FIFO queue.
struct Queue<T> {
head: CachePadded<UnsafeCell<Head<T>>>,
tail: CachePadded<UnsafeCell<Tail<T>>>,
}
impl<T> Queue<T> {
    /// Creates a new queue.
    ///
    /// The head and tail initially point to the same, single allocated
    /// segment.
    fn new() -> Self {
        let segment = Segment::allocate_new();
        let head = Head {
            segment,
            next_read_idx: 0,
        };
        let tail = Tail {
            segment,
            next_write_idx: 0,
        };
        Self {
            head: CachePadded::new(UnsafeCell::new(head)),
            tail: CachePadded::new(UnsafeCell::new(tail)),
        }
    }
    /// Pushes a new value.
    ///
    /// A new segment is allocated on demand when the current tail segment is
    /// full, so this never blocks and never fails.
    ///
    /// # Safety
    ///
    /// The method cannot be called from multiple threads concurrently.
    unsafe fn push(&self, value: T) {
        // Safety: this is the only thread accessing the tail.
        let tail = self.tail.with_mut(|p| &mut *p);
        // If the whole segment has been written, allocate a new segment.
        if tail.next_write_idx == SEGMENT_LEN {
            let old_segment = tail.segment;
            tail.segment = Segment::allocate_new();
            // Safety: the old segment is still allocated since the consumer
            // cannot deallocate it before `next_segment` is set to a non-null
            // value.
            old_segment
                .as_ref()
                .next_segment
                .store(tail.segment.as_ptr(), Ordering::Release);
            tail.next_write_idx = 0;
        }
        // Safety: the tail segment is allocated since the consumer cannot
        // deallocate it before `next_segment` is set to a non-null value.
        let data = &tail.segment.as_ref().data[tail.next_write_idx];
        // Safety: we have exclusive access to the slot value since the consumer
        // cannot access it before `has_value` is set to true.
        data.value.with_mut(|p| (*p).write(value));
        // Ordering: this Release store synchronizes with the Acquire load in
        // `pop` and ensures that the value is visible to the consumer once
        // `has_value` reads `true`.
        data.has_value.store(true, Ordering::Release);
        tail.next_write_idx += 1;
    }
    /// Pops a new value.
    ///
    /// Returns `None` when no value is currently observable, either because
    /// the queue is empty or because the producer's next segment is not yet
    /// linked in.
    ///
    /// # Safety
    ///
    /// The method cannot be called from multiple threads concurrently.
    unsafe fn pop(&self) -> Option<T> {
        // Safety: this is the only thread accessing the head.
        let head = self.head.with_mut(|p| &mut *p);
        // If the whole segment has been read, try to move to the next segment.
        if head.next_read_idx == SEGMENT_LEN {
            // Read the next segment or return `None` if it is not ready yet.
            //
            // Safety: the head segment is still allocated since we are the only
            // thread that can deallocate it.
            let next_segment = head.segment.as_ref().next_segment.load(Ordering::Acquire);
            let next_segment = NonNull::new(next_segment)?;
            // Deallocate the old segment.
            //
            // Safety: the pointer was initialized from a box and the segment is
            // still allocated since we are the only thread that can deallocate
            // it.
            let _ = Box::from_raw(head.segment.as_ptr());
            // Update the segment and the next index.
            head.segment = next_segment;
            head.next_read_idx = 0;
        }
        let data = &head.segment.as_ref().data[head.next_read_idx];
        // Ordering: this Acquire load synchronizes with the Release store in
        // `push` and ensures that the value is visible once `has_value` reads
        // `true`.
        if !data.has_value.load(Ordering::Acquire) {
            return None;
        }
        // Safety: since `has_value` is `true` then we have exclusive ownership
        // of the value and we know that it was initialized.
        let value = data.value.with(|p| (*p).assume_init_read());
        head.next_read_idx += 1;
        Some(value)
    }
}
impl<T> Drop for Queue<T> {
    /// Drops every value still stored in the queue, then frees the last
    /// remaining segment (intermediate segments were already freed by `pop`).
    fn drop(&mut self) {
        unsafe {
            // Drop all values.
            while self.pop().is_some() {}
            // All values have been dropped: the last segment can be freed.
            // Safety: this is the only thread accessing the head since both the
            // consumer and producer have been dropped.
            let head = self.head.with_mut(|p| &mut *p);
            // Safety: the pointer was initialized from a box and the segment is
            // still allocated since we are the only thread that can deallocate
            // it.
            let _ = Box::from_raw(head.segment.as_ptr());
        }
    }
}
// Safety: the queue owns its values, so transferring or sharing it across
// threads is sound when `T: Send`; cross-thread access to the slots is
// synchronized through the `has_value` / `next_segment` atomics, and the
// `Producer`/`Consumer` handles restrict each end to a single thread.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
// NOTE(review): unwind safety is asserted unconditionally here — presumably
// because a panic in `push`/`pop` cannot leave a slot observable in a torn
// state; confirm against the push/pop invariants.
impl<T> UnwindSafe for Queue<T> {}
impl<T> RefUnwindSafe for Queue<T> {}
/// A handle to a single-producer, single-consumer queue that can push values.
pub(crate) struct Producer<T> {
    /// The shared queue; the matching `Consumer` holds the other reference.
    queue: Arc<Queue<T>>,
    /// Zero-sized `Cell` marker making `Producer` `!Sync`, which upholds the
    /// single-thread safety contract of `Queue::push`.
    _non_sync_phantom: PhantomData<Cell<()>>,
}
impl<T> Producer<T> {
    /// Pushes a value to the queue.
    ///
    /// Returns a `PushError` if the consumer handle was dropped, since the
    /// value could then never be popped.
    pub(crate) fn push(&self, value: T) -> Result<(), PushError> {
        // A strong count of 1 means this producer holds the only remaining
        // handle, i.e. the consumer no longer exists.
        match Arc::strong_count(&self.queue) {
            1 => Err(PushError {}),
            _ => {
                // Safety: `Producer` is neither `Sync` nor `Clone`, so only
                // this thread can call `Queue::push`.
                unsafe { self.queue.push(value) };
                Ok(())
            }
        }
    }
}
/// Error returned when a push failed due to the consumer being dropped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct PushError {}
impl fmt::Display for PushError {
    /// Formats a short, user-facing description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending message into a closed mailbox")
    }
}
impl Error for PushError {}
/// A handle to a single-producer, single-consumer queue that can pop values.
pub(crate) struct Consumer<T> {
    /// The shared queue; the matching `Producer` holds the other reference.
    queue: Arc<Queue<T>>,
    /// Zero-sized `Cell` marker making `Consumer` `!Sync`, which upholds the
    /// single-thread safety contract of `Queue::pop`.
    _non_sync_phantom: PhantomData<Cell<()>>,
}
impl<T> Consumer<T> {
    /// Pops the oldest value from the queue, or returns `None` if no value is
    /// currently available.
    pub(crate) fn pop(&self) -> Option<T> {
        // Safety: `Consumer` is neither `Sync` nor `Clone`, so only this
        // thread can call `Queue::pop`.
        unsafe { self.queue.pop() }
    }
}
/// Creates the producer and consumer handles of a single-producer,
/// single-consumer queue.
pub(crate) fn spsc_queue<T>() -> (Producer<T>, Consumer<T>) {
    let queue = Arc::new(Queue::new());
    (
        Producer {
            queue: queue.clone(),
            _non_sync_phantom: PhantomData,
        },
        Consumer {
            queue,
            _non_sync_phantom: PhantomData,
        },
    )
}
/// Regular (non-loom) tests.
#[cfg(all(test, not(asynchronix_loom)))]
mod tests {
    use super::*;
    use std::thread;
    /// Pushes a monotonically increasing sequence from one thread and checks
    /// that the other thread pops the same sequence, in order.
    #[test]
    fn spsc_queue_basic() {
        const VALUE_COUNT: usize = if cfg!(miri) { 1000 } else { 100_000 };
        let (producer, consumer) = spsc_queue();
        let th = thread::spawn(move || {
            for expected in 0..VALUE_COUNT {
                // Spin until the producer has made the next value visible.
                let value = loop {
                    match consumer.pop() {
                        Some(v) => break v,
                        None => continue,
                    }
                };
                assert_eq!(value, expected);
            }
        });
        for i in 0..VALUE_COUNT {
            producer.push(i).unwrap();
        }
        th.join().unwrap();
    }
}
}
/// Loom tests.
#[cfg(all(test, asynchronix_loom))]
mod tests {
    use super::*;
    use loom::model::Builder;
    use loom::thread;
    // Checks, under loom's model checker, that values popped concurrently with
    // pushes are observed in FIFO order within a single segment.
    #[test]
    fn loom_spsc_queue_basic() {
        const DEFAULT_PREEMPTION_BOUND: usize = 4;
        const VALUE_COUNT: usize = 10;
        let mut builder = Builder::new();
        // Bound preemptions to keep the explored state space tractable unless
        // the user overrode the bound (e.g. via LOOM_MAX_PREEMPTIONS).
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
        }
        builder.check(move || {
            let (producer, consumer) = spsc_queue();
            let th = thread::spawn(move || {
                let mut value = 0;
                for _ in 0..VALUE_COUNT {
                    // A `None` pop is acceptable here: the producer may simply
                    // not have pushed yet. Only ordering is asserted.
                    if let Some(v) = consumer.pop() {
                        assert_eq!(v, value);
                        value += 1;
                    }
                }
            });
            for i in 0..VALUE_COUNT {
                let _ = producer.push(i);
            }
            th.join().unwrap();
        });
    }
    // Checks, under loom, the segment-transition path: the queue is pre-filled
    // so that concurrent pushes/pops straddle the boundary between the first
    // and second segments.
    #[test]
    fn loom_spsc_queue_new_segment() {
        const DEFAULT_PREEMPTION_BOUND: usize = 4;
        const VALUE_COUNT_BEFORE: usize = 5;
        const VALUE_COUNT_AFTER: usize = 5;
        let mut builder = Builder::new();
        // Same preemption-bound rationale as in `loom_spsc_queue_basic`.
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
        }
        builder.check(move || {
            let (producer, consumer) = spsc_queue();
            // Fill up the first segment except for the last `VALUE_COUNT_BEFORE` slots.
            for i in 0..(SEGMENT_LEN - VALUE_COUNT_BEFORE) {
                producer.push(i).unwrap();
                consumer.pop();
            }
            let th = thread::spawn(move || {
                let mut value = SEGMENT_LEN - VALUE_COUNT_BEFORE;
                for _ in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) {
                    if let Some(v) = consumer.pop() {
                        assert_eq!(v, value);
                        value += 1;
                    }
                }
            });
            for i in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) {
                let _ = producer.push(i);
            }
            th.join().unwrap();
        });
    }
}

View File

@ -3,7 +3,7 @@
use std::time::Duration; use std::time::Duration;
use asynchronix::model::{Model, Output}; use asynchronix::model::{Model, Output};
use asynchronix::simulation::{EventQueue, Mailbox, SimInit}; use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{EventKey, MonotonicTime, Scheduler}; use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
#[test] #[test]
@ -27,8 +27,7 @@ fn model_schedule_event() {
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let mut output = EventQueue::new(); let mut output = model.output.connect_stream().0;
model.output.connect_sink(&output);
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
@ -72,8 +71,7 @@ fn model_cancel_future_keyed_event() {
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let mut output = EventQueue::new(); let mut output = model.output.connect_stream().0;
model.output.connect_sink(&output);
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
@ -118,8 +116,7 @@ fn model_cancel_same_time_keyed_event() {
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let mut output = EventQueue::new(); let mut output = model.output.connect_stream().0;
model.output.connect_sink(&output);
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
@ -160,8 +157,7 @@ fn model_schedule_periodic_event() {
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let mut output = EventQueue::new(); let mut output = model.output.connect_stream().0;
model.output.connect_sink(&output);
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
@ -210,8 +206,7 @@ fn model_cancel_periodic_event() {
let mut model = TestModel::default(); let mut model = TestModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let mut output = EventQueue::new(); let mut output = model.output.connect_stream().0;
model.output.connect_sink(&output);
let addr = mbox.address(); let addr = mbox.address();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;

View File

@ -3,7 +3,7 @@
use std::time::Duration; use std::time::Duration;
use asynchronix::model::{Model, Output}; use asynchronix::model::{Model, Output};
use asynchronix::simulation::{Address, EventQueue, Mailbox, SimInit, Simulation}; use asynchronix::simulation::{Address, EventStream, Mailbox, SimInit, Simulation};
use asynchronix::time::MonotonicTime; use asynchronix::time::MonotonicTime;
// Input-to-output pass-through model. // Input-to-output pass-through model.
@ -26,13 +26,12 @@ impl<T: Clone + Send + 'static> Model for PassThroughModel<T> {}
/// output) running as fast as possible. /// output) running as fast as possible.
fn passthrough_bench<T: Clone + Send + 'static>( fn passthrough_bench<T: Clone + Send + 'static>(
t0: MonotonicTime, t0: MonotonicTime,
) -> (Simulation, Address<PassThroughModel<T>>, EventQueue<T>) { ) -> (Simulation, Address<PassThroughModel<T>>, EventStream<T>) {
// Bench assembly. // Bench assembly.
let mut model = PassThroughModel::new(); let mut model = PassThroughModel::new();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let out_stream = EventQueue::new(); let out_stream = model.output.connect_stream().0;
model.output.connect_sink(&out_stream);
let addr = mbox.address(); let addr = mbox.address();
let simu = SimInit::new().add_model(model, mbox).init(t0); let simu = SimInit::new().add_model(model, mbox).init(t0);
@ -244,14 +243,13 @@ fn timestamp_bench(
) -> ( ) -> (
Simulation, Simulation,
Address<TimestampModel>, Address<TimestampModel>,
EventQueue<(Instant, SystemTime)>, EventStream<(Instant, SystemTime)>,
) { ) {
// Bench assembly. // Bench assembly.
let mut model = TimestampModel::default(); let mut model = TimestampModel::default();
let mbox = Mailbox::new(); let mbox = Mailbox::new();
let stamp_stream = EventQueue::new(); let stamp_stream = model.stamp.connect_stream().0;
model.stamp.connect_sink(&stamp_stream);
let addr = mbox.address(); let addr = mbox.address();
let simu = SimInit::new() let simu = SimInit::new()