1
0
forked from ROMEO/nexosim

Implement clonable outputs and add submodels example

This commit is contained in:
Jaŭhien Piatlicki 2024-04-26 11:06:40 +02:00
parent e7c0c5f217
commit 0734dc2fac
15 changed files with 524 additions and 89 deletions

View File

@ -39,6 +39,7 @@ dev-logs = []
async-event = "0.1"
crossbeam-utils = "0.8"
diatomic-waker = "0.1"
dyn-clone = "1.0"
futures-channel = "0.3"
futures-task = "0.3"
multishot = "0.3.2"

View File

@ -0,0 +1,153 @@
//! Example: an assembly consisting of a current-controlled stepper motor and
//! its driver.
//!
//! This example demonstrates in particular:
//!
//! * submodels,
//! * outputs cloning,
//! * self-scheduling methods,
//! * model setup,
//! * model initialization,
//! * simulation monitoring with event streams.
//!
//! ```text
//! ┌──────────────────────────────────────────────┐
//! │ Assembly │
//! │ ┌──────────┐ ┌──────────┐ │
//! PPS │ │ │ coil currents │ │ │position
//! Pulse rate ●───────▶│──▶│ Driver ├───────────────▶│ Motor ├──▶│─────────▶
//! (±freq)│ │ │ (IA, IB) │ │ │(0:199)
//! │ └──────────┘ └──────────┘ │
//! └──────────────────────────────────────────────┘
//! ```
use std::time::Duration;
use asynchronix::model::{Model, SetupContext};
use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::MonotonicTime;
mod stepper_motor;
pub use stepper_motor::{Driver, Motor};
pub struct MotorAssembly {
pub position: Output<u16>,
init_pos: u16,
load: Output<f64>,
pps: Output<f64>,
}
impl MotorAssembly {
pub fn new(init_pos: u16) -> Self {
Self {
position: Default::default(),
init_pos,
load: Default::default(),
pps: Default::default(),
}
}
/// Sets the pulse rate (sign = direction) [Hz] -- input port.
pub async fn pulse_rate(&mut self, pps: f64) {
self.pps.send(pps).await;
}
/// Torque applied by the load [N·m] -- input port.
pub async fn load(&mut self, torque: f64) {
self.load.send(torque).await;
}
}
impl Model for MotorAssembly {
fn setup(&mut self, setup_context: &SetupContext<Self>) {
let mut motor = Motor::new(self.init_pos);
let mut driver = Driver::new(1.0);
// Mailboxes.
let motor_mbox = Mailbox::new();
let driver_mbox = Mailbox::new();
// Connections.
self.pps.connect(Driver::pulse_rate, &driver_mbox);
self.load.connect(Motor::load, &motor_mbox);
driver.current_out.connect(Motor::current_in, &motor_mbox);
// Note: it is important to clone `position` from the parent to the
// submodel so that all connections made by the user to the parent model
// are preserved. Connections added after cloning are reflected in all
// clones.
motor.position = self.position.clone();
setup_context.add_model(driver, driver_mbox);
setup_context.add_model(motor, motor_mbox);
}
}
fn main() {
// ---------------
// Bench assembly.
// ---------------
// Models.
let init_pos = 123;
let mut assembly = MotorAssembly::new(init_pos);
// Mailboxes.
let assembly_mbox = Mailbox::new();
let assembly_addr = assembly_mbox.address();
// Model handles for simulation.
let mut position = EventBuffer::new();
assembly.position.connect_sink(&position);
// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;
// Assembly and initialization.
let mut simu = SimInit::new().add_model(assembly, assembly_mbox).init(t0);
// ----------
// Simulation.
// ----------
// Check initial conditions.
let mut t = t0;
assert_eq!(simu.time(), t);
assert_eq!(position.next(), Some(init_pos));
assert!(position.next().is_none());
// Start the motor in 2s with a PPS of 10Hz.
simu.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();
// Advance simulation time to two next events.
simu.step();
t += Duration::new(2, 0);
assert_eq!(simu.time(), t);
simu.step();
t += Duration::new(0, 100_000_000);
assert_eq!(simu.time(), t);
// Whichever the starting position, after two phase increments from the
// driver the rotor should have synchronized with the driver, with a
// position given by this beautiful formula.
let mut pos = (((init_pos + 1) / 4) * 4 + 1) % Motor::STEPS_PER_REV;
assert_eq!(position.by_ref().last().unwrap(), pos);
// Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to
// 9 position increments.
simu.step_by(Duration::new(0, 900_000_000));
t += Duration::new(0, 900_000_000);
assert_eq!(simu.time(), t);
for _ in 0..9 {
pos = (pos + 1) % Motor::STEPS_PER_REV;
assert_eq!(position.next(), Some(pos));
}
assert!(position.next().is_none());
}

