From 2270a94b8d0063869381e6321079107a28afe736 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Mon, 5 Aug 2024 09:56:13 +0200 Subject: [PATCH] Simplify output broadcaster implementation --- asynchronix/src/ports/output/broadcaster.rs | 127 ++++++++------------ 1 file changed, 47 insertions(+), 80 deletions(-) diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 0be241a..d8a8afe 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -4,9 +4,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use diatomic_waker::WakeSink; -use recycle_box::{coerce_box, RecycleBox}; -use super::sender::{SendError, Sender}; +use super::sender::{RecycledFuture, SendError, Sender}; use super::LineId; use crate::util::task_set::TaskSet; @@ -20,8 +19,8 @@ use crate::util::task_set::TaskSet; /// This is somewhat similar to what `FuturesOrdered` in the `futures` crate /// does, but with some key differences: /// -/// - tasks and future storage are reusable to avoid repeated allocation, so -/// allocation occurs only after a new sender is added, +/// - tasks, output storage and future storage are reusable to avoid repeated +/// allocation, so allocation occurs only after a new sender is added, /// - the outputs of all sender futures are returned all at once rather than /// with an asynchronous iterator (a.k.a. async stream). pub(super) struct BroadcasterInner { @@ -46,11 +45,15 @@ impl BroadcasterInner { self.next_line_id += 1; self.senders.push((line_id, sender)); - - self.shared.futures_env.push(FutureEnv::default()); - + self.shared.outputs.push(None); self.shared.task_set.resize(self.senders.len()); + // The storage is alway an empty vector so we just book some capacity. + self.shared + .storage + .as_mut() + .map(|s| s.try_reserve(self.senders.len()).unwrap()); + line_id } @@ -61,7 +64,7 @@ impl BroadcasterInner { pub(super) fn remove(&mut self, id: LineId) -> bool { if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { self.senders.swap_remove(pos); - self.shared.futures_env.swap_remove(pos); + self.shared.outputs.truncate(self.senders.len()); self.shared.task_set.resize(self.senders.len()); return true; @@ -73,7 +76,7 @@ impl BroadcasterInner { /// Removes all senders. pub(super) fn clear(&mut self) { self.senders.clear(); - self.shared.futures_env.clear(); + self.shared.outputs.clear(); self.shared.task_set.resize(0); } @@ -89,30 +92,15 @@ impl BroadcasterInner { let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. - let mut iter = self - .senders - .iter_mut() - .zip(self.shared.futures_env.iter_mut()); - while let Some((sender, futures_env)) = iter.next() { - let future_cache = futures_env - .storage - .take() - .unwrap_or_else(|| RecycleBox::new(())); - + let mut iter = self.senders.iter_mut(); + while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - let future: RecycleBox> + Send + '_> = - coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg))); - - futures.push(RecycleBox::into_pin(future)); + futures.push(sender.1.send(arg)); break; } - let future: RecycleBox> + Send + '_> = coerce_box!( - RecycleBox::recycle(future_cache, sender.1.send(arg.clone())) - ); - - futures.push(RecycleBox::into_pin(future)); + futures.push(sender.1.send(arg.clone())); } // Generate the global future. @@ -132,7 +120,7 @@ impl Default for BroadcasterInner { shared: Shared { wake_sink, task_set: TaskSet::new(wake_src), - futures_env: Vec::new(), + outputs: Vec::new(), storage: None, }, } @@ -262,7 +250,7 @@ impl QueryBroadcaster { // One sender. [sender] => { let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.inner.shared.futures_env[0].output = Some(output); + self.inner.shared.outputs[0] = Some(output); } // Multiple senders. _ => self.inner.broadcast(arg).await?, @@ -273,9 +261,9 @@ impl QueryBroadcaster { let outputs = self .inner .shared - .futures_env + .outputs .iter_mut() - .map(|t| t.output.take().unwrap()); + .map(|t| t.take().unwrap()); Ok(outputs) } @@ -297,40 +285,20 @@ impl Clone for QueryBroadcaster { } } -/// Data related to a sender future. -struct FutureEnv { - /// Cached storage for the future. - storage: Option>, - /// Output of the associated future. - output: Option, -} - -impl Default for FutureEnv { - fn default() -> Self { - Self { - storage: None, - output: None, - } - } -} - -/// A type-erased `Send` future wrapped in a `RecycleBox`. -type RecycleBoxFuture<'a, R> = RecycleBox> + Send + 'a>; - /// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`. struct Shared { /// Thread-safe waker handle. wake_sink: WakeSink, /// Tasks associated to the sender futures. task_set: TaskSet, - /// Data related to the sender futures. - futures_env: Vec>, + /// Outputs of the sender futures. + outputs: Vec>, /// Cached storage for the sender futures. /// /// When it exists, the cached storage is always an empty vector but it /// typically has a non-zero capacity. Its purpose is to reuse the /// previously allocated capacity when creating new sender futures. - storage: Option>>>, + storage: Option>>>, } impl Clone for Shared { @@ -338,13 +306,13 @@ impl Clone for Shared { let wake_sink = WakeSink::new(); let wake_src = wake_sink.source(); - let mut futures_env = Vec::new(); - futures_env.resize_with(self.futures_env.len(), Default::default); + let mut outputs = Vec::new(); + outputs.resize_with(self.outputs.len(), Default::default); Self { wake_sink, task_set: TaskSet::with_len(wake_src, self.task_set.len()), - futures_env, + outputs, storage: None, } } @@ -363,7 +331,7 @@ pub(super) struct BroadcastFuture<'a, R> { /// Reference to the shared fields of the `Broadcast` object. shared: &'a mut Shared, /// List of all send futures. - futures: ManuallyDrop>>>, + futures: ManuallyDrop>>>, /// The total count of futures that have not yet been polled to completion. pending_futures_count: usize, /// State of completion of the future. @@ -372,14 +340,17 @@ pub(super) struct BroadcastFuture<'a, R> { impl<'a, R> BroadcastFuture<'a, R> { /// Creates a new `BroadcastFuture`. - fn new(shared: &'a mut Shared, futures: Vec>>) -> Self { + fn new( + shared: &'a mut Shared, + futures: Vec>>, + ) -> Self { let pending_futures_count = futures.len(); - assert!(shared.futures_env.len() == pending_futures_count); + assert!(shared.outputs.len() == pending_futures_count); - for futures_env in shared.futures_env.iter_mut() { + for output in shared.outputs.iter_mut() { // Drop the previous output if necessary. - futures_env.output.take(); + output.take(); } BroadcastFuture { @@ -395,12 +366,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> { fn drop(&mut self) { // Safety: this is safe since `self.futures` is never accessed after it // is moved out. - let mut futures = unsafe { ManuallyDrop::take(&mut self.futures) }; - - // Recycle the future-containing boxes. - for (future, futures_env) in futures.drain(..).zip(self.shared.futures_env.iter_mut()) { - futures_env.storage = Some(RecycleBox::vacate_pinned(future)); - } + let futures = unsafe { ManuallyDrop::take(&mut self.futures) }; // Recycle the vector that contained the futures. self.shared.storage = Some(recycle_vec(futures)); @@ -425,14 +391,14 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { this.shared.task_set.discard_scheduled(); for task_idx in 0..this.futures.len() { - let future_env = &mut this.shared.futures_env[task_idx]; - let future = &mut this.futures[task_idx]; + let output = &mut this.shared.outputs[task_idx]; + let future = std::pin::Pin::new(&mut this.futures[task_idx]); let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_cx_ref = &mut Context::from_waker(&task_waker_ref); - match future.as_mut().poll(task_cx_ref) { - Poll::Ready(Ok(output)) => { - future_env.output = Some(output); + match future.poll(task_cx_ref) { + Poll::Ready(Ok(o)) => { + *output = Some(o); this.pending_futures_count -= 1; } Poll::Ready(Err(_)) => { @@ -477,20 +443,20 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { }; for task_idx in scheduled_tasks { - let future_env = &mut this.shared.futures_env[task_idx]; + let output = &mut this.shared.outputs[task_idx]; // Do not poll completed futures. - if future_env.output.is_some() { + if output.is_some() { continue; } - let future = &mut this.futures[task_idx]; + let future = std::pin::Pin::new(&mut this.futures[task_idx]); let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_cx_ref = &mut Context::from_waker(&task_waker_ref); - match future.as_mut().poll(task_cx_ref) { - Poll::Ready(Ok(output)) => { - future_env.output = Some(output); + match future.poll(task_cx_ref) { + Poll::Ready(Ok(o)) => { + *output = Some(o); this.pending_futures_count -= 1; } Poll::Ready(Err(_)) => { @@ -703,6 +669,7 @@ mod tests { use loom::sync::atomic::{AtomicBool, Ordering}; use loom::thread; + use recycle_box::RecycleBox; use waker_fn::waker_fn; use super::super::sender::RecycledFuture;