From b5187ded442294e7dcf7651fee019681410545a4 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 7 Aug 2024 10:11:53 +0200 Subject: [PATCH] Optimize filtered connections from outputs --- asynchronix/src/ports/output.rs | 36 ++++ asynchronix/src/ports/output/broadcaster.rs | 118 ++++++++---- asynchronix/src/ports/output/sender.rs | 190 +++++++++++++++----- asynchronix/src/ports/source/broadcaster.rs | 3 +- 4 files changed, 269 insertions(+), 78 deletions(-) diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index 95d982b..f2c04d7 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -10,6 +10,7 @@ use crate::simulation::Address; use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; +use sender::FilterMapReplierSender; use self::sender::{ EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender, @@ -262,6 +263,41 @@ impl Requestor { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting, filtered connection to a replier port of the + /// model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + query_filer_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filer_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index d8a8afe..07027d2 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -46,13 +46,11 @@ impl BroadcasterInner { self.senders.push((line_id, sender)); self.shared.outputs.push(None); - self.shared.task_set.resize(self.senders.len()); // The storage is alway an empty vector so we just book some capacity. - self.shared - .storage - .as_mut() - .map(|s| s.try_reserve(self.senders.len()).unwrap()); + self.shared.storage.as_mut().map(|s| { + let _ = s.try_reserve(self.senders.len()); + }); line_id } @@ -65,7 +63,6 @@ impl BroadcasterInner { if let Some(pos) = self.senders.iter().position(|s| s.0 == id) { self.senders.swap_remove(pos); self.shared.outputs.truncate(self.senders.len()); - self.shared.task_set.resize(self.senders.len()); return true; } @@ -77,7 +74,6 @@ impl BroadcasterInner { pub(super) fn clear(&mut self) { self.senders.clear(); self.shared.outputs.clear(); - self.shared.task_set.resize(0); } /// Returns the number of connected senders. @@ -85,10 +81,15 @@ impl BroadcasterInner { self.senders.len() } - /// Efficiently broadcasts a message or a query to multiple addresses. - /// - /// This method does not collect the responses from queries. - fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> { + /// Return a list of futures broadcasting an event or query to multiple + /// addresses. + fn futures( + &mut self, + arg: T, + ) -> ( + &'_ mut Shared, + Vec>>, + ) { let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default()); // Broadcast the message and collect all futures. @@ -96,15 +97,18 @@ impl BroadcasterInner { while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - futures.push(sender.1.send(arg)); + if let Some(fut) = sender.1.send(arg) { + futures.push(fut); + } break; } - futures.push(sender.1.send(arg.clone())); + if let Some(fut) = sender.1.send(arg.clone()) { + futures.push(fut); + } } - // Generate the global future. - BroadcastFuture::new(&mut self.shared, futures) + (&mut self.shared, futures) } } @@ -183,10 +187,22 @@ impl EventBroadcaster { match self.inner.senders.as_mut_slice() { // No sender. [] => Ok(()), - // One sender. - [sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}), - // Multiple senders. - _ => self.inner.broadcast(arg).await, + + // One sender at most. + [sender] => match sender.1.send(arg) { + None => Ok(()), + Some(fut) => fut.await.map_err(|_| BroadcastError {}), + }, + + // Possibly multiple senders. + _ => { + let (shared, mut futures) = self.inner.futures(arg); + match futures.as_mut_slice() { + [] => Ok(()), + [fut] => fut.await.map_err(|_| BroadcastError {}), + _ => BroadcastFuture::new(shared, futures).await, + } + } } } } @@ -244,25 +260,49 @@ impl QueryBroadcaster { &mut self, arg: T, ) -> Result + '_, BroadcastError> { - match self.inner.senders.as_mut_slice() { + let output_count = match self.inner.senders.as_mut_slice() { // No sender. - [] => {} - // One sender. + [] => 0, + + // One sender at most. [sender] => { - let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?; - self.inner.shared.outputs[0] = Some(output); + if let Some(fut) = sender.1.send(arg) { + let output = fut.await.map_err(|_| BroadcastError {})?; + self.inner.shared.outputs[0] = Some(output); + + 1 + } else { + 0 + } + } + + // Possibly multiple senders. + _ => { + let (shared, mut futures) = self.inner.futures(arg); + let output_count = futures.len(); + + match futures.as_mut_slice() { + [] => {} + [fut] => { + let output = fut.await.map_err(|_| BroadcastError {})?; + shared.outputs[0] = Some(output); + } + _ => { + BroadcastFuture::new(shared, futures).await?; + } + } + + output_count } - // Multiple senders. - _ => self.inner.broadcast(arg).await?, }; - // At this point all outputs should be available so `unwrap` can be - // called on the output of each future. + // At this point all outputs should be available. let outputs = self .inner .shared .outputs .iter_mut() + .take(output_count) .map(|t| t.take().unwrap()); Ok(outputs) @@ -311,7 +351,7 @@ impl Clone for Shared { Self { wake_sink, - task_set: TaskSet::with_len(wake_src, self.task_set.len()), + task_set: TaskSet::new(wake_src), outputs, storage: None, } @@ -345,11 +385,11 @@ impl<'a, R> BroadcastFuture<'a, R> { futures: Vec>>, ) -> Self { let pending_futures_count = futures.len(); + shared.task_set.resize(pending_futures_count); - assert!(shared.outputs.len() == pending_futures_count); - - for output in shared.outputs.iter_mut() { - // Drop the previous output if necessary. + for output in shared.outputs.iter_mut().take(pending_futures_count) { + // Empty the output slots to be used. This is necessary in case the + // previous broadcast future was cancelled. output.take(); } @@ -379,7 +419,11 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - assert_ne!(this.state, FutureState::Completed); + assert_ne!( + this.state, + FutureState::Completed, + "broadcast future polled after completion" + ); // Poll all sender futures once if this is the first time the broadcast // future is polled. @@ -681,15 +725,15 @@ mod tests { fut_storage: Option>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> RecycledFuture<'_, Result> { + fn send(&mut self, _arg: ()) -> Option>> { let fut_storage = &mut self.fut_storage; let receiver = &mut self.receiver; - RecycledFuture::new(fut_storage, async { + Some(RecycledFuture::new(fut_storage, async { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); Ok(stream.next().await.unwrap()) - }) + })) } } diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 5b285cc..9ddb9c7 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::fmt; -use std::future::{ready, Future}; +use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; @@ -18,7 +18,7 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; /// replier method. pub(super) trait Sender: DynClone + Send { /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result>; + fn send(&mut self, arg: T) -> Option>>; } dyn_clone::clone_trait_object!( Sender); @@ -57,7 +57,7 @@ where T: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let fut = self.sender.send(move |model, scheduler, recycle_box| { @@ -66,9 +66,9 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { fut.await.map_err(|_| SendError {}) - }) + })) } } @@ -128,7 +128,7 @@ where U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let arg = (self.map)(arg); @@ -138,9 +138,9 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { fut.await.map_err(|_| SendError {}) - }) + })) } } @@ -202,23 +202,20 @@ where U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let func = self.func.clone(); + fn send(&mut self, arg: T) -> Option>> { + (self.filter_map)(arg).map(|arg| { + let func = self.func.clone(); - match (self.filter_map)(arg) { - Some(arg) => { - let fut = self.sender.send(move |model, scheduler, recycle_box| { - let fut = func.call(model, arg, scheduler); + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); - coerce_box!(RecycleBox::recycle(recycle_box, fut)) - }); + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); - RecycledFuture::new(&mut self.fut_storage, async move { - fut.await.map_err(|_| SendError {}) - }) - } - None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), - } + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + }) } } @@ -262,14 +259,14 @@ where T: Send + 'static, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { writer.write(arg); Ok(()) - }) + })) } } @@ -315,15 +312,15 @@ where C: Fn(T) -> U + Send + Sync, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; let arg = (self.map)(arg); - RecycledFuture::new(&mut self.fut_storage, async move { + Some(RecycledFuture::new(&mut self.fut_storage, async move { writer.write(arg); Ok(()) - }) + })) } } @@ -374,17 +371,16 @@ where C: Fn(T) -> Option + Send + Sync, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + fn send(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; - match (self.filter_map)(arg) { - Some(arg) => RecycledFuture::new(&mut self.fut_storage, async move { + (self.filter_map)(arg).map(|arg| { + RecycledFuture::new(&mut self.fut_storage, async move { writer.write(arg); Ok(()) - }), - None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), - } + }) + }) } } @@ -440,7 +436,7 @@ where R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let sender = &mut self.sender; let reply_receiver = &mut self.receiver; @@ -459,7 +455,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(fut_storage, async move { + Some(RecycledFuture::new(fut_storage, async move { // Send the message. send_fut.await.map_err(|_| SendError {})?; @@ -467,7 +463,7 @@ where // If an error is received, it most likely means the mailbox was // dropped before the message was processed. reply_receiver.recv().await.map_err(|_| SendError {}) - }) + })) } } @@ -538,7 +534,7 @@ where Q: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + fn send(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let arg = (self.query_map)(arg); let sender = &mut self.sender; @@ -559,7 +555,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(fut_storage, async move { + Some(RecycledFuture::new(fut_storage, async move { // Send the message. send_fut.await.map_err(|_| SendError {})?; @@ -571,7 +567,7 @@ where .await .map_err(|_| SendError {}) .map(reply_map) - }) + })) } } @@ -596,6 +592,120 @@ where } } +/// An object that can filter and send mapped requests to a replier port and +/// retrieve mapped responses. +pub(super) struct FilterMapReplierSender +where + M: Model, +{ + query_filter_map: Arc, + reply_map: Arc, + func: F, + sender: channel::Sender, + receiver: multishot::Receiver, + fut_storage: Option>, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapReplierSender +where + M: Model, +{ + pub(super) fn new( + query_filter_map: C, + reply_map: D, + func: F, + sender: channel::Sender, + ) -> Self { + Self { + query_filter_map: Arc::new(query_filter_map), + reply_map: Arc::new(reply_map), + func, + sender, + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapReplierSender +where + M: Model, + C: Fn(T) -> Option + Send + Sync, + D: Fn(Q) -> R + Send + Sync, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option>> { + (self.query_filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = &mut self.sender; + let reply_receiver = &mut self.receiver; + let fut_storage = &mut self.fut_storage; + let reply_map = &*self.reply_map; + + // The previous future generated by this method should have been polled + // to completion so a new sender should be readily available. + let reply_sender = reply_receiver.sender().unwrap(); + + let send_fut = sender.send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(fut_storage, async move { + // Send the message. + send_fut.await.map_err(|_| SendError {})?; + + // Wait until the message is processed and the reply is sent back. + // If an error is received, it most likely means the mailbox was + // dropped before the message was processed. + reply_receiver + .recv() + .await + .map_err(|_| SendError {}) + .map(reply_map) + }) + }) + } +} + +impl Clone for FilterMapReplierSender +where + M: Model, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + query_filter_map: self.query_filter_map.clone(), + reply_map: self.reply_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {} diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 138cc14..bb36054 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -71,7 +71,8 @@ impl BroadcasterInner { self.senders.len() } - /// Return a list of futures broadcasting an event or query to multiple addresses. + /// Return a list of futures broadcasting an event or query to multiple + /// addresses. fn futures(&mut self, arg: T) -> Vec> { let mut future_states = Vec::new();