View File

@ -40,7 +40,7 @@ impl Motor {
pub const TORQUE_CONSTANT: f64 = 1.0;
/// Creates a motor with the specified initial position.
fn new(position: u16) -> Self {
pub fn new(position: u16) -> Self {
Self {
position: Default::default(),
pos: position % Self::STEPS_PER_REV,
@ -176,6 +176,7 @@ impl Driver {
impl Model for Driver {}
#[allow(dead_code)]
fn main() {
// ---------------
// Bench assembly.

View File

@ -12,6 +12,41 @@
//! contrast, since events are buffered in the mailbox of the target model,
//! sending an event is a fire-and-forget operation. For this reason, output
//! ports should generally be preferred over requestor ports when possible.
//!
//! `Output` and `Requestor` ports are clonable. Their clones are shallow
//! copies, meaning that any modification of the ports connected to one clone is
//! immediately reflected in other clones.
//!
//! #### Example
//!
//! The outputs in this example are clones of each other and remain therefore
//! always connected to the same inputs. For an example usage of outputs cloning
//! in submodels assemblies, see the [`assembly example`][assembly].
//!
//! [assembly]:
//! https://github.com/asynchronics/asynchronix/tree/main/asynchronix/examples/assembly.rs
//!
//! ```
//! use asynchronix::model::Model;
//! use asynchronix::ports::Output;
//!
//! pub struct MyModel {
//! pub output_a: Output<u64>,
//! pub output_b: Output<u64>,
//! }
//!
//! impl MyModel {
//! pub fn new() -> Self {
//! let output: Output<_> = Default::default();
//! Self {
//! output_a: output.clone(),
//! output_b: output,
//! }
//! }
//! }
//!
//! impl Model for MyModel {}
//! ```
mod input;
mod output;

View File

@ -7,6 +7,7 @@ use crate::model::Model;
use crate::ports::{EventSink, LineError, LineId};
use crate::ports::{InputFn, ReplierFn};
use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock;
use broadcaster::{EventBroadcaster, QueryBroadcaster};
@ -17,9 +18,13 @@ use self::sender::{EventSinkSender, InputSender, ReplierSender};
/// `Output` ports can be connected to input ports, i.e. to asynchronous model
/// methods that return no value. They broadcast events to all connected input
/// ports.
///
/// When an `Output` is cloned, the information on connected ports remains
/// shared and therefore all clones use and modify the same list of connected
/// ports.
#[derive(Clone)]
pub struct Output<T: Clone + Send + 'static> {
broadcaster: EventBroadcaster<T>,
next_line_id: u64,
broadcaster: CachedRwLock<EventBroadcaster<T>>,
}
impl<T: Clone + Send + 'static> Output<T> {
@ -40,26 +45,16 @@ impl<T: Clone + Send + 'static> Output<T> {
F: for<'a> InputFn<'a, M, T, S> + Clone,
S: Send + 'static,
{
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.add(sender, line_id);
line_id
self.broadcaster.write().unwrap().add(sender)
}
/// Adds a connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
pub fn connect_sink<S: EventSink<T>>(&mut self, sink: &S) -> LineId {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(EventSinkSender::new(sink.writer()));
self.broadcaster.add(sender, line_id);
line_id
self.broadcaster.write().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter.
@ -69,7 +64,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// [`QuerySource`](crate::ports::QuerySource) instance and may result in
/// the disconnection of an arbitrary endpoint.
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.remove(line_id) {
if self.broadcaster.write().unwrap().remove(line_id) {
Ok(())
} else {
Err(LineError {})
@ -78,27 +73,31 @@ impl<T: Clone + Send + 'static> Output<T> {
/// Removes all connections.
pub fn disconnect_all(&mut self) {
self.broadcaster.clear();
self.broadcaster.write().unwrap().clear();
}
/// Broadcasts an event to all connected input ports.
pub async fn send(&mut self, arg: T) {
self.broadcaster.broadcast(arg).await.unwrap();
let broadcaster = self.broadcaster.write_scratchpad().unwrap();
broadcaster.broadcast(arg).await.unwrap();
}
}
impl<T: Clone + Send + 'static> Default for Output<T> {
fn default() -> Self {
Self {
broadcaster: EventBroadcaster::default(),
next_line_id: 0,
broadcaster: CachedRwLock::new(EventBroadcaster::default()),
}
}
}
impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Output ({} connected ports)", self.broadcaster.len())
write!(
f,
"Output ({} connected ports)",
self.broadcaster.read_unsync().len()
)
}
}
@ -107,9 +106,12 @@ impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
/// `Requestor` ports can be connected to replier ports, i.e. to asynchronous
/// model methods that return a value. They broadcast queries to all connected
/// replier ports.
///
/// When a `Requestor` is cloned, the information on connected ports remains
/// shared and therefore all clones use and modify the same list of connected
/// ports.
pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: QueryBroadcaster<T, R>,
next_line_id: u64,
broadcaster: CachedRwLock<QueryBroadcaster<T, R>>,
}
impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
@ -130,13 +132,8 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
S: Send + 'static,
{
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.add(sender, line_id);
line_id
self.broadcaster.write().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter.
@ -146,7 +143,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// [`QuerySource`](crate::ports::QuerySource) instance and may result in
/// the disconnection of an arbitrary endpoint.
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.remove(line_id) {
if self.broadcaster.write().unwrap().remove(line_id) {
Ok(())
} else {
Err(LineError {})
@ -155,26 +152,34 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// Removes all connections.
pub fn disconnect_all(&mut self) {
self.broadcaster.clear();
self.broadcaster.write().unwrap().clear();
}
/// Broadcasts a query to all connected replier ports.
pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ {
self.broadcaster.broadcast(arg).await.unwrap()
self.broadcaster
.write_scratchpad()
.unwrap()
.broadcast(arg)
.await
.unwrap()
}
}
impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> {
fn default() -> Self {
Self {
broadcaster: QueryBroadcaster::default(),
next_line_id: 0,
broadcaster: CachedRwLock::new(QueryBroadcaster::default()),
}
}
}
impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for Requestor<T, R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Requestor ({} connected ports)", self.broadcaster.len())
write!(
f,
"Requestor ({} connected ports)",
self.broadcaster.read_unsync().len()
)
}
}

View File

@ -25,6 +25,8 @@ use crate::util::task_set::TaskSet;
/// - the outputs of all sender futures are returned all at once rather than
/// with an asynchronous iterator (a.k.a. async stream).
pub(super) struct BroadcasterInner<T: Clone, R> {
/// Line identifier for the next port to be connected.
next_line_id: u64,
/// The list of senders with their associated line identifier.
senders: Vec<(LineId, Box<dyn Sender<T, R>>)>,
/// Fields explicitly borrowed by the `BroadcastFuture`.
@ -38,15 +40,18 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>, id: LineId) {
self.senders.push((id, sender));
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
self.shared.futures_env.push(FutureEnv {
storage: None,
output: None,
});
self.senders.push((line_id, sender));
self.shared.futures_env.push(FutureEnv::default());
self.shared.task_set.resize(self.senders.len());
line_id
}
/// Removes the first sender with the specified identifier, if any.
@ -122,6 +127,7 @@ impl<T: Clone, R> Default for BroadcasterInner<T, R> {
let wake_src = wake_sink.source();
Self {
next_line_id: 0,
senders: Vec::new(),
shared: Shared {
wake_sink,
@ -133,12 +139,23 @@ impl<T: Clone, R> Default for BroadcasterInner<T, R> {
}
}
impl<T: Clone, R> Clone for BroadcasterInner<T, R> {
fn clone(&self) -> Self {
Self {
next_line_id: self.next_line_id,
senders: self.senders.clone(),
shared: self.shared.clone(),
}
}
}
/// An object that can efficiently broadcast events to several input ports.
///
/// This is very similar to `source::broadcaster::EventBroadcaster`, but
/// generates non-owned futures instead.
///
/// See `BroadcasterInner` for implementation details.
#[derive(Clone)]
pub(super) struct EventBroadcaster<T: Clone> {
/// The broadcaster core object.
inner: BroadcasterInner<T, ()>,
@ -151,8 +168,8 @@ impl<T: Clone> EventBroadcaster<T> {
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>, id: LineId) {
self.inner.add(sender, id);
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>) -> LineId {
self.inner.add(sender)
}
/// Removes the first sender with the specified identifier, if any.
@ -212,8 +229,8 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>, id: LineId) {
self.inner.add(sender, id);
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId {
self.inner.add(sender)
}
/// Removes the first sender with the specified identifier, if any.
@ -272,6 +289,14 @@ impl<T: Clone, R> Default for QueryBroadcaster<T, R> {
}
}
impl<T: Clone, R> Clone for QueryBroadcaster<T, R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// Data related to a sender future.
struct FutureEnv<R> {
/// Cached storage for the future.
@ -280,6 +305,15 @@ struct FutureEnv<R> {
output: Option<R>,
}
impl<R> Default for FutureEnv<R> {
fn default() -> Self {
Self {
storage: None,
output: None,
}
}
}
/// A type-erased `Send` future wrapped in a `RecycleBox`.
type RecycleBoxFuture<'a, R> = RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + 'a>;
@ -299,6 +333,23 @@ struct Shared<R> {
storage: Option<Vec<Pin<RecycleBoxFuture<'static, R>>>>,
}
impl<R> Clone for Shared<R> {
fn clone(&self) -> Self {
let wake_sink = WakeSink::new();
let wake_src = wake_sink.source();
let mut futures_env = Vec::new();
futures_env.resize_with(self.futures_env.len(), Default::default);
Self {
wake_sink,
task_set: TaskSet::with_len(wake_src, self.task_set.len()),
futures_env,
storage: None,
}
}
}
/// A future aggregating the outputs of a collection of sender futures.
///
/// The idea is to join all sender futures as efficiently as possible, meaning:
@ -537,12 +588,12 @@ mod tests {
let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address));
broadcaster.add(sender, LineId(id as u64));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
@ -585,12 +636,12 @@ mod tests {
let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
broadcaster.add(sender, LineId(id as u64));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
@ -664,6 +715,12 @@ mod tests {
}
}
impl<R> Clone for TestEvent<R> {
fn clone(&self) -> Self {
unreachable!()
}
}
// An object that can wake a `TestEvent`.
#[derive(Clone)]
struct TestEventWaker<R> {
@ -705,9 +762,9 @@ mod tests {
let (test_event3, waker3) = test_event::<usize>();
let mut broadcaster = QueryBroadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2));
broadcaster.add(Box::new(test_event3), LineId(3));
broadcaster.add(Box::new(test_event1));
broadcaster.add(Box::new(test_event2));
broadcaster.add(Box::new(test_event3));
let mut fut = Box::pin(broadcaster.broadcast(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));
@ -777,8 +834,8 @@ mod tests {
let (test_event2, waker2) = test_event::<usize>();
let mut broadcaster = QueryBroadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2));
broadcaster.add(Box::new(test_event1));
broadcaster.add(Box::new(test_event2));
let mut fut = Box::pin(broadcaster.broadcast(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));

View File

@ -6,6 +6,7 @@ use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::task::{Context, Poll};
use dyn_clone::DynClone;
use recycle_box::{coerce_box, RecycleBox};
use crate::channel;
@ -14,11 +15,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
/// An event or query sender abstracting over the target model and input or
/// replier method.
pub(super) trait Sender<T, R>: Send {
pub(super) trait Sender<T, R>: DynClone + Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>;
}
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
/// An object that can send events to an input port.
pub(super) struct InputSender<M: 'static, F, T, S>
where
@ -72,6 +75,24 @@ where
}
}
impl<M: Send, F, T, S> Clone for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
S: Send + 'static,
{
fn clone(&self) -> Self {
Self {
func: self.func.clone(),
sender: self.sender.clone(),
fut_storage: None,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
/// An object that can send a request to a replier port and retrieve a response.
pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
func: F,
@ -140,6 +161,26 @@ where
}
}
impl<M, F, T, R, S> Clone for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Send + 'static,
R: Send + 'static,
S: Send,
{
fn clone(&self) -> Self {
Self {
func: self.func.clone(),
sender: self.sender.clone(),
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
/// An object that can send a payload to an event sink.
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>> {
writer: W,
@ -157,9 +198,10 @@ impl<T: Send + 'static, W: EventSinkWriter<T>> EventSinkSender<T, W> {
}
}
impl<T, W: EventSinkWriter<T>> Sender<T, ()> for EventSinkSender<T, W>
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
@ -172,6 +214,20 @@ where
}
}
impl<T, W> Clone for EventSinkSender<T, W>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// Error returned when the mailbox was closed or dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) struct SendError {}

View File

@ -14,7 +14,7 @@ pub trait EventSink<T> {
}
/// A writer handle to an event sink.
pub trait EventSinkWriter<T>: Send + Sync + 'static {
pub trait EventSinkWriter<T>: Clone + Send + Sync + 'static {
/// Writes a value to the associated sink.
fn write(&self, event: T);
}

View File

@ -133,6 +133,14 @@ impl<T: Send + 'static> EventSinkWriter<T> for EventBufferWriter<T> {
}
}
impl<T> Clone for EventBufferWriter<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> fmt::Debug for EventBufferWriter<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventBufferWriter").finish_non_exhaustive()

