1
0
forked from ROMEO/nexosim

Simplify output broadcaster implementation

This commit is contained in:
Serge Barral
2024-08-05 09:56:13 +02:00
parent 7f244d2334
commit 2270a94b8d

View File

@ -4,9 +4,8 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use diatomic_waker::WakeSink; use diatomic_waker::WakeSink;
use recycle_box::{coerce_box, RecycleBox};
use super::sender::{SendError, Sender}; use super::sender::{RecycledFuture, SendError, Sender};
use super::LineId; use super::LineId;
use crate::util::task_set::TaskSet; use crate::util::task_set::TaskSet;
@ -20,8 +19,8 @@ use crate::util::task_set::TaskSet;
/// This is somewhat similar to what `FuturesOrdered` in the `futures` crate /// This is somewhat similar to what `FuturesOrdered` in the `futures` crate
/// does, but with some key differences: /// does, but with some key differences:
/// ///
/// - tasks and future storage are reusable to avoid repeated allocation, so /// - tasks, output storage and future storage are reusable to avoid repeated
/// allocation occurs only after a new sender is added, /// allocation, so allocation occurs only after a new sender is added,
/// - the outputs of all sender futures are returned all at once rather than /// - the outputs of all sender futures are returned all at once rather than
/// with an asynchronous iterator (a.k.a. async stream). /// with an asynchronous iterator (a.k.a. async stream).
pub(super) struct BroadcasterInner<T: Clone, R> { pub(super) struct BroadcasterInner<T: Clone, R> {
@ -46,11 +45,15 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
self.next_line_id += 1; self.next_line_id += 1;
self.senders.push((line_id, sender)); self.senders.push((line_id, sender));
self.shared.outputs.push(None);
self.shared.futures_env.push(FutureEnv::default());
self.shared.task_set.resize(self.senders.len()); self.shared.task_set.resize(self.senders.len());
// The storage is alway an empty vector so we just book some capacity.
self.shared
.storage
.as_mut()
.map(|s| s.try_reserve(self.senders.len()).unwrap());
line_id line_id
} }
@ -61,7 +64,7 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
pub(super) fn remove(&mut self, id: LineId) -> bool { pub(super) fn remove(&mut self, id: LineId) -> bool {
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
self.senders.swap_remove(pos); self.senders.swap_remove(pos);
self.shared.futures_env.swap_remove(pos); self.shared.outputs.truncate(self.senders.len());
self.shared.task_set.resize(self.senders.len()); self.shared.task_set.resize(self.senders.len());
return true; return true;
@ -73,7 +76,7 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
/// Removes all senders. /// Removes all senders.
pub(super) fn clear(&mut self) { pub(super) fn clear(&mut self) {
self.senders.clear(); self.senders.clear();
self.shared.futures_env.clear(); self.shared.outputs.clear();
self.shared.task_set.resize(0); self.shared.task_set.resize(0);
} }
@ -89,30 +92,15 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
// Broadcast the message and collect all futures. // Broadcast the message and collect all futures.
let mut iter = self let mut iter = self.senders.iter_mut();
.senders while let Some(sender) = iter.next() {
.iter_mut()
.zip(self.shared.futures_env.iter_mut());
while let Some((sender, futures_env)) = iter.next() {
let future_cache = futures_env
.storage
.take()
.unwrap_or_else(|| RecycleBox::new(()));
// Move the argument rather than clone it for the last future. // Move the argument rather than clone it for the last future.
if iter.len() == 0 { if iter.len() == 0 {
let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> = futures.push(sender.1.send(arg));
coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg)));
futures.push(RecycleBox::into_pin(future));
break; break;
} }
let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> = coerce_box!( futures.push(sender.1.send(arg.clone()));
RecycleBox::recycle(future_cache, sender.1.send(arg.clone()))
);
futures.push(RecycleBox::into_pin(future));
} }
// Generate the global future. // Generate the global future.
@ -132,7 +120,7 @@ impl<T: Clone, R> Default for BroadcasterInner<T, R> {
shared: Shared { shared: Shared {
wake_sink, wake_sink,
task_set: TaskSet::new(wake_src), task_set: TaskSet::new(wake_src),
futures_env: Vec::new(), outputs: Vec::new(),
storage: None, storage: None,
}, },
} }
@ -262,7 +250,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
// One sender. // One sender.
[sender] => { [sender] => {
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
self.inner.shared.futures_env[0].output = Some(output); self.inner.shared.outputs[0] = Some(output);
} }
// Multiple senders. // Multiple senders.
_ => self.inner.broadcast(arg).await?, _ => self.inner.broadcast(arg).await?,
@ -273,9 +261,9 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
let outputs = self let outputs = self
.inner .inner
.shared .shared
.futures_env .outputs
.iter_mut() .iter_mut()
.map(|t| t.output.take().unwrap()); .map(|t| t.take().unwrap());
Ok(outputs) Ok(outputs)
} }
@ -297,40 +285,20 @@ impl<T: Clone, R> Clone for QueryBroadcaster<T, R> {
} }
} }
/// Data related to a sender future.
struct FutureEnv<R> {
/// Cached storage for the future.
storage: Option<RecycleBox<()>>,
/// Output of the associated future.
output: Option<R>,
}
impl<R> Default for FutureEnv<R> {
fn default() -> Self {
Self {
storage: None,
output: None,
}
}
}
/// A type-erased `Send` future wrapped in a `RecycleBox`.
type RecycleBoxFuture<'a, R> = RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + 'a>;
/// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`. /// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`.
struct Shared<R> { struct Shared<R> {
/// Thread-safe waker handle. /// Thread-safe waker handle.
wake_sink: WakeSink, wake_sink: WakeSink,
/// Tasks associated to the sender futures. /// Tasks associated to the sender futures.
task_set: TaskSet, task_set: TaskSet,
/// Data related to the sender futures. /// Outputs of the sender futures.
futures_env: Vec<FutureEnv<R>>, outputs: Vec<Option<R>>,
/// Cached storage for the sender futures. /// Cached storage for the sender futures.
/// ///
/// When it exists, the cached storage is always an empty vector but it /// When it exists, the cached storage is always an empty vector but it
/// typically has a non-zero capacity. Its purpose is to reuse the /// typically has a non-zero capacity. Its purpose is to reuse the
/// previously allocated capacity when creating new sender futures. /// previously allocated capacity when creating new sender futures.
storage: Option<Vec<Pin<RecycleBoxFuture<'static, R>>>>, storage: Option<Vec<Pin<RecycledFuture<'static, R>>>>,
} }
impl<R> Clone for Shared<R> { impl<R> Clone for Shared<R> {
@ -338,13 +306,13 @@ impl<R> Clone for Shared<R> {
let wake_sink = WakeSink::new(); let wake_sink = WakeSink::new();
let wake_src = wake_sink.source(); let wake_src = wake_sink.source();
let mut futures_env = Vec::new(); let mut outputs = Vec::new();
futures_env.resize_with(self.futures_env.len(), Default::default); outputs.resize_with(self.outputs.len(), Default::default);
Self { Self {
wake_sink, wake_sink,
task_set: TaskSet::with_len(wake_src, self.task_set.len()), task_set: TaskSet::with_len(wake_src, self.task_set.len()),
futures_env, outputs,
storage: None, storage: None,
} }
} }
@ -363,7 +331,7 @@ pub(super) struct BroadcastFuture<'a, R> {
/// Reference to the shared fields of the `Broadcast` object. /// Reference to the shared fields of the `Broadcast` object.
shared: &'a mut Shared<R>, shared: &'a mut Shared<R>,
/// List of all send futures. /// List of all send futures.
futures: ManuallyDrop<Vec<Pin<RecycleBoxFuture<'a, R>>>>, futures: ManuallyDrop<Vec<RecycledFuture<'a, Result<R, SendError>>>>,
/// The total count of futures that have not yet been polled to completion. /// The total count of futures that have not yet been polled to completion.
pending_futures_count: usize, pending_futures_count: usize,
/// State of completion of the future. /// State of completion of the future.
@ -372,14 +340,17 @@ pub(super) struct BroadcastFuture<'a, R> {
impl<'a, R> BroadcastFuture<'a, R> { impl<'a, R> BroadcastFuture<'a, R> {
/// Creates a new `BroadcastFuture`. /// Creates a new `BroadcastFuture`.
fn new(shared: &'a mut Shared<R>, futures: Vec<Pin<RecycleBoxFuture<'a, R>>>) -> Self { fn new(
shared: &'a mut Shared<R>,
futures: Vec<RecycledFuture<'a, Result<R, SendError>>>,
) -> Self {
let pending_futures_count = futures.len(); let pending_futures_count = futures.len();
assert!(shared.futures_env.len() == pending_futures_count); assert!(shared.outputs.len() == pending_futures_count);
for futures_env in shared.futures_env.iter_mut() { for output in shared.outputs.iter_mut() {
// Drop the previous output if necessary. // Drop the previous output if necessary.
futures_env.output.take(); output.take();
} }
BroadcastFuture { BroadcastFuture {
@ -395,12 +366,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> {
fn drop(&mut self) { fn drop(&mut self) {
// Safety: this is safe since `self.futures` is never accessed after it // Safety: this is safe since `self.futures` is never accessed after it
// is moved out. // is moved out.
let mut futures = unsafe { ManuallyDrop::take(&mut self.futures) }; let futures = unsafe { ManuallyDrop::take(&mut self.futures) };
// Recycle the future-containing boxes.
for (future, futures_env) in futures.drain(..).zip(self.shared.futures_env.iter_mut()) {
futures_env.storage = Some(RecycleBox::vacate_pinned(future));
}
// Recycle the vector that contained the futures. // Recycle the vector that contained the futures.
self.shared.storage = Some(recycle_vec(futures)); self.shared.storage = Some(recycle_vec(futures));
@ -425,14 +391,14 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
this.shared.task_set.discard_scheduled(); this.shared.task_set.discard_scheduled();
for task_idx in 0..this.futures.len() { for task_idx in 0..this.futures.len() {
let future_env = &mut this.shared.futures_env[task_idx]; let output = &mut this.shared.outputs[task_idx];
let future = &mut this.futures[task_idx]; let future = std::pin::Pin::new(&mut this.futures[task_idx]);
let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_waker_ref = this.shared.task_set.waker_of(task_idx);
let task_cx_ref = &mut Context::from_waker(&task_waker_ref); let task_cx_ref = &mut Context::from_waker(&task_waker_ref);
match future.as_mut().poll(task_cx_ref) { match future.poll(task_cx_ref) {
Poll::Ready(Ok(output)) => { Poll::Ready(Ok(o)) => {
future_env.output = Some(output); *output = Some(o);
this.pending_futures_count -= 1; this.pending_futures_count -= 1;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
@ -477,20 +443,20 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
}; };
for task_idx in scheduled_tasks { for task_idx in scheduled_tasks {
let future_env = &mut this.shared.futures_env[task_idx]; let output = &mut this.shared.outputs[task_idx];
// Do not poll completed futures. // Do not poll completed futures.
if future_env.output.is_some() { if output.is_some() {
continue; continue;
} }
let future = &mut this.futures[task_idx]; let future = std::pin::Pin::new(&mut this.futures[task_idx]);
let task_waker_ref = this.shared.task_set.waker_of(task_idx); let task_waker_ref = this.shared.task_set.waker_of(task_idx);
let task_cx_ref = &mut Context::from_waker(&task_waker_ref); let task_cx_ref = &mut Context::from_waker(&task_waker_ref);
match future.as_mut().poll(task_cx_ref) { match future.poll(task_cx_ref) {
Poll::Ready(Ok(output)) => { Poll::Ready(Ok(o)) => {
future_env.output = Some(output); *output = Some(o);
this.pending_futures_count -= 1; this.pending_futures_count -= 1;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
@ -703,6 +669,7 @@ mod tests {
use loom::sync::atomic::{AtomicBool, Ordering}; use loom::sync::atomic::{AtomicBool, Ordering};
use loom::thread; use loom::thread;
use recycle_box::RecycleBox;
use waker_fn::waker_fn; use waker_fn::waker_fn;
use super::super::sender::RecycledFuture; use super::super::sender::RecycledFuture;