From 0734dc2faca9ef4249a3bd539feaf1e159378eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Fri, 26 Apr 2024 11:06:40 +0200 Subject: [PATCH] Implement clonable outputs and add submodels example --- asynchronix/Cargo.toml | 1 + asynchronix/examples/assembly.rs | 153 ++++++++++++++++++++ asynchronix/examples/stepper_motor.rs | 3 +- asynchronix/src/ports.rs | 35 +++++ asynchronix/src/ports/output.rs | 73 +++++----- asynchronix/src/ports/output/broadcaster.rs | 95 +++++++++--- asynchronix/src/ports/output/sender.rs | 60 +++++++- asynchronix/src/ports/sink.rs | 2 +- asynchronix/src/ports/sink/event_buffer.rs | 8 + asynchronix/src/ports/sink/event_slot.rs | 10 +- asynchronix/src/ports/source.rs | 18 +-- asynchronix/src/ports/source/broadcaster.rs | 39 +++-- asynchronix/src/util.rs | 1 + asynchronix/src/util/cached_rw_lock.rs | 111 ++++++++++++++ asynchronix/src/util/task_set.rs | 4 + 15 files changed, 524 insertions(+), 89 deletions(-) create mode 100644 asynchronix/examples/assembly.rs create mode 100644 asynchronix/src/util/cached_rw_lock.rs diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 9533780..079c113 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -39,6 +39,7 @@ dev-logs = [] async-event = "0.1" crossbeam-utils = "0.8" diatomic-waker = "0.1" +dyn-clone = "1.0" futures-channel = "0.3" futures-task = "0.3" multishot = "0.3.2" diff --git a/asynchronix/examples/assembly.rs b/asynchronix/examples/assembly.rs new file mode 100644 index 0000000..e550656 --- /dev/null +++ b/asynchronix/examples/assembly.rs @@ -0,0 +1,153 @@ +//! Example: an assembly consisting of a current-controlled stepper motor and +//! its driver. +//! +//! This example demonstrates in particular: +//! +//! * submodels, +//! * outputs cloning, +//! * self-scheduling methods, +//! * model setup, +//! * model initialization, +//! * simulation monitoring with event streams. +//! +//! ```text +//! ┌──────────────────────────────────────────────┐ +//! │ Assembly │ +//! │ ┌──────────┐ ┌──────────┐ │ +//! PPS │ │ │ coil currents │ │ │position +//! Pulse rate ●───────▶│──▶│ Driver ├───────────────▶│ Motor ├──▶│─────────▶ +//! (±freq)│ │ │ (IA, IB) │ │ │(0:199) +//! │ └──────────┘ └──────────┘ │ +//! └──────────────────────────────────────────────┘ +//! ``` + +use std::time::Duration; + +use asynchronix::model::{Model, SetupContext}; +use asynchronix::ports::{EventBuffer, Output}; +use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::time::MonotonicTime; + +mod stepper_motor; + +pub use stepper_motor::{Driver, Motor}; + +pub struct MotorAssembly { + pub position: Output, + init_pos: u16, + load: Output, + pps: Output, +} + +impl MotorAssembly { + pub fn new(init_pos: u16) -> Self { + Self { + position: Default::default(), + init_pos, + load: Default::default(), + pps: Default::default(), + } + } + + /// Sets the pulse rate (sign = direction) [Hz] -- input port. + pub async fn pulse_rate(&mut self, pps: f64) { + self.pps.send(pps).await; + } + + /// Torque applied by the load [N·m] -- input port. + pub async fn load(&mut self, torque: f64) { + self.load.send(torque).await; + } +} + +impl Model for MotorAssembly { + fn setup(&mut self, setup_context: &SetupContext) { + let mut motor = Motor::new(self.init_pos); + let mut driver = Driver::new(1.0); + + // Mailboxes. + let motor_mbox = Mailbox::new(); + let driver_mbox = Mailbox::new(); + + // Connections. + self.pps.connect(Driver::pulse_rate, &driver_mbox); + self.load.connect(Motor::load, &motor_mbox); + driver.current_out.connect(Motor::current_in, &motor_mbox); + // Note: it is important to clone `position` from the parent to the + // submodel so that all connections made by the user to the parent model + // are preserved. Connections added after cloning are reflected in all + // clones. + motor.position = self.position.clone(); + + setup_context.add_model(driver, driver_mbox); + setup_context.add_model(motor, motor_mbox); + } +} + +fn main() { + // --------------- + // Bench assembly. + // --------------- + + // Models. + let init_pos = 123; + let mut assembly = MotorAssembly::new(init_pos); + + // Mailboxes. + let assembly_mbox = Mailbox::new(); + let assembly_addr = assembly_mbox.address(); + + // Model handles for simulation. + let mut position = EventBuffer::new(); + assembly.position.connect_sink(&position); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let mut simu = SimInit::new().add_model(assembly, assembly_mbox).init(t0); + + // ---------- + // Simulation. + // ---------- + + // Check initial conditions. + let mut t = t0; + assert_eq!(simu.time(), t); + assert_eq!(position.next(), Some(init_pos)); + assert!(position.next().is_none()); + + // Start the motor in 2s with a PPS of 10Hz. + simu.schedule_event( + Duration::from_secs(2), + MotorAssembly::pulse_rate, + 10.0, + &assembly_addr, + ) + .unwrap(); + + // Advance simulation time to two next events. + simu.step(); + t += Duration::new(2, 0); + assert_eq!(simu.time(), t); + simu.step(); + t += Duration::new(0, 100_000_000); + assert_eq!(simu.time(), t); + + // Whichever the starting position, after two phase increments from the + // driver the rotor should have synchronized with the driver, with a + // position given by this beautiful formula. + let mut pos = (((init_pos + 1) / 4) * 4 + 1) % Motor::STEPS_PER_REV; + assert_eq!(position.by_ref().last().unwrap(), pos); + + // Advance simulation time by 0.9s, which with a 10Hz PPS should correspond to + // 9 position increments. + simu.step_by(Duration::new(0, 900_000_000)); + t += Duration::new(0, 900_000_000); + assert_eq!(simu.time(), t); + for _ in 0..9 { + pos = (pos + 1) % Motor::STEPS_PER_REV; + assert_eq!(position.next(), Some(pos)); + } + assert!(position.next().is_none()); +} diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 3d24221..c733af5 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -40,7 +40,7 @@ impl Motor { pub const TORQUE_CONSTANT: f64 = 1.0; /// Creates a motor with the specified initial position. - fn new(position: u16) -> Self { + pub fn new(position: u16) -> Self { Self { position: Default::default(), pos: position % Self::STEPS_PER_REV, @@ -176,6 +176,7 @@ impl Driver { impl Model for Driver {} +#[allow(dead_code)] fn main() { // --------------- // Bench assembly. diff --git a/asynchronix/src/ports.rs b/asynchronix/src/ports.rs index f7ae6d6..c764302 100644 --- a/asynchronix/src/ports.rs +++ b/asynchronix/src/ports.rs @@ -12,6 +12,41 @@ //! contrast, since events are buffered in the mailbox of the target model, //! sending an event is a fire-and-forget operation. For this reason, output //! ports should generally be preferred over requestor ports when possible. +//! +//! `Output` and `Requestor` ports are clonable. Their clones are shallow +//! copies, meaning that any modification of the ports connected to one clone is +//! immediately reflected in other clones. +//! +//! #### Example +//! +//! The outputs in this example are clones of each other and remain therefore +//! always connected to the same inputs. For an example usage of outputs cloning +//! in submodels assemblies, see the [`assembly example`][assembly]. +//! +//! [assembly]: +//! https://github.com/asynchronics/asynchronix/tree/main/asynchronix/examples/assembly.rs +//! +//! ``` +//! use asynchronix::model::Model; +//! use asynchronix::ports::Output; +//! +//! pub struct MyModel { +//! pub output_a: Output, +//! pub output_b: Output, +//! } +//! +//! impl MyModel { +//! pub fn new() -> Self { +//! let output: Output<_> = Default::default(); +//! Self { +//! output_a: output.clone(), +//! output_b: output, +//! } +//! } +//! } +//! +//! impl Model for MyModel {} +//! ``` mod input; mod output; diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index 5f60e8a..d5599fd 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -7,6 +7,7 @@ use crate::model::Model; use crate::ports::{EventSink, LineError, LineId}; use crate::ports::{InputFn, ReplierFn}; use crate::simulation::Address; +use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; @@ -17,9 +18,13 @@ use self::sender::{EventSinkSender, InputSender, ReplierSender}; /// `Output` ports can be connected to input ports, i.e. to asynchronous model /// methods that return no value. They broadcast events to all connected input /// ports. +/// +/// When an `Output` is cloned, the information on connected ports remains +/// shared and therefore all clones use and modify the same list of connected +/// ports. +#[derive(Clone)] pub struct Output { - broadcaster: EventBroadcaster, - next_line_id: u64, + broadcaster: CachedRwLock>, } impl Output { @@ -40,26 +45,16 @@ impl Output { F: for<'a> InputFn<'a, M, T, S> + Clone, S: Send + 'static, { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; let sender = Box::new(InputSender::new(input, address.into().0)); - self.broadcaster.add(sender, line_id); - - line_id + self.broadcaster.write().unwrap().add(sender) } /// Adds a connection to an event sink such as an /// [`EventSlot`](crate::ports::EventSlot) or /// [`EventBuffer`](crate::ports::EventBuffer). pub fn connect_sink>(&mut self, sink: &S) -> LineId { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; let sender = Box::new(EventSinkSender::new(sink.writer())); - self.broadcaster.add(sender, line_id); - - line_id + self.broadcaster.write().unwrap().add(sender) } /// Removes the connection specified by the `LineId` parameter. @@ -69,7 +64,7 @@ impl Output { /// [`QuerySource`](crate::ports::QuerySource) instance and may result in /// the disconnection of an arbitrary endpoint. pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> { - if self.broadcaster.remove(line_id) { + if self.broadcaster.write().unwrap().remove(line_id) { Ok(()) } else { Err(LineError {}) @@ -78,27 +73,31 @@ impl Output { /// Removes all connections. pub fn disconnect_all(&mut self) { - self.broadcaster.clear(); + self.broadcaster.write().unwrap().clear(); } /// Broadcasts an event to all connected input ports. pub async fn send(&mut self, arg: T) { - self.broadcaster.broadcast(arg).await.unwrap(); + let broadcaster = self.broadcaster.write_scratchpad().unwrap(); + broadcaster.broadcast(arg).await.unwrap(); } } impl Default for Output { fn default() -> Self { Self { - broadcaster: EventBroadcaster::default(), - next_line_id: 0, + broadcaster: CachedRwLock::new(EventBroadcaster::default()), } } } impl fmt::Debug for Output { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Output ({} connected ports)", self.broadcaster.len()) + write!( + f, + "Output ({} connected ports)", + self.broadcaster.read_unsync().len() + ) } } @@ -107,9 +106,12 @@ impl fmt::Debug for Output { /// `Requestor` ports can be connected to replier ports, i.e. to asynchronous /// model methods that return a value. They broadcast queries to all connected /// replier ports. +/// +/// When a `Requestor` is cloned, the information on connected ports remains +/// shared and therefore all clones use and modify the same list of connected +/// ports. pub struct Requestor { - broadcaster: QueryBroadcaster, - next_line_id: u64, + broadcaster: CachedRwLock>, } impl Requestor { @@ -130,13 +132,8 @@ impl Requestor { F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, S: Send + 'static, { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; let sender = Box::new(ReplierSender::new(replier, address.into().0)); - self.broadcaster.add(sender, line_id); - - line_id + self.broadcaster.write().unwrap().add(sender) } /// Removes the connection specified by the `LineId` parameter. @@ -146,7 +143,7 @@ impl Requestor { /// [`QuerySource`](crate::ports::QuerySource) instance and may result in /// the disconnection of an arbitrary endpoint. pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> { - if self.broadcaster.remove(line_id) { + if self.broadcaster.write().unwrap().remove(line_id) { Ok(()) } else { Err(LineError {}) @@ -155,26 +152,34 @@ impl Requestor { /// Removes all connections. pub fn disconnect_all(&mut self) { - self.broadcaster.clear(); + self.broadcaster.write().unwrap().clear(); } /// Broadcasts a query to all connected replier ports. pub async fn send(&mut self, arg: T) -> impl Iterator + '_ { - self.broadcaster.broadcast(arg).await.unwrap() + self.broadcaster + .write_scratchpad() + .unwrap() + .broadcast(arg) + .await + .unwrap() } } impl Default for Requestor { fn default() -> Self { Self { - broadcaster: QueryBroadcaster::default(), - next_line_id: 0, + broadcaster: CachedRwLock::new(QueryBroadcaster::default()), } } } impl fmt::Debug for Requestor { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Requestor ({} connected ports)", self.broadcaster.len()) + write!( + f, + "Requestor ({} connected ports)", + self.broadcaster.read_unsync().len() + ) } } diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index f269312..cb39f52 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -25,6 +25,8 @@ use crate::util::task_set::TaskSet; /// - the outputs of all sender futures are returned all at once rather than /// with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { + /// Line identifier for the next port to be connected. + next_line_id: u64, /// The list of senders with their associated line identifier. senders: Vec<(LineId, Box>)>, /// Fields explicitly borrowed by the `BroadcastFuture`. @@ -38,15 +40,18 @@ impl BroadcasterInner { /// /// This method will panic if the total count of senders would reach /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.senders.push((id, sender)); + pub(super) fn add(&mut self, sender: Box>) -> LineId { + assert!(self.next_line_id != u64::MAX); + let line_id = LineId(self.next_line_id); + self.next_line_id += 1; - self.shared.futures_env.push(FutureEnv { - storage: None, - output: None, - }); + self.senders.push((line_id, sender)); + + self.shared.futures_env.push(FutureEnv::default()); self.shared.task_set.resize(self.senders.len()); + + line_id } /// Removes the first sender with the specified identifier, if any. @@ -122,6 +127,7 @@ impl Default for BroadcasterInner { let wake_src = wake_sink.source(); Self { + next_line_id: 0, senders: Vec::new(), shared: Shared { wake_sink, @@ -133,12 +139,23 @@ impl Default for BroadcasterInner { } } +impl Clone for BroadcasterInner { + fn clone(&self) -> Self { + Self { + next_line_id: self.next_line_id, + senders: self.senders.clone(), + shared: self.shared.clone(), + } + } +} + /// An object that can efficiently broadcast events to several input ports. /// /// This is very similar to `source::broadcaster::EventBroadcaster`, but /// generates non-owned futures instead. /// /// See `BroadcasterInner` for implementation details. +#[derive(Clone)] pub(super) struct EventBroadcaster { /// The broadcaster core object. inner: BroadcasterInner, @@ -151,8 +168,8 @@ impl EventBroadcaster { /// /// This method will panic if the total count of senders would reach /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.inner.add(sender, id); + pub(super) fn add(&mut self, sender: Box>) -> LineId { + self.inner.add(sender) } /// Removes the first sender with the specified identifier, if any. @@ -212,8 +229,8 @@ impl QueryBroadcaster { /// /// This method will panic if the total count of senders would reach /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.inner.add(sender, id); + pub(super) fn add(&mut self, sender: Box>) -> LineId { + self.inner.add(sender) } /// Removes the first sender with the specified identifier, if any. @@ -272,6 +289,14 @@ impl Default for QueryBroadcaster { } } +impl Clone for QueryBroadcaster { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + /// Data related to a sender future. struct FutureEnv { /// Cached storage for the future. @@ -280,6 +305,15 @@ struct FutureEnv { output: Option, } +impl Default for FutureEnv { + fn default() -> Self { + Self { + storage: None, + output: None, + } + } +} + /// A type-erased `Send` future wrapped in a `RecycleBox`. type RecycleBoxFuture<'a, R> = RecycleBox> + Send + 'a>; @@ -299,6 +333,23 @@ struct Shared { storage: Option>>>, } +impl Clone for Shared { + fn clone(&self) -> Self { + let wake_sink = WakeSink::new(); + let wake_src = wake_sink.source(); + + let mut futures_env = Vec::new(); + futures_env.resize_with(self.futures_env.len(), Default::default); + + Self { + wake_sink, + task_set: TaskSet::with_len(wake_src, self.task_set.len()), + futures_env, + storage: None, + } + } +} + /// A future aggregating the outputs of a collection of sender futures. /// /// The idea is to join all sender futures as efficiently as possible, meaning: @@ -537,12 +588,12 @@ mod tests { let mut mailboxes = Vec::new(); let mut broadcaster = EventBroadcaster::default(); - for id in 0..N_RECV { + for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); let sender = Box::new(InputSender::new(Counter::inc, address)); - broadcaster.add(sender, LineId(id as u64)); + broadcaster.add(sender); mailboxes.push(mailbox); } @@ -585,12 +636,12 @@ mod tests { let mut mailboxes = Vec::new(); let mut broadcaster = QueryBroadcaster::default(); - for id in 0..N_RECV { + for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); - broadcaster.add(sender, LineId(id as u64)); + broadcaster.add(sender); mailboxes.push(mailbox); } @@ -664,6 +715,12 @@ mod tests { } } + impl Clone for TestEvent { + fn clone(&self) -> Self { + unreachable!() + } + } + // An object that can wake a `TestEvent`. #[derive(Clone)] struct TestEventWaker { @@ -705,9 +762,9 @@ mod tests { let (test_event3, waker3) = test_event::(); let mut broadcaster = QueryBroadcaster::default(); - broadcaster.add(Box::new(test_event1), LineId(1)); - broadcaster.add(Box::new(test_event2), LineId(2)); - broadcaster.add(Box::new(test_event3), LineId(3)); + broadcaster.add(Box::new(test_event1)); + broadcaster.add(Box::new(test_event2)); + broadcaster.add(Box::new(test_event3)); let mut fut = Box::pin(broadcaster.broadcast(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); @@ -777,8 +834,8 @@ mod tests { let (test_event2, waker2) = test_event::(); let mut broadcaster = QueryBroadcaster::default(); - broadcaster.add(Box::new(test_event1), LineId(1)); - broadcaster.add(Box::new(test_event2), LineId(2)); + broadcaster.add(Box::new(test_event1)); + broadcaster.add(Box::new(test_event2)); let mut fut = Box::pin(broadcaster.broadcast(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 88cddfa..1c9ab02 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -6,6 +6,7 @@ use std::mem::ManuallyDrop; use std::pin::Pin; use std::task::{Context, Poll}; +use dyn_clone::DynClone; use recycle_box::{coerce_box, RecycleBox}; use crate::channel; @@ -14,11 +15,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; /// An event or query sender abstracting over the target model and input or /// replier method. -pub(super) trait Sender: Send { +pub(super) trait Sender: DynClone + Send { /// Asynchronously send the event or request. fn send(&mut self, arg: T) -> RecycledFuture<'_, Result>; } +dyn_clone::clone_trait_object!( Sender); + /// An object that can send events to an input port. pub(super) struct InputSender where @@ -72,6 +75,24 @@ where } } +impl Clone for InputSender +where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + 'static, + S: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + /// An object that can send a request to a replier port and retrieve a response. pub(super) struct ReplierSender { func: F, @@ -140,6 +161,26 @@ where } } +impl Clone for ReplierSender +where + M: Model, + F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, + T: Send + 'static, + R: Send + 'static, + S: Send, +{ + fn clone(&self) -> Self { + Self { + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + /// An object that can send a payload to an event sink. pub(super) struct EventSinkSender> { writer: W, @@ -157,9 +198,10 @@ impl> EventSinkSender { } } -impl> Sender for EventSinkSender +impl Sender for EventSinkSender where T: Send + 'static, + W: EventSinkWriter, { fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { let writer = &mut self.writer; @@ -172,6 +214,20 @@ where } } +impl Clone for EventSinkSender +where + T: Send + 'static, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {} diff --git a/asynchronix/src/ports/sink.rs b/asynchronix/src/ports/sink.rs index d22d008..e0fc0d1 100644 --- a/asynchronix/src/ports/sink.rs +++ b/asynchronix/src/ports/sink.rs @@ -14,7 +14,7 @@ pub trait EventSink { } /// A writer handle to an event sink. -pub trait EventSinkWriter: Send + Sync + 'static { +pub trait EventSinkWriter: Clone + Send + Sync + 'static { /// Writes a value to the associated sink. fn write(&self, event: T); } diff --git a/asynchronix/src/ports/sink/event_buffer.rs b/asynchronix/src/ports/sink/event_buffer.rs index 35b25ee..1cc8718 100644 --- a/asynchronix/src/ports/sink/event_buffer.rs +++ b/asynchronix/src/ports/sink/event_buffer.rs @@ -133,6 +133,14 @@ impl EventSinkWriter for EventBufferWriter { } } +impl Clone for EventBufferWriter { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + impl fmt::Debug for EventBufferWriter { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("EventBufferWriter").finish_non_exhaustive() diff --git a/asynchronix/src/ports/sink/event_slot.rs b/asynchronix/src/ports/sink/event_slot.rs index 62a6cef..135242f 100644 --- a/asynchronix/src/ports/sink/event_slot.rs +++ b/asynchronix/src/ports/sink/event_slot.rs @@ -10,7 +10,7 @@ struct Inner { slot: Mutex>, } -/// An `EventSink` and `EventSinkStream` that only keeps the last event. +/// An [`EventSink`] and [`EventSinkStream`] that only keeps the last event. /// /// Once the value is read, the iterator will return `None` until a new value is /// received. If the slot contains a value when a new value is received, the @@ -113,6 +113,14 @@ impl EventSinkWriter for EventSlotWriter { } } +impl Clone for EventSlotWriter { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + impl fmt::Debug for EventSlotWriter { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("EventStreamWriter").finish_non_exhaustive() diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 6850005..7c8ce3e 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -27,7 +27,6 @@ use super::ReplierFn; /// simulation monitoring endpoint instantiated during bench assembly. pub struct EventSource { broadcaster: Arc>>, - next_line_id: u64, } impl EventSource { @@ -48,13 +47,8 @@ impl EventSource { F: for<'a> InputFn<'a, M, T, S> + Clone, S: Send + 'static, { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; let sender = Box::new(InputSender::new(input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender, line_id); - - line_id + self.broadcaster.lock().unwrap().add(sender) } /// Removes the connection specified by the `LineId` parameter. @@ -163,7 +157,6 @@ impl Default for EventSource { fn default() -> Self { Self { broadcaster: Arc::new(Mutex::new(EventBroadcaster::default())), - next_line_id: 0, } } } @@ -187,7 +180,6 @@ impl fmt::Debug for EventSource { /// instantiated during bench assembly. pub struct QuerySource { broadcaster: Arc>>, - next_line_id: u64, } impl QuerySource { @@ -208,13 +200,8 @@ impl QuerySource { F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, S: Send + 'static, { - assert!(self.next_line_id != u64::MAX); - let line_id = LineId(self.next_line_id); - self.next_line_id += 1; let sender = Box::new(ReplierSender::new(replier, address.into().0)); - self.broadcaster.lock().unwrap().add(sender, line_id); - - line_id + self.broadcaster.lock().unwrap().add(sender) } /// Removes the connection specified by the `LineId` parameter. @@ -259,7 +246,6 @@ impl Default for QuerySource fn default() -> Self { Self { broadcaster: Arc::new(Mutex::new(QueryBroadcaster::default())), - next_line_id: 0, } } } diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index d3fb990..cff1d50 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -24,6 +24,8 @@ use crate::util::task_set::TaskSet; /// does, but the outputs of all sender futures are returned all at once rather /// than with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { + /// Line identifier for the next port to be connected. + next_line_id: u64, /// The list of senders with their associated line identifier. senders: Vec<(LineId, Box>)>, } @@ -35,8 +37,14 @@ impl BroadcasterInner { /// /// This method will panic if the total count of senders would reach /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.senders.push((id, sender)); + pub(super) fn add(&mut self, sender: Box>) -> LineId { + assert!(self.next_line_id != u64::MAX); + let line_id = LineId(self.next_line_id); + self.next_line_id += 1; + + self.senders.push((line_id, sender)); + + line_id } /// Removes the first sender with the specified identifier, if any. @@ -89,6 +97,7 @@ impl BroadcasterInner { impl Default for BroadcasterInner { fn default() -> Self { Self { + next_line_id: 0, senders: Vec::new(), } } @@ -112,8 +121,8 @@ impl EventBroadcaster { /// /// This method will panic if the total count of senders would reach /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.inner.add(sender, id); + pub(super) fn add(&mut self, sender: Box>) -> LineId { + self.inner.add(sender) } /// Removes the first sender with the specified identifier, if any. @@ -190,8 +199,8 @@ impl QueryBroadcaster { /// /// This method will panic if the total count of senders would reach /// `u32::MAX - 1`. - pub(super) fn add(&mut self, sender: Box>, id: LineId) { - self.inner.add(sender, id); + pub(super) fn add(&mut self, sender: Box>) -> LineId { + self.inner.add(sender) } /// Removes the first sender with the specified identifier, if any. @@ -462,12 +471,12 @@ mod tests { let mut mailboxes = Vec::new(); let mut broadcaster = EventBroadcaster::default(); - for id in 0..N_RECV { + for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); let sender = Box::new(InputSender::new(Counter::inc, address)); - broadcaster.add(sender, LineId(id as u64)); + broadcaster.add(sender); mailboxes.push(mailbox); } @@ -510,12 +519,12 @@ mod tests { let mut mailboxes = Vec::new(); let mut broadcaster = QueryBroadcaster::default(); - for id in 0..N_RECV { + for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); - broadcaster.add(sender, LineId(id as u64)); + broadcaster.add(sender); mailboxes.push(mailbox); } @@ -629,9 +638,9 @@ mod tests { let (test_event3, waker3) = test_event::(); let mut broadcaster = QueryBroadcaster::default(); - broadcaster.add(Box::new(test_event1), LineId(1)); - broadcaster.add(Box::new(test_event2), LineId(2)); - broadcaster.add(Box::new(test_event3), LineId(3)); + broadcaster.add(Box::new(test_event1)); + broadcaster.add(Box::new(test_event2)); + broadcaster.add(Box::new(test_event3)); let mut fut = Box::pin(broadcaster.broadcast(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); @@ -701,8 +710,8 @@ mod tests { let (test_event2, waker2) = test_event::(); let mut broadcaster = QueryBroadcaster::default(); - broadcaster.add(Box::new(test_event1), LineId(1)); - broadcaster.add(Box::new(test_event2), LineId(2)); + broadcaster.add(Box::new(test_event1)); + broadcaster.add(Box::new(test_event2)); let mut fut = Box::pin(broadcaster.broadcast(())); let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false)); diff --git a/asynchronix/src/util.rs b/asynchronix/src/util.rs index f618e05..631479a 100644 --- a/asynchronix/src/util.rs +++ b/asynchronix/src/util.rs @@ -1,4 +1,5 @@ pub(crate) mod bit; +pub(crate) mod cached_rw_lock; pub(crate) mod indexed_priority_queue; pub(crate) mod priority_queue; pub(crate) mod rng; diff --git a/asynchronix/src/util/cached_rw_lock.rs b/asynchronix/src/util/cached_rw_lock.rs new file mode 100644 index 0000000..eba8bf1 --- /dev/null +++ b/asynchronix/src/util/cached_rw_lock.rs @@ -0,0 +1,111 @@ +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, LockResult, Mutex, MutexGuard, PoisonError}; + +/// A cached read-write lock. +/// +/// This read-write lock maintains a local cache in each clone for read +/// access. Regular writes are always synchronized and performed on the shared +/// data. Regular reads are synchronized only when the shared data has been +/// modified since the local cache was last synchronized. The local cache can +/// alternatively be used as a scratchpad without invalidating the shared data, +/// in which case all changes to the scratchpad will be lost on the next +/// synchronization. +#[derive(Clone)] +pub(crate) struct CachedRwLock { + local: T, + local_epoch: usize, + shared: Arc>, + epoch: Arc, +} + +impl CachedRwLock { + /// Creates a new cached read-write lock in an ulocked state. + pub(crate) fn new(t: T) -> Self { + let shared = t.clone(); + Self { + local: t, + local_epoch: 0, + shared: Arc::new(Mutex::new(shared)), + epoch: Arc::new(AtomicUsize::new(0)), + } + } + + /// Gives access to the local cache without synchronization. + pub(crate) fn read_unsync(&self) -> &T { + &self.local + } + + /// Synchronizes the local cache if it is behind the shared data and gives + /// access to it. + #[allow(dead_code)] + pub(crate) fn read(&mut self) -> LockResult<&T> { + if self.epoch.load(Ordering::Relaxed) != self.local_epoch { + match self.shared.lock() { + LockResult::Ok(shared) => { + self.local = shared.clone(); + self.local_epoch = self.epoch.load(Ordering::Relaxed) + } + LockResult::Err(_) => return LockResult::Err(PoisonError::new(&self.local)), + } + } + LockResult::Ok(&self.local) + } + + /// Gives write access to the local cache without synchronization so it can + /// be used as a scratchpad. + #[allow(dead_code)] + pub(crate) fn write_scratchpad_unsync(&mut self) -> &mut T { + &mut self.local + } + + /// Synchronizes the local cache if it is behind the shared data and gives + /// write access to it so it can be used as a scratchpad. + pub(crate) fn write_scratchpad(&mut self) -> LockResult<&mut T> { + if self.epoch.load(Ordering::Relaxed) != self.local_epoch { + match self.shared.lock() { + LockResult::Ok(shared) => { + self.local = shared.clone(); + self.local_epoch = self.epoch.load(Ordering::Relaxed) + } + LockResult::Err(_) => return LockResult::Err(PoisonError::new(&mut self.local)), + } + } + LockResult::Ok(&mut self.local) + } + + /// Acquires a write lock on the shared data. + pub(crate) fn write(&mut self) -> LockResult> { + let guard = self.shared.lock(); + let epoch = self.epoch.load(Ordering::Relaxed) + 1; + self.epoch.store(epoch, Ordering::Relaxed); + + match guard { + LockResult::Ok(shared) => LockResult::Ok(CachedRwLockWriteGuard { guard: shared }), + LockResult::Err(poison) => LockResult::Err(PoisonError::new(CachedRwLockWriteGuard { + guard: poison.into_inner(), + })), + } + } +} + +/// Write guard. +/// +/// The lock is released when the guard is dropped. +pub(crate) struct CachedRwLockWriteGuard<'a, T: Clone> { + guard: MutexGuard<'a, T>, +} + +impl Deref for CachedRwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + &self.guard + } +} + +impl DerefMut for CachedRwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.guard + } +} diff --git a/asynchronix/src/util/task_set.rs b/asynchronix/src/util/task_set.rs index 90f3e41..e1145e8 100644 --- a/asynchronix/src/util/task_set.rs +++ b/asynchronix/src/util/task_set.rs @@ -271,6 +271,10 @@ impl TaskSet { waker_ref(&self.tasks[idx]) } + + pub(crate) fn len(&self) -> usize { + self.task_count + } } /// Internals shared between a `TaskSet` and its associated `Task`s.