View File

@ -10,7 +10,7 @@ struct Inner<T> {
slot: Mutex<Option<T>>,
}
/// An `EventSink` and `EventSinkStream` that only keeps the last event.
/// An [`EventSink`] and [`EventSinkStream`] that only keeps the last event.
///
/// Once the value is read, the iterator will return `None` until a new value is
/// received. If the slot contains a value when a new value is received, the
@ -113,6 +113,14 @@ impl<T: Send + 'static> EventSinkWriter<T> for EventSlotWriter<T> {
}
}
impl<T> Clone for EventSlotWriter<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> fmt::Debug for EventSlotWriter<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EventStreamWriter").finish_non_exhaustive()

View File

@ -27,7 +27,6 @@ use super::ReplierFn;
/// simulation monitoring endpoint instantiated during bench assembly.
pub struct EventSource<T: Clone + Send + 'static> {
broadcaster: Arc<Mutex<EventBroadcaster<T>>>,
next_line_id: u64,
}
impl<T: Clone + Send + 'static> EventSource<T> {
@ -48,13 +47,8 @@ impl<T: Clone + Send + 'static> EventSource<T> {
F: for<'a> InputFn<'a, M, T, S> + Clone,
S: Send + 'static,
{
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.lock().unwrap().add(sender, line_id);
line_id
self.broadcaster.lock().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter.
@ -163,7 +157,6 @@ impl<T: Clone + Send + 'static> Default for EventSource<T> {
fn default() -> Self {
Self {
broadcaster: Arc::new(Mutex::new(EventBroadcaster::default())),
next_line_id: 0,
}
}
}
@ -187,7 +180,6 @@ impl<T: Clone + Send + 'static> fmt::Debug for EventSource<T> {
/// instantiated during bench assembly.
pub struct QuerySource<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: Arc<Mutex<QueryBroadcaster<T, R>>>,
next_line_id: u64,
}
impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
@ -208,13 +200,8 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
S: Send + 'static,
{
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.lock().unwrap().add(sender, line_id);
line_id
self.broadcaster.lock().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter.
@ -259,7 +246,6 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Default for QuerySource<T, R>
fn default() -> Self {
Self {
broadcaster: Arc::new(Mutex::new(QueryBroadcaster::default())),
next_line_id: 0,
}
}
}

