forked from ROMEO/nexosim
Optimize filtered connections from outputs
This commit is contained in:
parent
2270a94b8d
commit
b5187ded44
@ -10,6 +10,7 @@ use crate::simulation::Address;
|
|||||||
use crate::util::cached_rw_lock::CachedRwLock;
|
use crate::util::cached_rw_lock::CachedRwLock;
|
||||||
|
|
||||||
use broadcaster::{EventBroadcaster, QueryBroadcaster};
|
use broadcaster::{EventBroadcaster, QueryBroadcaster};
|
||||||
|
use sender::FilterMapReplierSender;
|
||||||
|
|
||||||
use self::sender::{
|
use self::sender::{
|
||||||
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
|
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
|
||||||
@ -262,6 +263,41 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
|
|||||||
self.broadcaster.write().unwrap().add(sender)
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting, filtered connection to a replier port of the
|
||||||
|
/// model specified by the address.
|
||||||
|
///
|
||||||
|
/// Queries and replies are mapped to other types using the closures
|
||||||
|
/// provided in argument, or ignored if the query closure returns `None`.
|
||||||
|
///
|
||||||
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
|
/// returning a value of the type returned by the reply mapping closure and
|
||||||
|
/// taking as argument a value of the type returned by the query mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn filter_map_connect<M, C, D, F, U, Q, S>(
|
||||||
|
&mut self,
|
||||||
|
query_filer_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
replier: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(FilterMapReplierSender::new(
|
||||||
|
query_filer_map,
|
||||||
|
reply_map,
|
||||||
|
replier,
|
||||||
|
address.into().0,
|
||||||
|
));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes the connection specified by the `LineId` parameter.
|
/// Removes the connection specified by the `LineId` parameter.
|
||||||
///
|
///
|
||||||
/// It is a logic error to specify a line identifier from another
|
/// It is a logic error to specify a line identifier from another
|
||||||
|
@ -46,13 +46,11 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
|
|
||||||
self.senders.push((line_id, sender));
|
self.senders.push((line_id, sender));
|
||||||
self.shared.outputs.push(None);
|
self.shared.outputs.push(None);
|
||||||
self.shared.task_set.resize(self.senders.len());
|
|
||||||
|
|
||||||
// The storage is alway an empty vector so we just book some capacity.
|
// The storage is alway an empty vector so we just book some capacity.
|
||||||
self.shared
|
self.shared.storage.as_mut().map(|s| {
|
||||||
.storage
|
let _ = s.try_reserve(self.senders.len());
|
||||||
.as_mut()
|
});
|
||||||
.map(|s| s.try_reserve(self.senders.len()).unwrap());
|
|
||||||
|
|
||||||
line_id
|
line_id
|
||||||
}
|
}
|
||||||
@ -65,7 +63,6 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
|
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
|
||||||
self.senders.swap_remove(pos);
|
self.senders.swap_remove(pos);
|
||||||
self.shared.outputs.truncate(self.senders.len());
|
self.shared.outputs.truncate(self.senders.len());
|
||||||
self.shared.task_set.resize(self.senders.len());
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -77,7 +74,6 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
pub(super) fn clear(&mut self) {
|
pub(super) fn clear(&mut self) {
|
||||||
self.senders.clear();
|
self.senders.clear();
|
||||||
self.shared.outputs.clear();
|
self.shared.outputs.clear();
|
||||||
self.shared.task_set.resize(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of connected senders.
|
/// Returns the number of connected senders.
|
||||||
@ -85,10 +81,15 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
self.senders.len()
|
self.senders.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Efficiently broadcasts a message or a query to multiple addresses.
|
/// Return a list of futures broadcasting an event or query to multiple
|
||||||
///
|
/// addresses.
|
||||||
/// This method does not collect the responses from queries.
|
fn futures(
|
||||||
fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> {
|
&mut self,
|
||||||
|
arg: T,
|
||||||
|
) -> (
|
||||||
|
&'_ mut Shared<R>,
|
||||||
|
Vec<RecycledFuture<'_, Result<R, SendError>>>,
|
||||||
|
) {
|
||||||
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
|
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
|
||||||
|
|
||||||
// Broadcast the message and collect all futures.
|
// Broadcast the message and collect all futures.
|
||||||
@ -96,15 +97,18 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
while let Some(sender) = iter.next() {
|
while let Some(sender) = iter.next() {
|
||||||
// Move the argument rather than clone it for the last future.
|
// Move the argument rather than clone it for the last future.
|
||||||
if iter.len() == 0 {
|
if iter.len() == 0 {
|
||||||
futures.push(sender.1.send(arg));
|
if let Some(fut) = sender.1.send(arg) {
|
||||||
|
futures.push(fut);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
futures.push(sender.1.send(arg.clone()));
|
if let Some(fut) = sender.1.send(arg.clone()) {
|
||||||
|
futures.push(fut);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate the global future.
|
(&mut self.shared, futures)
|
||||||
BroadcastFuture::new(&mut self.shared, futures)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,10 +187,22 @@ impl<T: Clone> EventBroadcaster<T> {
|
|||||||
match self.inner.senders.as_mut_slice() {
|
match self.inner.senders.as_mut_slice() {
|
||||||
// No sender.
|
// No sender.
|
||||||
[] => Ok(()),
|
[] => Ok(()),
|
||||||
// One sender.
|
|
||||||
[sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}),
|
// One sender at most.
|
||||||
// Multiple senders.
|
[sender] => match sender.1.send(arg) {
|
||||||
_ => self.inner.broadcast(arg).await,
|
None => Ok(()),
|
||||||
|
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
|
||||||
|
},
|
||||||
|
|
||||||
|
// Possibly multiple senders.
|
||||||
|
_ => {
|
||||||
|
let (shared, mut futures) = self.inner.futures(arg);
|
||||||
|
match futures.as_mut_slice() {
|
||||||
|
[] => Ok(()),
|
||||||
|
[fut] => fut.await.map_err(|_| BroadcastError {}),
|
||||||
|
_ => BroadcastFuture::new(shared, futures).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -244,25 +260,49 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
arg: T,
|
arg: T,
|
||||||
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
|
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
|
||||||
match self.inner.senders.as_mut_slice() {
|
let output_count = match self.inner.senders.as_mut_slice() {
|
||||||
// No sender.
|
// No sender.
|
||||||
[] => {}
|
[] => 0,
|
||||||
// One sender.
|
|
||||||
|
// One sender at most.
|
||||||
[sender] => {
|
[sender] => {
|
||||||
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
|
if let Some(fut) = sender.1.send(arg) {
|
||||||
self.inner.shared.outputs[0] = Some(output);
|
let output = fut.await.map_err(|_| BroadcastError {})?;
|
||||||
|
self.inner.shared.outputs[0] = Some(output);
|
||||||
|
|
||||||
|
1
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Possibly multiple senders.
|
||||||
|
_ => {
|
||||||
|
let (shared, mut futures) = self.inner.futures(arg);
|
||||||
|
let output_count = futures.len();
|
||||||
|
|
||||||
|
match futures.as_mut_slice() {
|
||||||
|
[] => {}
|
||||||
|
[fut] => {
|
||||||
|
let output = fut.await.map_err(|_| BroadcastError {})?;
|
||||||
|
shared.outputs[0] = Some(output);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
BroadcastFuture::new(shared, futures).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
output_count
|
||||||
}
|
}
|
||||||
// Multiple senders.
|
|
||||||
_ => self.inner.broadcast(arg).await?,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// At this point all outputs should be available so `unwrap` can be
|
// At this point all outputs should be available.
|
||||||
// called on the output of each future.
|
|
||||||
let outputs = self
|
let outputs = self
|
||||||
.inner
|
.inner
|
||||||
.shared
|
.shared
|
||||||
.outputs
|
.outputs
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
|
.take(output_count)
|
||||||
.map(|t| t.take().unwrap());
|
.map(|t| t.take().unwrap());
|
||||||
|
|
||||||
Ok(outputs)
|
Ok(outputs)
|
||||||
@ -311,7 +351,7 @@ impl<R> Clone for Shared<R> {
|
|||||||
|
|
||||||
Self {
|
Self {
|
||||||
wake_sink,
|
wake_sink,
|
||||||
task_set: TaskSet::with_len(wake_src, self.task_set.len()),
|
task_set: TaskSet::new(wake_src),
|
||||||
outputs,
|
outputs,
|
||||||
storage: None,
|
storage: None,
|
||||||
}
|
}
|
||||||
@ -345,11 +385,11 @@ impl<'a, R> BroadcastFuture<'a, R> {
|
|||||||
futures: Vec<RecycledFuture<'a, Result<R, SendError>>>,
|
futures: Vec<RecycledFuture<'a, Result<R, SendError>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let pending_futures_count = futures.len();
|
let pending_futures_count = futures.len();
|
||||||
|
shared.task_set.resize(pending_futures_count);
|
||||||
|
|
||||||
assert!(shared.outputs.len() == pending_futures_count);
|
for output in shared.outputs.iter_mut().take(pending_futures_count) {
|
||||||
|
// Empty the output slots to be used. This is necessary in case the
|
||||||
for output in shared.outputs.iter_mut() {
|
// previous broadcast future was cancelled.
|
||||||
// Drop the previous output if necessary.
|
|
||||||
output.take();
|
output.take();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,7 +419,11 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
|
|||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = &mut *self;
|
let this = &mut *self;
|
||||||
|
|
||||||
assert_ne!(this.state, FutureState::Completed);
|
assert_ne!(
|
||||||
|
this.state,
|
||||||
|
FutureState::Completed,
|
||||||
|
"broadcast future polled after completion"
|
||||||
|
);
|
||||||
|
|
||||||
// Poll all sender futures once if this is the first time the broadcast
|
// Poll all sender futures once if this is the first time the broadcast
|
||||||
// future is polled.
|
// future is polled.
|
||||||
@ -681,15 +725,15 @@ mod tests {
|
|||||||
fut_storage: Option<RecycleBox<()>>,
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
}
|
}
|
||||||
impl<R: Send> Sender<(), R> for TestEvent<R> {
|
impl<R: Send> Sender<(), R> for TestEvent<R> {
|
||||||
fn send(&mut self, _arg: ()) -> RecycledFuture<'_, Result<R, SendError>> {
|
fn send(&mut self, _arg: ()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let fut_storage = &mut self.fut_storage;
|
let fut_storage = &mut self.fut_storage;
|
||||||
let receiver = &mut self.receiver;
|
let receiver = &mut self.receiver;
|
||||||
|
|
||||||
RecycledFuture::new(fut_storage, async {
|
Some(RecycledFuture::new(fut_storage, async {
|
||||||
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
|
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
|
||||||
|
|
||||||
Ok(stream.next().await.unwrap())
|
Ok(stream.next().await.unwrap())
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::future::{ready, Future};
|
use std::future::Future;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::mem::ManuallyDrop;
|
use std::mem::ManuallyDrop;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
@ -18,7 +18,7 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
|
|||||||
/// replier method.
|
/// replier method.
|
||||||
pub(super) trait Sender<T, R>: DynClone + Send {
|
pub(super) trait Sender<T, R>: DynClone + Send {
|
||||||
/// Asynchronously send the event or request.
|
/// Asynchronously send the event or request.
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>;
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
|
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
|
||||||
@ -57,7 +57,7 @@ where
|
|||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
|
|
||||||
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
||||||
@ -66,9 +66,9 @@ where
|
|||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
fut.await.map_err(|_| SendError {})
|
fut.await.map_err(|_| SendError {})
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,7 +128,7 @@ where
|
|||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let arg = (self.map)(arg);
|
let arg = (self.map)(arg);
|
||||||
|
|
||||||
@ -138,9 +138,9 @@ where
|
|||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
fut.await.map_err(|_| SendError {})
|
fut.await.map_err(|_| SendError {})
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,23 +202,20 @@ where
|
|||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let func = self.func.clone();
|
(self.filter_map)(arg).map(|arg| {
|
||||||
|
let func = self.func.clone();
|
||||||
|
|
||||||
match (self.filter_map)(arg) {
|
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
||||||
Some(arg) => {
|
let fut = func.call(model, arg, scheduler);
|
||||||
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
|
||||||
let fut = func.call(model, arg, scheduler);
|
|
||||||
|
|
||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
fut.await.map_err(|_| SendError {})
|
fut.await.map_err(|_| SendError {})
|
||||||
})
|
})
|
||||||
}
|
})
|
||||||
None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,14 +259,14 @@ where
|
|||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
W: EventSinkWriter<T>,
|
W: EventSinkWriter<T>,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let writer = &mut self.writer;
|
let writer = &mut self.writer;
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
writer.write(arg);
|
writer.write(arg);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -315,15 +312,15 @@ where
|
|||||||
C: Fn(T) -> U + Send + Sync,
|
C: Fn(T) -> U + Send + Sync,
|
||||||
W: EventSinkWriter<U>,
|
W: EventSinkWriter<U>,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let writer = &mut self.writer;
|
let writer = &mut self.writer;
|
||||||
let arg = (self.map)(arg);
|
let arg = (self.map)(arg);
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
writer.write(arg);
|
writer.write(arg);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -374,17 +371,16 @@ where
|
|||||||
C: Fn(T) -> Option<U> + Send + Sync,
|
C: Fn(T) -> Option<U> + Send + Sync,
|
||||||
W: EventSinkWriter<U>,
|
W: EventSinkWriter<U>,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let writer = &mut self.writer;
|
let writer = &mut self.writer;
|
||||||
|
|
||||||
match (self.filter_map)(arg) {
|
(self.filter_map)(arg).map(|arg| {
|
||||||
Some(arg) => RecycledFuture::new(&mut self.fut_storage, async move {
|
RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
writer.write(arg);
|
writer.write(arg);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}),
|
})
|
||||||
None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))),
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -440,7 +436,7 @@ where
|
|||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = &mut self.sender;
|
let sender = &mut self.sender;
|
||||||
let reply_receiver = &mut self.receiver;
|
let reply_receiver = &mut self.receiver;
|
||||||
@ -459,7 +455,7 @@ where
|
|||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
RecycledFuture::new(fut_storage, async move {
|
Some(RecycledFuture::new(fut_storage, async move {
|
||||||
// Send the message.
|
// Send the message.
|
||||||
send_fut.await.map_err(|_| SendError {})?;
|
send_fut.await.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
@ -467,7 +463,7 @@ where
|
|||||||
// If an error is received, it most likely means the mailbox was
|
// If an error is received, it most likely means the mailbox was
|
||||||
// dropped before the message was processed.
|
// dropped before the message was processed.
|
||||||
reply_receiver.recv().await.map_err(|_| SendError {})
|
reply_receiver.recv().await.map_err(|_| SendError {})
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -538,7 +534,7 @@ where
|
|||||||
Q: Send + 'static,
|
Q: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let arg = (self.query_map)(arg);
|
let arg = (self.query_map)(arg);
|
||||||
let sender = &mut self.sender;
|
let sender = &mut self.sender;
|
||||||
@ -559,7 +555,7 @@ where
|
|||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
RecycledFuture::new(fut_storage, async move {
|
Some(RecycledFuture::new(fut_storage, async move {
|
||||||
// Send the message.
|
// Send the message.
|
||||||
send_fut.await.map_err(|_| SendError {})?;
|
send_fut.await.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
@ -571,7 +567,7 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(|_| SendError {})
|
.map_err(|_| SendError {})
|
||||||
.map(reply_map)
|
.map(reply_map)
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -596,6 +592,120 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An object that can filter and send mapped requests to a replier port and
|
||||||
|
/// retrieve mapped responses.
|
||||||
|
pub(super) struct FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
|
query_filter_map: Arc<C>,
|
||||||
|
reply_map: Arc<D>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
receiver: multishot::Receiver<Q>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_query_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_reply_map: PhantomData<fn(Q) -> R>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
|
pub(super) fn new(
|
||||||
|
query_filter_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
query_filter_map: Arc::new(query_filter_map),
|
||||||
|
reply_map: Arc::new(reply_map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
receiver: multishot::Receiver::new(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync,
|
||||||
|
D: Fn(Q) -> R + Send + Sync,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
|
(self.query_filter_map)(arg).map(|arg| {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let sender = &mut self.sender;
|
||||||
|
let reply_receiver = &mut self.receiver;
|
||||||
|
let fut_storage = &mut self.fut_storage;
|
||||||
|
let reply_map = &*self.reply_map;
|
||||||
|
|
||||||
|
// The previous future generated by this method should have been polled
|
||||||
|
// to completion so a new sender should be readily available.
|
||||||
|
let reply_sender = reply_receiver.sender().unwrap();
|
||||||
|
|
||||||
|
let send_fut = sender.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = async move {
|
||||||
|
let reply = func.call(model, arg, scheduler).await;
|
||||||
|
reply_sender.send(reply);
|
||||||
|
};
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
});
|
||||||
|
|
||||||
|
RecycledFuture::new(fut_storage, async move {
|
||||||
|
// Send the message.
|
||||||
|
send_fut.await.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
|
// Wait until the message is processed and the reply is sent back.
|
||||||
|
// If an error is received, it most likely means the mailbox was
|
||||||
|
// dropped before the message was processed.
|
||||||
|
reply_receiver
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
.map(reply_map)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Clone for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
query_filter_map: self.query_filter_map.clone(),
|
||||||
|
reply_map: self.reply_map.clone(),
|
||||||
|
func: self.func.clone(),
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
receiver: multishot::Receiver::new(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Error returned when the mailbox was closed or dropped.
|
/// Error returned when the mailbox was closed or dropped.
|
||||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||||
pub(super) struct SendError {}
|
pub(super) struct SendError {}
|
||||||
|
@ -71,7 +71,8 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
self.senders.len()
|
self.senders.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a list of futures broadcasting an event or query to multiple addresses.
|
/// Return a list of futures broadcasting an event or query to multiple
|
||||||
|
/// addresses.
|
||||||
fn futures(&mut self, arg: T) -> Vec<SenderFutureState<R>> {
|
fn futures(&mut self, arg: T) -> Vec<SenderFutureState<R>> {
|
||||||
let mut future_states = Vec::new();
|
let mut future_states = Vec::new();
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user