View File

@ -24,6 +24,8 @@ use crate::util::task_set::TaskSet;
/// does, but the outputs of all sender futures are returned all at once rather
/// than with an asynchronous iterator (a.k.a. async stream).
pub(super) struct BroadcasterInner<T: Clone, R> {
/// Line identifier for the next port to be connected.
next_line_id: u64,
/// The list of senders with their associated line identifier.
senders: Vec<(LineId, Box<dyn Sender<T, R>>)>,
}
@ -35,8 +37,14 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>, id: LineId) {
self.senders.push((id, sender));
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
self.senders.push((line_id, sender));
line_id
}
/// Removes the first sender with the specified identifier, if any.
@ -89,6 +97,7 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
impl<T: Clone, R> Default for BroadcasterInner<T, R> {
fn default() -> Self {
Self {
next_line_id: 0,
senders: Vec::new(),
}
}
@ -112,8 +121,8 @@ impl<T: Clone + Send> EventBroadcaster<T> {
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>, id: LineId) {
self.inner.add(sender, id);
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>) -> LineId {
self.inner.add(sender)
}
/// Removes the first sender with the specified identifier, if any.
@ -190,8 +199,8 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>, id: LineId) {
self.inner.add(sender, id);
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId {
self.inner.add(sender)
}
/// Removes the first sender with the specified identifier, if any.
@ -462,12 +471,12 @@ mod tests {
let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address));
broadcaster.add(sender, LineId(id as u64));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
@ -510,12 +519,12 @@ mod tests {
let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
broadcaster.add(sender, LineId(id as u64));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
@ -629,9 +638,9 @@ mod tests {
let (test_event3, waker3) = test_event::<usize>();
let mut broadcaster = QueryBroadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2));
broadcaster.add(Box::new(test_event3), LineId(3));
broadcaster.add(Box::new(test_event1));
broadcaster.add(Box::new(test_event2));
broadcaster.add(Box::new(test_event3));
let mut fut = Box::pin(broadcaster.broadcast(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));
@ -701,8 +710,8 @@ mod tests {
let (test_event2, waker2) = test_event::<usize>();
let mut broadcaster = QueryBroadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2));
broadcaster.add(Box::new(test_event1));
broadcaster.add(Box::new(test_event2));
let mut fut = Box::pin(broadcaster.broadcast(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));

View File

@ -1,4 +1,5 @@
pub(crate) mod bit;
pub(crate) mod cached_rw_lock;
pub(crate) mod indexed_priority_queue;
pub(crate) mod priority_queue;
pub(crate) mod rng;

View File

@ -0,0 +1,111 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LockResult, Mutex, MutexGuard, PoisonError};
/// A cached read-write lock.
///
/// This read-write lock maintains a local cache in each clone for read
/// access. Regular writes are always synchronized and performed on the shared
/// data. Regular reads are synchronized only when the shared data has been
/// modified since the local cache was last synchronized. The local cache can
/// alternatively be used as a scratchpad without invalidating the shared data,
/// in which case all changes to the scratchpad will be lost on the next
/// synchronization.
#[derive(Clone)]
pub(crate) struct CachedRwLock<T: Clone> {
local: T,
local_epoch: usize,
shared: Arc<Mutex<T>>,
epoch: Arc<AtomicUsize>,
}
impl<T: Clone> CachedRwLock<T> {
/// Creates a new cached read-write lock in an ulocked state.
pub(crate) fn new(t: T) -> Self {
let shared = t.clone();
Self {
local: t,
local_epoch: 0,
shared: Arc::new(Mutex::new(shared)),
epoch: Arc::new(AtomicUsize::new(0)),
}
}
/// Gives access to the local cache without synchronization.
pub(crate) fn read_unsync(&self) -> &T {
&self.local
}
/// Synchronizes the local cache if it is behind the shared data and gives
/// access to it.
#[allow(dead_code)]
pub(crate) fn read(&mut self) -> LockResult<&T> {
if self.epoch.load(Ordering::Relaxed) != self.local_epoch {
match self.shared.lock() {
LockResult::Ok(shared) => {
self.local = shared.clone();
self.local_epoch = self.epoch.load(Ordering::Relaxed)
}
LockResult::Err(_) => return LockResult::Err(PoisonError::new(&self.local)),
}
}
LockResult::Ok(&self.local)
}
/// Gives write access to the local cache without synchronization so it can
/// be used as a scratchpad.
#[allow(dead_code)]
pub(crate) fn write_scratchpad_unsync(&mut self) -> &mut T {
&mut self.local
}
/// Synchronizes the local cache if it is behind the shared data and gives
/// write access to it so it can be used as a scratchpad.
pub(crate) fn write_scratchpad(&mut self) -> LockResult<&mut T> {
if self.epoch.load(Ordering::Relaxed) != self.local_epoch {
match self.shared.lock() {
LockResult::Ok(shared) => {
self.local = shared.clone();
self.local_epoch = self.epoch.load(Ordering::Relaxed)
}
LockResult::Err(_) => return LockResult::Err(PoisonError::new(&mut self.local)),
}
}
LockResult::Ok(&mut self.local)
}
/// Acquires a write lock on the shared data.
pub(crate) fn write(&mut self) -> LockResult<CachedRwLockWriteGuard<'_, T>> {
let guard = self.shared.lock();
let epoch = self.epoch.load(Ordering::Relaxed) + 1;
self.epoch.store(epoch, Ordering::Relaxed);
match guard {
LockResult::Ok(shared) => LockResult::Ok(CachedRwLockWriteGuard { guard: shared }),
LockResult::Err(poison) => LockResult::Err(PoisonError::new(CachedRwLockWriteGuard {
guard: poison.into_inner(),
})),
}
}
}
/// Write guard.
///
/// The lock is released when the guard is dropped.
pub(crate) struct CachedRwLockWriteGuard<'a, T: Clone> {
guard: MutexGuard<'a, T>,
}
impl<T: Clone> Deref for CachedRwLockWriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
&self.guard
}
}
impl<T: Clone> DerefMut for CachedRwLockWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.guard
}
}

View File

@ -271,6 +271,10 @@ impl TaskSet {
waker_ref(&self.tasks[idx])
}
pub(crate) fn len(&self) -> usize {
self.task_count
}
}
/// Internals shared between a `TaskSet` and its associated `Task`s.