forked from ROMEO/nexosim
Merge pull request #32 from asynchronics/feature/connect_map
Add map/filter_map variants of the `connect` method
This commit is contained in:
commit
252ada4946
@ -10,8 +10,12 @@ use crate::simulation::Address;
|
|||||||
use crate::util::cached_rw_lock::CachedRwLock;
|
use crate::util::cached_rw_lock::CachedRwLock;
|
||||||
|
|
||||||
use broadcaster::{EventBroadcaster, QueryBroadcaster};
|
use broadcaster::{EventBroadcaster, QueryBroadcaster};
|
||||||
|
use sender::FilterMapReplierSender;
|
||||||
|
|
||||||
use self::sender::{EventSinkSender, InputSender, ReplierSender};
|
use self::sender::{
|
||||||
|
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
|
||||||
|
MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender,
|
||||||
|
};
|
||||||
|
|
||||||
/// An output port.
|
/// An output port.
|
||||||
///
|
///
|
||||||
@ -57,6 +61,94 @@ impl<T: Clone + Send + 'static> Output<T> {
|
|||||||
self.broadcaster.write().unwrap().add(sender)
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting connection to an input port of the model
|
||||||
|
/// specified by the address.
|
||||||
|
///
|
||||||
|
/// Events are mapped to another type using the closure provided in
|
||||||
|
/// argument.
|
||||||
|
///
|
||||||
|
/// The input port must be an asynchronous method of a model of type `M`
|
||||||
|
/// taking as argument a value of the type returned by the mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn map_connect<M, C, F, U, S>(
|
||||||
|
&mut self,
|
||||||
|
map: C,
|
||||||
|
input: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send + Sync + 'static,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting connection to an event sink such as an
|
||||||
|
/// [`EventSlot`](crate::ports::EventSlot) or
|
||||||
|
/// [`EventBuffer`](crate::ports::EventBuffer).
|
||||||
|
///
|
||||||
|
/// Events are mapped to another type using the closure provided in
|
||||||
|
/// argument.
|
||||||
|
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
|
||||||
|
where
|
||||||
|
C: Fn(T) -> U + Send + Sync + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: EventSink<U>,
|
||||||
|
{
|
||||||
|
let sender = Box::new(MapEventSinkSender::new(map, sink.writer()));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting, filtered connection to an input port of the
|
||||||
|
/// model specified by the address.
|
||||||
|
///
|
||||||
|
/// Events are mapped to another type using the closure provided in
|
||||||
|
/// argument, or ignored if the closure returns `None`.
|
||||||
|
///
|
||||||
|
/// The input port must be an asynchronous method of a model of type `M`
|
||||||
|
/// taking as argument a value of the type returned by the mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn filter_map_connect<M, C, F, U, S>(
|
||||||
|
&mut self,
|
||||||
|
filter_map: C,
|
||||||
|
input: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(FilterMapInputSender::new(
|
||||||
|
filter_map,
|
||||||
|
input,
|
||||||
|
address.into().0,
|
||||||
|
));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting connection to an event sink such as an
|
||||||
|
/// [`EventSlot`](crate::ports::EventSlot) or
|
||||||
|
/// [`EventBuffer`](crate::ports::EventBuffer).
|
||||||
|
///
|
||||||
|
/// Events are mapped to another type using the closure provided in
|
||||||
|
/// argument.
|
||||||
|
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
|
||||||
|
where
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: EventSink<U>,
|
||||||
|
{
|
||||||
|
let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer()));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes the connection specified by the `LineId` parameter.
|
/// Removes the connection specified by the `LineId` parameter.
|
||||||
///
|
///
|
||||||
/// It is a logic error to specify a line identifier from another
|
/// It is a logic error to specify a line identifier from another
|
||||||
@ -125,7 +217,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
|
|||||||
///
|
///
|
||||||
/// The replier port must be an asynchronous method of a model of type `M`
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
/// returning a value of type `R` and taking as argument a value of type `T`
|
/// returning a value of type `R` and taking as argument a value of type `T`
|
||||||
/// plus, optionally, a scheduler reference.
|
/// plus, optionally, a context reference.
|
||||||
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
|
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
@ -136,6 +228,76 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
|
|||||||
self.broadcaster.write().unwrap().add(sender)
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting connection to a replier port of the model
|
||||||
|
/// specified by the address.
|
||||||
|
///
|
||||||
|
/// Queries and replies are mapped to other types using the closures
|
||||||
|
/// provided in argument.
|
||||||
|
///
|
||||||
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
|
/// returning a value of the type returned by the reply mapping closure and
|
||||||
|
/// taking as argument a value of the type returned by the query mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn map_connect<M, C, D, F, U, Q, S>(
|
||||||
|
&mut self,
|
||||||
|
query_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
replier: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send + Sync + 'static,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(MapReplierSender::new(
|
||||||
|
query_map,
|
||||||
|
reply_map,
|
||||||
|
replier,
|
||||||
|
address.into().0,
|
||||||
|
));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting, filtered connection to a replier port of the
|
||||||
|
/// model specified by the address.
|
||||||
|
///
|
||||||
|
/// Queries and replies are mapped to other types using the closures
|
||||||
|
/// provided in argument, or ignored if the query closure returns `None`.
|
||||||
|
///
|
||||||
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
|
/// returning a value of the type returned by the reply mapping closure and
|
||||||
|
/// taking as argument a value of the type returned by the query mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn filter_map_connect<M, C, D, F, U, Q, S>(
|
||||||
|
&mut self,
|
||||||
|
query_filer_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
replier: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(FilterMapReplierSender::new(
|
||||||
|
query_filer_map,
|
||||||
|
reply_map,
|
||||||
|
replier,
|
||||||
|
address.into().0,
|
||||||
|
));
|
||||||
|
self.broadcaster.write().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes the connection specified by the `LineId` parameter.
|
/// Removes the connection specified by the `LineId` parameter.
|
||||||
///
|
///
|
||||||
/// It is a logic error to specify a line identifier from another
|
/// It is a logic error to specify a line identifier from another
|
||||||
|
@ -4,9 +4,8 @@ use std::pin::Pin;
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use diatomic_waker::WakeSink;
|
use diatomic_waker::WakeSink;
|
||||||
use recycle_box::{coerce_box, RecycleBox};
|
|
||||||
|
|
||||||
use super::sender::{SendError, Sender};
|
use super::sender::{RecycledFuture, SendError, Sender};
|
||||||
use super::LineId;
|
use super::LineId;
|
||||||
use crate::util::task_set::TaskSet;
|
use crate::util::task_set::TaskSet;
|
||||||
|
|
||||||
@ -20,8 +19,8 @@ use crate::util::task_set::TaskSet;
|
|||||||
/// This is somewhat similar to what `FuturesOrdered` in the `futures` crate
|
/// This is somewhat similar to what `FuturesOrdered` in the `futures` crate
|
||||||
/// does, but with some key differences:
|
/// does, but with some key differences:
|
||||||
///
|
///
|
||||||
/// - tasks and future storage are reusable to avoid repeated allocation, so
|
/// - tasks, output storage and future storage are reusable to avoid repeated
|
||||||
/// allocation occurs only after a new sender is added,
|
/// allocation, so allocation occurs only after a new sender is added,
|
||||||
/// - the outputs of all sender futures are returned all at once rather than
|
/// - the outputs of all sender futures are returned all at once rather than
|
||||||
/// with an asynchronous iterator (a.k.a. async stream).
|
/// with an asynchronous iterator (a.k.a. async stream).
|
||||||
pub(super) struct BroadcasterInner<T: Clone, R> {
|
pub(super) struct BroadcasterInner<T: Clone, R> {
|
||||||
@ -46,10 +45,12 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
self.next_line_id += 1;
|
self.next_line_id += 1;
|
||||||
|
|
||||||
self.senders.push((line_id, sender));
|
self.senders.push((line_id, sender));
|
||||||
|
self.shared.outputs.push(None);
|
||||||
|
|
||||||
self.shared.futures_env.push(FutureEnv::default());
|
// The storage is alway an empty vector so we just book some capacity.
|
||||||
|
if let Some(storage) = self.shared.storage.as_mut() {
|
||||||
self.shared.task_set.resize(self.senders.len());
|
let _ = storage.try_reserve(self.senders.len());
|
||||||
|
};
|
||||||
|
|
||||||
line_id
|
line_id
|
||||||
}
|
}
|
||||||
@ -61,8 +62,7 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
pub(super) fn remove(&mut self, id: LineId) -> bool {
|
pub(super) fn remove(&mut self, id: LineId) -> bool {
|
||||||
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
|
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
|
||||||
self.senders.swap_remove(pos);
|
self.senders.swap_remove(pos);
|
||||||
self.shared.futures_env.swap_remove(pos);
|
self.shared.outputs.truncate(self.senders.len());
|
||||||
self.shared.task_set.resize(self.senders.len());
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -73,8 +73,7 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
/// Removes all senders.
|
/// Removes all senders.
|
||||||
pub(super) fn clear(&mut self) {
|
pub(super) fn clear(&mut self) {
|
||||||
self.senders.clear();
|
self.senders.clear();
|
||||||
self.shared.futures_env.clear();
|
self.shared.outputs.clear();
|
||||||
self.shared.task_set.resize(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of connected senders.
|
/// Returns the number of connected senders.
|
||||||
@ -82,41 +81,35 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
self.senders.len()
|
self.senders.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Efficiently broadcasts a message or a query to multiple addresses.
|
/// Return a list of futures broadcasting an event or query to multiple
|
||||||
///
|
/// addresses.
|
||||||
/// This method does not collect the responses from queries.
|
#[allow(clippy::type_complexity)]
|
||||||
fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> {
|
fn futures(
|
||||||
|
&mut self,
|
||||||
|
arg: T,
|
||||||
|
) -> (
|
||||||
|
&'_ mut Shared<R>,
|
||||||
|
Vec<RecycledFuture<'_, Result<R, SendError>>>,
|
||||||
|
) {
|
||||||
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
|
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
|
||||||
|
|
||||||
// Broadcast the message and collect all futures.
|
// Broadcast the message and collect all futures.
|
||||||
let mut iter = self
|
let mut iter = self.senders.iter_mut();
|
||||||
.senders
|
while let Some(sender) = iter.next() {
|
||||||
.iter_mut()
|
|
||||||
.zip(self.shared.futures_env.iter_mut());
|
|
||||||
while let Some((sender, futures_env)) = iter.next() {
|
|
||||||
let future_cache = futures_env
|
|
||||||
.storage
|
|
||||||
.take()
|
|
||||||
.unwrap_or_else(|| RecycleBox::new(()));
|
|
||||||
|
|
||||||
// Move the argument rather than clone it for the last future.
|
// Move the argument rather than clone it for the last future.
|
||||||
if iter.len() == 0 {
|
if iter.len() == 0 {
|
||||||
let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> =
|
if let Some(fut) = sender.1.send(arg) {
|
||||||
coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg)));
|
futures.push(fut);
|
||||||
|
}
|
||||||
futures.push(RecycleBox::into_pin(future));
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> = coerce_box!(
|
if let Some(fut) = sender.1.send(arg.clone()) {
|
||||||
RecycleBox::recycle(future_cache, sender.1.send(arg.clone()))
|
futures.push(fut);
|
||||||
);
|
}
|
||||||
|
|
||||||
futures.push(RecycleBox::into_pin(future));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate the global future.
|
(&mut self.shared, futures)
|
||||||
BroadcastFuture::new(&mut self.shared, futures)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,7 +125,7 @@ impl<T: Clone, R> Default for BroadcasterInner<T, R> {
|
|||||||
shared: Shared {
|
shared: Shared {
|
||||||
wake_sink,
|
wake_sink,
|
||||||
task_set: TaskSet::new(wake_src),
|
task_set: TaskSet::new(wake_src),
|
||||||
futures_env: Vec::new(),
|
outputs: Vec::new(),
|
||||||
storage: None,
|
storage: None,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -195,10 +188,22 @@ impl<T: Clone> EventBroadcaster<T> {
|
|||||||
match self.inner.senders.as_mut_slice() {
|
match self.inner.senders.as_mut_slice() {
|
||||||
// No sender.
|
// No sender.
|
||||||
[] => Ok(()),
|
[] => Ok(()),
|
||||||
// One sender.
|
|
||||||
[sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}),
|
// One sender at most.
|
||||||
// Multiple senders.
|
[sender] => match sender.1.send(arg) {
|
||||||
_ => self.inner.broadcast(arg).await,
|
None => Ok(()),
|
||||||
|
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
|
||||||
|
},
|
||||||
|
|
||||||
|
// Possibly multiple senders.
|
||||||
|
_ => {
|
||||||
|
let (shared, mut futures) = self.inner.futures(arg);
|
||||||
|
match futures.as_mut_slice() {
|
||||||
|
[] => Ok(()),
|
||||||
|
[fut] => fut.await.map_err(|_| BroadcastError {}),
|
||||||
|
_ => BroadcastFuture::new(shared, futures).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -256,26 +261,50 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
arg: T,
|
arg: T,
|
||||||
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
|
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
|
||||||
match self.inner.senders.as_mut_slice() {
|
let output_count = match self.inner.senders.as_mut_slice() {
|
||||||
// No sender.
|
// No sender.
|
||||||
[] => {}
|
[] => 0,
|
||||||
// One sender.
|
|
||||||
|
// One sender at most.
|
||||||
[sender] => {
|
[sender] => {
|
||||||
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
|
if let Some(fut) = sender.1.send(arg) {
|
||||||
self.inner.shared.futures_env[0].output = Some(output);
|
let output = fut.await.map_err(|_| BroadcastError {})?;
|
||||||
|
self.inner.shared.outputs[0] = Some(output);
|
||||||
|
|
||||||
|
1
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Possibly multiple senders.
|
||||||
|
_ => {
|
||||||
|
let (shared, mut futures) = self.inner.futures(arg);
|
||||||
|
let output_count = futures.len();
|
||||||
|
|
||||||
|
match futures.as_mut_slice() {
|
||||||
|
[] => {}
|
||||||
|
[fut] => {
|
||||||
|
let output = fut.await.map_err(|_| BroadcastError {})?;
|
||||||
|
shared.outputs[0] = Some(output);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
BroadcastFuture::new(shared, futures).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
output_count
|
||||||
}
|
}
|
||||||
// Multiple senders.
|
|
||||||
_ => self.inner.broadcast(arg).await?,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// At this point all outputs should be available so `unwrap` can be
|
// At this point all outputs should be available.
|
||||||
// called on the output of each future.
|
|
||||||
let outputs = self
|
let outputs = self
|
||||||
.inner
|
.inner
|
||||||
.shared
|
.shared
|
||||||
.futures_env
|
.outputs
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.map(|t| t.output.take().unwrap());
|
.take(output_count)
|
||||||
|
.map(|t| t.take().unwrap());
|
||||||
|
|
||||||
Ok(outputs)
|
Ok(outputs)
|
||||||
}
|
}
|
||||||
@ -297,40 +326,20 @@ impl<T: Clone, R> Clone for QueryBroadcaster<T, R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Data related to a sender future.
|
|
||||||
struct FutureEnv<R> {
|
|
||||||
/// Cached storage for the future.
|
|
||||||
storage: Option<RecycleBox<()>>,
|
|
||||||
/// Output of the associated future.
|
|
||||||
output: Option<R>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R> Default for FutureEnv<R> {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
storage: None,
|
|
||||||
output: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A type-erased `Send` future wrapped in a `RecycleBox`.
|
|
||||||
type RecycleBoxFuture<'a, R> = RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + 'a>;
|
|
||||||
|
|
||||||
/// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`.
|
/// Fields of `Broadcaster` that are explicitly borrowed by a `BroadcastFuture`.
|
||||||
struct Shared<R> {
|
struct Shared<R> {
|
||||||
/// Thread-safe waker handle.
|
/// Thread-safe waker handle.
|
||||||
wake_sink: WakeSink,
|
wake_sink: WakeSink,
|
||||||
/// Tasks associated to the sender futures.
|
/// Tasks associated to the sender futures.
|
||||||
task_set: TaskSet,
|
task_set: TaskSet,
|
||||||
/// Data related to the sender futures.
|
/// Outputs of the sender futures.
|
||||||
futures_env: Vec<FutureEnv<R>>,
|
outputs: Vec<Option<R>>,
|
||||||
/// Cached storage for the sender futures.
|
/// Cached storage for the sender futures.
|
||||||
///
|
///
|
||||||
/// When it exists, the cached storage is always an empty vector but it
|
/// When it exists, the cached storage is always an empty vector but it
|
||||||
/// typically has a non-zero capacity. Its purpose is to reuse the
|
/// typically has a non-zero capacity. Its purpose is to reuse the
|
||||||
/// previously allocated capacity when creating new sender futures.
|
/// previously allocated capacity when creating new sender futures.
|
||||||
storage: Option<Vec<Pin<RecycleBoxFuture<'static, R>>>>,
|
storage: Option<Vec<Pin<RecycledFuture<'static, R>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R> Clone for Shared<R> {
|
impl<R> Clone for Shared<R> {
|
||||||
@ -338,13 +347,13 @@ impl<R> Clone for Shared<R> {
|
|||||||
let wake_sink = WakeSink::new();
|
let wake_sink = WakeSink::new();
|
||||||
let wake_src = wake_sink.source();
|
let wake_src = wake_sink.source();
|
||||||
|
|
||||||
let mut futures_env = Vec::new();
|
let mut outputs = Vec::new();
|
||||||
futures_env.resize_with(self.futures_env.len(), Default::default);
|
outputs.resize_with(self.outputs.len(), Default::default);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
wake_sink,
|
wake_sink,
|
||||||
task_set: TaskSet::with_len(wake_src, self.task_set.len()),
|
task_set: TaskSet::new(wake_src),
|
||||||
futures_env,
|
outputs,
|
||||||
storage: None,
|
storage: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -363,7 +372,7 @@ pub(super) struct BroadcastFuture<'a, R> {
|
|||||||
/// Reference to the shared fields of the `Broadcast` object.
|
/// Reference to the shared fields of the `Broadcast` object.
|
||||||
shared: &'a mut Shared<R>,
|
shared: &'a mut Shared<R>,
|
||||||
/// List of all send futures.
|
/// List of all send futures.
|
||||||
futures: ManuallyDrop<Vec<Pin<RecycleBoxFuture<'a, R>>>>,
|
futures: ManuallyDrop<Vec<RecycledFuture<'a, Result<R, SendError>>>>,
|
||||||
/// The total count of futures that have not yet been polled to completion.
|
/// The total count of futures that have not yet been polled to completion.
|
||||||
pending_futures_count: usize,
|
pending_futures_count: usize,
|
||||||
/// State of completion of the future.
|
/// State of completion of the future.
|
||||||
@ -372,14 +381,17 @@ pub(super) struct BroadcastFuture<'a, R> {
|
|||||||
|
|
||||||
impl<'a, R> BroadcastFuture<'a, R> {
|
impl<'a, R> BroadcastFuture<'a, R> {
|
||||||
/// Creates a new `BroadcastFuture`.
|
/// Creates a new `BroadcastFuture`.
|
||||||
fn new(shared: &'a mut Shared<R>, futures: Vec<Pin<RecycleBoxFuture<'a, R>>>) -> Self {
|
fn new(
|
||||||
|
shared: &'a mut Shared<R>,
|
||||||
|
futures: Vec<RecycledFuture<'a, Result<R, SendError>>>,
|
||||||
|
) -> Self {
|
||||||
let pending_futures_count = futures.len();
|
let pending_futures_count = futures.len();
|
||||||
|
shared.task_set.resize(pending_futures_count);
|
||||||
|
|
||||||
assert!(shared.futures_env.len() == pending_futures_count);
|
for output in shared.outputs.iter_mut().take(pending_futures_count) {
|
||||||
|
// Empty the output slots to be used. This is necessary in case the
|
||||||
for futures_env in shared.futures_env.iter_mut() {
|
// previous broadcast future was cancelled.
|
||||||
// Drop the previous output if necessary.
|
output.take();
|
||||||
futures_env.output.take();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BroadcastFuture {
|
BroadcastFuture {
|
||||||
@ -395,12 +407,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Safety: this is safe since `self.futures` is never accessed after it
|
// Safety: this is safe since `self.futures` is never accessed after it
|
||||||
// is moved out.
|
// is moved out.
|
||||||
let mut futures = unsafe { ManuallyDrop::take(&mut self.futures) };
|
let futures = unsafe { ManuallyDrop::take(&mut self.futures) };
|
||||||
|
|
||||||
// Recycle the future-containing boxes.
|
|
||||||
for (future, futures_env) in futures.drain(..).zip(self.shared.futures_env.iter_mut()) {
|
|
||||||
futures_env.storage = Some(RecycleBox::vacate_pinned(future));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recycle the vector that contained the futures.
|
// Recycle the vector that contained the futures.
|
||||||
self.shared.storage = Some(recycle_vec(futures));
|
self.shared.storage = Some(recycle_vec(futures));
|
||||||
@ -413,7 +420,11 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
|
|||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = &mut *self;
|
let this = &mut *self;
|
||||||
|
|
||||||
assert_ne!(this.state, FutureState::Completed);
|
assert_ne!(
|
||||||
|
this.state,
|
||||||
|
FutureState::Completed,
|
||||||
|
"broadcast future polled after completion"
|
||||||
|
);
|
||||||
|
|
||||||
// Poll all sender futures once if this is the first time the broadcast
|
// Poll all sender futures once if this is the first time the broadcast
|
||||||
// future is polled.
|
// future is polled.
|
||||||
@ -425,14 +436,14 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
|
|||||||
this.shared.task_set.discard_scheduled();
|
this.shared.task_set.discard_scheduled();
|
||||||
|
|
||||||
for task_idx in 0..this.futures.len() {
|
for task_idx in 0..this.futures.len() {
|
||||||
let future_env = &mut this.shared.futures_env[task_idx];
|
let output = &mut this.shared.outputs[task_idx];
|
||||||
let future = &mut this.futures[task_idx];
|
let future = std::pin::Pin::new(&mut this.futures[task_idx]);
|
||||||
let task_waker_ref = this.shared.task_set.waker_of(task_idx);
|
let task_waker_ref = this.shared.task_set.waker_of(task_idx);
|
||||||
let task_cx_ref = &mut Context::from_waker(&task_waker_ref);
|
let task_cx_ref = &mut Context::from_waker(&task_waker_ref);
|
||||||
|
|
||||||
match future.as_mut().poll(task_cx_ref) {
|
match future.poll(task_cx_ref) {
|
||||||
Poll::Ready(Ok(output)) => {
|
Poll::Ready(Ok(o)) => {
|
||||||
future_env.output = Some(output);
|
*output = Some(o);
|
||||||
this.pending_futures_count -= 1;
|
this.pending_futures_count -= 1;
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(_)) => {
|
Poll::Ready(Err(_)) => {
|
||||||
@ -477,20 +488,20 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
for task_idx in scheduled_tasks {
|
for task_idx in scheduled_tasks {
|
||||||
let future_env = &mut this.shared.futures_env[task_idx];
|
let output = &mut this.shared.outputs[task_idx];
|
||||||
|
|
||||||
// Do not poll completed futures.
|
// Do not poll completed futures.
|
||||||
if future_env.output.is_some() {
|
if output.is_some() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let future = &mut this.futures[task_idx];
|
let future = std::pin::Pin::new(&mut this.futures[task_idx]);
|
||||||
let task_waker_ref = this.shared.task_set.waker_of(task_idx);
|
let task_waker_ref = this.shared.task_set.waker_of(task_idx);
|
||||||
let task_cx_ref = &mut Context::from_waker(&task_waker_ref);
|
let task_cx_ref = &mut Context::from_waker(&task_waker_ref);
|
||||||
|
|
||||||
match future.as_mut().poll(task_cx_ref) {
|
match future.poll(task_cx_ref) {
|
||||||
Poll::Ready(Ok(output)) => {
|
Poll::Ready(Ok(o)) => {
|
||||||
future_env.output = Some(output);
|
*output = Some(o);
|
||||||
this.pending_futures_count -= 1;
|
this.pending_futures_count -= 1;
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(_)) => {
|
Poll::Ready(Err(_)) => {
|
||||||
@ -703,6 +714,7 @@ mod tests {
|
|||||||
use loom::sync::atomic::{AtomicBool, Ordering};
|
use loom::sync::atomic::{AtomicBool, Ordering};
|
||||||
use loom::thread;
|
use loom::thread;
|
||||||
|
|
||||||
|
use recycle_box::RecycleBox;
|
||||||
use waker_fn::waker_fn;
|
use waker_fn::waker_fn;
|
||||||
|
|
||||||
use super::super::sender::RecycledFuture;
|
use super::super::sender::RecycledFuture;
|
||||||
@ -714,15 +726,15 @@ mod tests {
|
|||||||
fut_storage: Option<RecycleBox<()>>,
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
}
|
}
|
||||||
impl<R: Send> Sender<(), R> for TestEvent<R> {
|
impl<R: Send> Sender<(), R> for TestEvent<R> {
|
||||||
fn send(&mut self, _arg: ()) -> RecycledFuture<'_, Result<R, SendError>> {
|
fn send(&mut self, _arg: ()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let fut_storage = &mut self.fut_storage;
|
let fut_storage = &mut self.fut_storage;
|
||||||
let receiver = &mut self.receiver;
|
let receiver = &mut self.receiver;
|
||||||
|
|
||||||
RecycledFuture::new(fut_storage, async {
|
Some(RecycledFuture::new(fut_storage, async {
|
||||||
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
|
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
|
||||||
|
|
||||||
Ok(stream.next().await.unwrap())
|
Ok(stream.next().await.unwrap())
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -758,6 +770,14 @@ mod tests {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This tests fails with "Concurrent load and mut accesses" even though the
|
||||||
|
// `task_list` implementation which triggers it does not use any unsafe.
|
||||||
|
// This is most certainly related to this Loom bug:
|
||||||
|
//
|
||||||
|
// https://github.com/tokio-rs/loom/issues/260
|
||||||
|
//
|
||||||
|
// Disabling until the bug is fixed.
|
||||||
|
#[ignore]
|
||||||
#[test]
|
#[test]
|
||||||
fn loom_broadcast_basic() {
|
fn loom_broadcast_basic() {
|
||||||
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
||||||
@ -831,6 +851,14 @@ mod tests {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This tests fails with "Concurrent load and mut accesses" even though the
|
||||||
|
// `task_list` implementation which triggers it does not use any unsafe.
|
||||||
|
// This is most certainly related to this Loom bug:
|
||||||
|
//
|
||||||
|
// https://github.com/tokio-rs/loom/issues/260
|
||||||
|
//
|
||||||
|
// Disabling until the bug is fixed.
|
||||||
|
#[ignore]
|
||||||
#[test]
|
#[test]
|
||||||
fn loom_broadcast_spurious() {
|
fn loom_broadcast_spurious() {
|
||||||
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
||||||
|
@ -4,6 +4,7 @@ use std::future::Future;
|
|||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::mem::ManuallyDrop;
|
use std::mem::ManuallyDrop;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use dyn_clone::DynClone;
|
use dyn_clone::DynClone;
|
||||||
@ -17,17 +18,15 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
|
|||||||
/// replier method.
|
/// replier method.
|
||||||
pub(super) trait Sender<T, R>: DynClone + Send {
|
pub(super) trait Sender<T, R>: DynClone + Send {
|
||||||
/// Asynchronously send the event or request.
|
/// Asynchronously send the event or request.
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>;
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
|
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
|
||||||
|
|
||||||
/// An object that can send events to an input port.
|
/// An object that can send events to an input port.
|
||||||
pub(super) struct InputSender<M: 'static, F, T, S>
|
pub(super) struct InputSender<M, F, T, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: 'static,
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + 'static,
|
|
||||||
{
|
{
|
||||||
func: F,
|
func: F,
|
||||||
sender: channel::Sender<M>,
|
sender: channel::Sender<M>,
|
||||||
@ -36,11 +35,9 @@ where
|
|||||||
_phantom_closure_marker: PhantomData<S>,
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send, F, T, S> InputSender<M, F, T, S>
|
impl<M, F, T, S> InputSender<M, F, T, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: 'static,
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + 'static,
|
|
||||||
{
|
{
|
||||||
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -53,14 +50,14 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
|
impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
S: Send + 'static,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
|
|
||||||
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
||||||
@ -69,18 +66,16 @@ where
|
|||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
fut.await.map_err(|_| SendError {})
|
fut.await.map_err(|_| SendError {})
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send, F, T, S> Clone for InputSender<M, F, T, S>
|
impl<M, F, T, S> Clone for InputSender<M, F, T, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: 'static,
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
F: Clone,
|
||||||
T: Send + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -93,8 +88,322 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An object that can send a request to a replier port and retrieve a response.
|
/// An object that can send mapped events to an input port.
|
||||||
pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
|
pub(super) struct MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
map: Arc<C>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U)>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
pub(super) fn new(map: C, func: F, sender: channel::Sender<M>) -> Self {
|
||||||
|
Self {
|
||||||
|
map: Arc::new(map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send + Sync,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let arg = (self.map)(arg);
|
||||||
|
|
||||||
|
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = func.call(model, arg, scheduler);
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
});
|
||||||
|
|
||||||
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
|
fut.await.map_err(|_| SendError {})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> Clone for MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
F: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
map: self.map.clone(),
|
||||||
|
func: self.func.clone(),
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can filter and send mapped events to an input port.
|
||||||
|
pub(super) struct FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
filter_map: Arc<C>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_filter_map: PhantomData<fn(T) -> Option<U>>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U)>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
pub(super) fn new(filter_map: C, func: F, sender: channel::Sender<M>) -> Self {
|
||||||
|
Self {
|
||||||
|
filter_map: Arc::new(filter_map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_filter_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
(self.filter_map)(arg).map(|arg| {
|
||||||
|
let func = self.func.clone();
|
||||||
|
|
||||||
|
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = func.call(model, arg, scheduler);
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
});
|
||||||
|
|
||||||
|
RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
|
fut.await.map_err(|_| SendError {})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> Clone for FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
F: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
filter_map: self.filter_map.clone(),
|
||||||
|
func: self.func.clone(),
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_filter_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can send an event to an event sink.
|
||||||
|
pub(super) struct EventSinkSender<T, W> {
|
||||||
|
writer: W,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_event: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, W> EventSinkSender<T, W> {
|
||||||
|
pub(super) fn new(writer: W) -> Self {
|
||||||
|
Self {
|
||||||
|
writer,
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_event: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
|
||||||
|
where
|
||||||
|
T: Send + 'static,
|
||||||
|
W: EventSinkWriter<T>,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
let writer = &mut self.writer;
|
||||||
|
|
||||||
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
|
writer.write(arg);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, W: Clone> Clone for EventSinkSender<T, W> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
writer: self.writer.clone(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_event: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can send mapped events to an event sink.
|
||||||
|
pub(super) struct MapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
C: Fn(T) -> U,
|
||||||
|
{
|
||||||
|
writer: W,
|
||||||
|
map: Arc<C>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_event: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
C: Fn(T) -> U,
|
||||||
|
{
|
||||||
|
pub(super) fn new(map: C, writer: W) -> Self {
|
||||||
|
Self {
|
||||||
|
writer,
|
||||||
|
map: Arc::new(map),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_event: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
T: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
C: Fn(T) -> U + Send + Sync,
|
||||||
|
W: EventSinkWriter<U>,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
let writer = &mut self.writer;
|
||||||
|
let arg = (self.map)(arg);
|
||||||
|
|
||||||
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
|
writer.write(arg);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
C: Fn(T) -> U,
|
||||||
|
W: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
writer: self.writer.clone(),
|
||||||
|
map: self.map.clone(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_event: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can filter and send mapped events to an event sink.
|
||||||
|
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
C: Fn(T) -> Option<U>,
|
||||||
|
{
|
||||||
|
writer: W,
|
||||||
|
filter_map: Arc<C>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_event: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
C: Fn(T) -> Option<U>,
|
||||||
|
{
|
||||||
|
pub(super) fn new(filter_map: C, writer: W) -> Self {
|
||||||
|
Self {
|
||||||
|
writer,
|
||||||
|
filter_map: Arc::new(filter_map),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_event: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
T: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync,
|
||||||
|
W: EventSinkWriter<U>,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
let writer = &mut self.writer;
|
||||||
|
|
||||||
|
(self.filter_map)(arg).map(|arg| {
|
||||||
|
RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
|
writer.write(arg);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
|
||||||
|
where
|
||||||
|
C: Fn(T) -> Option<U>,
|
||||||
|
W: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
writer: self.writer.clone(),
|
||||||
|
filter_map: self.filter_map.clone(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_event: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can send requests to a replier port and retrieve responses.
|
||||||
|
pub(super) struct ReplierSender<M, F, T, R, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
func: F,
|
func: F,
|
||||||
sender: channel::Sender<M>,
|
sender: channel::Sender<M>,
|
||||||
receiver: multishot::Receiver<R>,
|
receiver: multishot::Receiver<R>,
|
||||||
@ -106,9 +415,6 @@ pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
|
|||||||
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
|
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> ReplierFn<'a, M, T, R, S>,
|
|
||||||
T: Send + 'static,
|
|
||||||
R: Send + 'static,
|
|
||||||
{
|
{
|
||||||
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -130,7 +436,7 @@ where
|
|||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = &mut self.sender;
|
let sender = &mut self.sender;
|
||||||
let reply_receiver = &mut self.receiver;
|
let reply_receiver = &mut self.receiver;
|
||||||
@ -149,6 +455,219 @@ where
|
|||||||
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Some(RecycledFuture::new(fut_storage, async move {
|
||||||
|
// Send the message.
|
||||||
|
send_fut.await.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
|
// Wait until the message is processed and the reply is sent back.
|
||||||
|
// If an error is received, it most likely means the mailbox was
|
||||||
|
// dropped before the message was processed.
|
||||||
|
reply_receiver.recv().await.map_err(|_| SendError {})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, F, T, R, S> Clone for ReplierSender<M, F, T, R, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
func: self.func.clone(),
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
receiver: multishot::Receiver::new(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can send mapped requests to a replier port and retrieve
|
||||||
|
/// mapped responses.
|
||||||
|
pub(super) struct MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
|
query_map: Arc<C>,
|
||||||
|
reply_map: Arc<D>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
receiver: multishot::Receiver<Q>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_query_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_reply_map: PhantomData<fn(Q) -> R>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
|
pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender<M>) -> Self {
|
||||||
|
Self {
|
||||||
|
query_map: Arc::new(query_map),
|
||||||
|
reply_map: Arc::new(reply_map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
receiver: multishot::Receiver::new(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send + Sync,
|
||||||
|
D: Fn(Q) -> R + Send + Sync,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let arg = (self.query_map)(arg);
|
||||||
|
let sender = &mut self.sender;
|
||||||
|
let reply_receiver = &mut self.receiver;
|
||||||
|
let fut_storage = &mut self.fut_storage;
|
||||||
|
let reply_map = &*self.reply_map;
|
||||||
|
|
||||||
|
// The previous future generated by this method should have been polled
|
||||||
|
// to completion so a new sender should be readily available.
|
||||||
|
let reply_sender = reply_receiver.sender().unwrap();
|
||||||
|
|
||||||
|
let send_fut = sender.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = async move {
|
||||||
|
let reply = func.call(model, arg, scheduler).await;
|
||||||
|
reply_sender.send(reply);
|
||||||
|
};
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
});
|
||||||
|
|
||||||
|
Some(RecycledFuture::new(fut_storage, async move {
|
||||||
|
// Send the message.
|
||||||
|
send_fut.await.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
|
// Wait until the message is processed and the reply is sent back.
|
||||||
|
// If an error is received, it most likely means the mailbox was
|
||||||
|
// dropped before the message was processed.
|
||||||
|
reply_receiver
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
.map(reply_map)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Clone for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
query_map: self.query_map.clone(),
|
||||||
|
reply_map: self.reply_map.clone(),
|
||||||
|
func: self.func.clone(),
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
receiver: multishot::Receiver::new(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can filter and send mapped requests to a replier port and
|
||||||
|
/// retrieve mapped responses.
|
||||||
|
pub(super) struct FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
|
query_filter_map: Arc<C>,
|
||||||
|
reply_map: Arc<D>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
receiver: multishot::Receiver<Q>,
|
||||||
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
|
_phantom_query_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_reply_map: PhantomData<fn(Q) -> R>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
{
|
||||||
|
pub(super) fn new(
|
||||||
|
query_filter_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
query_filter_map: Arc::new(query_filter_map),
|
||||||
|
reply_map: Arc::new(reply_map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
receiver: multishot::Receiver::new(),
|
||||||
|
fut_storage: None,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + Sync,
|
||||||
|
D: Fn(Q) -> R + Send + Sync,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
|
(self.query_filter_map)(arg).map(|arg| {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let sender = &mut self.sender;
|
||||||
|
let reply_receiver = &mut self.receiver;
|
||||||
|
let fut_storage = &mut self.fut_storage;
|
||||||
|
let reply_map = &*self.reply_map;
|
||||||
|
|
||||||
|
// The previous future generated by this method should have been polled
|
||||||
|
// to completion so a new sender should be readily available.
|
||||||
|
let reply_sender = reply_receiver.sender().unwrap();
|
||||||
|
|
||||||
|
let send_fut = sender.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = async move {
|
||||||
|
let reply = func.call(model, arg, scheduler).await;
|
||||||
|
reply_sender.send(reply);
|
||||||
|
};
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
});
|
||||||
|
|
||||||
RecycledFuture::new(fut_storage, async move {
|
RecycledFuture::new(fut_storage, async move {
|
||||||
// Send the message.
|
// Send the message.
|
||||||
send_fut.await.map_err(|_| SendError {})?;
|
send_fut.await.map_err(|_| SendError {})?;
|
||||||
@ -156,78 +675,37 @@ where
|
|||||||
// Wait until the message is processed and the reply is sent back.
|
// Wait until the message is processed and the reply is sent back.
|
||||||
// If an error is received, it most likely means the mailbox was
|
// If an error is received, it most likely means the mailbox was
|
||||||
// dropped before the message was processed.
|
// dropped before the message was processed.
|
||||||
reply_receiver.recv().await.map_err(|_| SendError {})
|
reply_receiver
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
.map(reply_map)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M, F, T, R, S> Clone for ReplierSender<M, F, T, R, S>
|
impl<M, C, D, F, T, R, U, Q, S> Clone for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
|
F: Clone,
|
||||||
T: Send + 'static,
|
|
||||||
R: Send + 'static,
|
|
||||||
S: Send,
|
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
query_filter_map: self.query_filter_map.clone(),
|
||||||
|
reply_map: self.reply_map.clone(),
|
||||||
func: self.func.clone(),
|
func: self.func.clone(),
|
||||||
sender: self.sender.clone(),
|
sender: self.sender.clone(),
|
||||||
receiver: multishot::Receiver::new(),
|
receiver: multishot::Receiver::new(),
|
||||||
fut_storage: None,
|
fut_storage: None,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
_phantom_closure: PhantomData,
|
_phantom_closure: PhantomData,
|
||||||
_phantom_closure_marker: PhantomData,
|
_phantom_closure_marker: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An object that can send a payload to an event sink.
|
|
||||||
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>> {
|
|
||||||
writer: W,
|
|
||||||
fut_storage: Option<RecycleBox<()>>,
|
|
||||||
_phantom_event: PhantomData<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Send + 'static, W: EventSinkWriter<T>> EventSinkSender<T, W> {
|
|
||||||
pub(super) fn new(writer: W) -> Self {
|
|
||||||
Self {
|
|
||||||
writer,
|
|
||||||
fut_storage: None,
|
|
||||||
_phantom_event: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
|
|
||||||
where
|
|
||||||
T: Send + 'static,
|
|
||||||
W: EventSinkWriter<T>,
|
|
||||||
{
|
|
||||||
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
|
|
||||||
let writer = &mut self.writer;
|
|
||||||
|
|
||||||
RecycledFuture::new(&mut self.fut_storage, async move {
|
|
||||||
writer.write(arg);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, W> Clone for EventSinkSender<T, W>
|
|
||||||
where
|
|
||||||
T: Send + 'static,
|
|
||||||
W: EventSinkWriter<T>,
|
|
||||||
{
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
writer: self.writer.clone(),
|
|
||||||
fut_storage: None,
|
|
||||||
_phantom_event: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Error returned when the mailbox was closed or dropped.
|
/// Error returned when the mailbox was closed or dropped.
|
||||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||||
pub(super) struct SendError {}
|
pub(super) struct SendError {}
|
||||||
|
@ -13,9 +13,11 @@ use crate::simulation::{
|
|||||||
};
|
};
|
||||||
use crate::util::slot;
|
use crate::util::slot;
|
||||||
|
|
||||||
use broadcaster::ReplyIterator;
|
use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator};
|
||||||
use broadcaster::{EventBroadcaster, QueryBroadcaster};
|
use sender::{
|
||||||
use sender::{InputSender, ReplierSender};
|
FilterMapInputSender, FilterMapReplierSender, InputSender, MapInputSender, MapReplierSender,
|
||||||
|
ReplierSender,
|
||||||
|
};
|
||||||
|
|
||||||
use super::ReplierFn;
|
use super::ReplierFn;
|
||||||
|
|
||||||
@ -51,6 +53,58 @@ impl<T: Clone + Send + 'static> EventSource<T> {
|
|||||||
self.broadcaster.lock().unwrap().add(sender)
|
self.broadcaster.lock().unwrap().add(sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting connection to an input port of the model
|
||||||
|
/// specified by the address.
|
||||||
|
///
|
||||||
|
/// Events are mapped to another type using the closure provided in
|
||||||
|
/// argument.
|
||||||
|
///
|
||||||
|
/// The input port must be an asynchronous method of a model of type `M`
|
||||||
|
/// taking as argument a value of the type returned by the mapping closure
|
||||||
|
/// plus, optionally, a context reference.
|
||||||
|
pub fn map_connect<M, C, F, U, S>(
|
||||||
|
&mut self,
|
||||||
|
map: C,
|
||||||
|
input: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send + 'static,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
|
||||||
|
self.broadcaster.lock().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting, filtered connection to an input port of the
|
||||||
|
/// model specified by the address.
|
||||||
|
///
|
||||||
|
/// Events are mapped to another type using the closure provided in
|
||||||
|
/// argument, or ignored if the closure returns `None`.
|
||||||
|
///
|
||||||
|
/// The input port must be an asynchronous method of a model of type `M`
|
||||||
|
/// taking as argument a value of the type returned by the mapping closure
|
||||||
|
/// plus, optionally, a context reference.
|
||||||
|
pub fn filter_map_connect<M, C, F, U, S>(
|
||||||
|
&mut self,
|
||||||
|
map: C,
|
||||||
|
input: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + 'static,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0));
|
||||||
|
self.broadcaster.lock().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes the connection specified by the `LineId` parameter.
|
/// Removes the connection specified by the `LineId` parameter.
|
||||||
///
|
///
|
||||||
/// It is a logic error to specify a line identifier from another
|
/// It is a logic error to specify a line identifier from another
|
||||||
@ -193,7 +247,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
|
|||||||
///
|
///
|
||||||
/// The replier port must be an asynchronous method of a model of type `M`
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
/// returning a value of type `R` and taking as argument a value of type `T`
|
/// returning a value of type `R` and taking as argument a value of type `T`
|
||||||
/// plus, optionally, a scheduler reference.
|
/// plus, optionally, a context reference.
|
||||||
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
|
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
@ -204,6 +258,76 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
|
|||||||
self.broadcaster.lock().unwrap().add(sender)
|
self.broadcaster.lock().unwrap().add(sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting connection to a replier port of the model
|
||||||
|
/// specified by the address.
|
||||||
|
///
|
||||||
|
/// Queries and replies are mapped to other types using the closures
|
||||||
|
/// provided in argument.
|
||||||
|
///
|
||||||
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
|
/// returning a value of the type returned by the reply mapping closure and
|
||||||
|
/// taking as argument a value of the type returned by the query mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn map_connect<M, C, D, F, U, Q, S>(
|
||||||
|
&mut self,
|
||||||
|
query_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
replier: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send + 'static,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(MapReplierSender::new(
|
||||||
|
query_map,
|
||||||
|
reply_map,
|
||||||
|
replier,
|
||||||
|
address.into().0,
|
||||||
|
));
|
||||||
|
self.broadcaster.lock().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adds an auto-converting, filtered connection to a replier port of the
|
||||||
|
/// model specified by the address.
|
||||||
|
///
|
||||||
|
/// Queries and replies are mapped to other types using the closures
|
||||||
|
/// provided in argument, or ignored if the query closure returns `None`.
|
||||||
|
///
|
||||||
|
/// The replier port must be an asynchronous method of a model of type `M`
|
||||||
|
/// returning a value of the type returned by the reply mapping closure and
|
||||||
|
/// taking as argument a value of the type returned by the query mapping
|
||||||
|
/// closure plus, optionally, a context reference.
|
||||||
|
pub fn filter_map_connect<M, C, D, F, U, Q, S>(
|
||||||
|
&mut self,
|
||||||
|
query_filter_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
replier: F,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> LineId
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send + 'static,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let sender = Box::new(FilterMapReplierSender::new(
|
||||||
|
query_filter_map,
|
||||||
|
reply_map,
|
||||||
|
replier,
|
||||||
|
address.into().0,
|
||||||
|
));
|
||||||
|
self.broadcaster.lock().unwrap().add(sender)
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes the connection specified by the `LineId` parameter.
|
/// Removes the connection specified by the `LineId` parameter.
|
||||||
///
|
///
|
||||||
/// It is a logic error to specify a line identifier from another
|
/// It is a logic error to specify a line identifier from another
|
||||||
|
@ -71,26 +71,27 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
self.senders.len()
|
self.senders.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Efficiently broadcasts a message or a query to multiple addresses.
|
/// Return a list of futures broadcasting an event or query to multiple
|
||||||
///
|
/// addresses.
|
||||||
/// This method does not collect the responses from queries.
|
fn futures(&mut self, arg: T) -> Vec<SenderFutureState<R>> {
|
||||||
fn broadcast(&mut self, arg: T) -> BroadcastFuture<R> {
|
let mut future_states = Vec::new();
|
||||||
let mut future_states = Vec::with_capacity(self.senders.len());
|
|
||||||
|
|
||||||
// Broadcast the message and collect all futures.
|
// Broadcast the message and collect all futures.
|
||||||
let mut iter = self.senders.iter_mut();
|
let mut iter = self.senders.iter_mut();
|
||||||
while let Some(sender) = iter.next() {
|
while let Some(sender) = iter.next() {
|
||||||
// Move the argument rather than clone it for the last future.
|
// Move the argument rather than clone it for the last future.
|
||||||
if iter.len() == 0 {
|
if iter.len() == 0 {
|
||||||
future_states.push(SenderFutureState::Pending(sender.1.send(arg)));
|
if let Some(fut) = sender.1.send(arg) {
|
||||||
|
future_states.push(SenderFutureState::Pending(fut));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if let Some(fut) = sender.1.send(arg.clone()) {
|
||||||
future_states.push(SenderFutureState::Pending(sender.1.send(arg.clone())));
|
future_states.push(SenderFutureState::Pending(fut));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate the global future.
|
future_states
|
||||||
BroadcastFuture::new(future_states)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,17 +158,27 @@ impl<T: Clone + Send> EventBroadcaster<T> {
|
|||||||
let fut = match self.inner.senders.as_mut_slice() {
|
let fut = match self.inner.senders.as_mut_slice() {
|
||||||
// No sender.
|
// No sender.
|
||||||
[] => Fut::Empty,
|
[] => Fut::Empty,
|
||||||
// One sender.
|
// One sender at most.
|
||||||
[sender] => Fut::Single(sender.1.send(arg)),
|
[sender] => Fut::Single(sender.1.send(arg)),
|
||||||
// Multiple senders.
|
// Possibly multiple senders.
|
||||||
_ => Fut::Multiple(self.inner.broadcast(arg)),
|
_ => Fut::Multiple(self.inner.futures(arg)),
|
||||||
};
|
};
|
||||||
|
|
||||||
async {
|
async {
|
||||||
match fut {
|
match fut {
|
||||||
Fut::Empty => Ok(()),
|
// No sender.
|
||||||
Fut::Single(fut) => fut.await.map_err(|_| BroadcastError {}),
|
Fut::Empty | Fut::Single(None) => Ok(()),
|
||||||
Fut::Multiple(fut) => fut.await.map(|_| ()),
|
|
||||||
|
Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}),
|
||||||
|
|
||||||
|
Fut::Multiple(mut futures) => match futures.as_mut_slice() {
|
||||||
|
// No sender.
|
||||||
|
[] => Ok(()),
|
||||||
|
// One sender.
|
||||||
|
[SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}),
|
||||||
|
// Multiple senders.
|
||||||
|
_ => BroadcastFuture::new(futures).await.map(|_| ()),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -235,20 +246,39 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
|
|||||||
let fut = match self.inner.senders.as_mut_slice() {
|
let fut = match self.inner.senders.as_mut_slice() {
|
||||||
// No sender.
|
// No sender.
|
||||||
[] => Fut::Empty,
|
[] => Fut::Empty,
|
||||||
// One sender.
|
// One sender at most.
|
||||||
[sender] => Fut::Single(sender.1.send(arg)),
|
[sender] => Fut::Single(sender.1.send(arg)),
|
||||||
// Multiple senders.
|
// Possibly multiple senders.
|
||||||
_ => Fut::Multiple(self.inner.broadcast(arg)),
|
_ => Fut::Multiple(self.inner.futures(arg)),
|
||||||
};
|
};
|
||||||
|
|
||||||
async {
|
async {
|
||||||
match fut {
|
match fut {
|
||||||
Fut::Empty => Ok(ReplyIterator(Vec::new().into_iter())),
|
// No sender.
|
||||||
Fut::Single(fut) => fut
|
Fut::Empty | Fut::Single(None) => Ok(ReplyIterator(Vec::new().into_iter())),
|
||||||
|
|
||||||
|
Fut::Single(Some(fut)) => fut
|
||||||
.await
|
.await
|
||||||
.map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()))
|
.map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()))
|
||||||
.map_err(|_| BroadcastError {}),
|
.map_err(|_| BroadcastError {}),
|
||||||
Fut::Multiple(fut) => fut.await.map_err(|_| BroadcastError {}),
|
|
||||||
|
Fut::Multiple(mut futures) => match futures.as_mut_slice() {
|
||||||
|
// No sender.
|
||||||
|
[] => Ok(ReplyIterator(Vec::new().into_iter())),
|
||||||
|
|
||||||
|
// One sender.
|
||||||
|
[SenderFutureState::Pending(fut)] => fut
|
||||||
|
.await
|
||||||
|
.map(|reply| {
|
||||||
|
ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())
|
||||||
|
})
|
||||||
|
.map_err(|_| BroadcastError {}),
|
||||||
|
|
||||||
|
// Multiple senders.
|
||||||
|
_ => BroadcastFuture::new(futures)
|
||||||
|
.await
|
||||||
|
.map_err(|_| BroadcastError {}),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -598,14 +628,17 @@ mod tests {
|
|||||||
receiver: Option<mpsc::UnboundedReceiver<Option<R>>>,
|
receiver: Option<mpsc::UnboundedReceiver<Option<R>>>,
|
||||||
}
|
}
|
||||||
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
|
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
|
||||||
fn send(&mut self, _arg: ()) -> Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>> {
|
fn send(
|
||||||
|
&mut self,
|
||||||
|
_arg: (),
|
||||||
|
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
|
||||||
let receiver = self.receiver.take().unwrap();
|
let receiver = self.receiver.take().unwrap();
|
||||||
|
|
||||||
Box::pin(async move {
|
Some(Box::pin(async move {
|
||||||
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
|
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
|
||||||
|
|
||||||
Ok(stream.next().await.unwrap())
|
Ok(stream.next().await.unwrap())
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -634,8 +667,16 @@ mod tests {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This tests fails with "Concurrent load and mut accesses" even though the
|
||||||
|
// `task_list` implementation which triggers it does not use any unsafe.
|
||||||
|
// This is most certainly related to this Loom bug:
|
||||||
|
//
|
||||||
|
// https://github.com/tokio-rs/loom/issues/260
|
||||||
|
//
|
||||||
|
// Disabling until the bug is fixed.
|
||||||
|
#[ignore]
|
||||||
#[test]
|
#[test]
|
||||||
fn loom_broadcast_basic() {
|
fn loom_broadcast_query_basic() {
|
||||||
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
||||||
|
|
||||||
let mut builder = Builder::new();
|
let mut builder = Builder::new();
|
||||||
@ -707,8 +748,16 @@ mod tests {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This tests fails with "Concurrent load and mut accesses" even though the
|
||||||
|
// `task_list` implementation which triggers it does not use any unsafe.
|
||||||
|
// This is most certainly related to this Loom bug:
|
||||||
|
//
|
||||||
|
// https://github.com/tokio-rs/loom/issues/260
|
||||||
|
//
|
||||||
|
// Disabling until the bug is fixed.
|
||||||
|
#[ignore]
|
||||||
#[test]
|
#[test]
|
||||||
fn loom_broadcast_spurious() {
|
fn loom_broadcast_query_spurious() {
|
||||||
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
const DEFAULT_PREEMPTION_BOUND: usize = 3;
|
||||||
|
|
||||||
let mut builder = Builder::new();
|
let mut builder = Builder::new();
|
||||||
|
@ -3,6 +3,7 @@ use std::fmt;
|
|||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures_channel::oneshot;
|
use futures_channel::oneshot;
|
||||||
use recycle_box::{coerce_box, RecycleBox};
|
use recycle_box::{coerce_box, RecycleBox};
|
||||||
@ -16,22 +17,23 @@ pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendErro
|
|||||||
/// An event or query sender abstracting over the target model and input method.
|
/// An event or query sender abstracting over the target model and input method.
|
||||||
pub(super) trait Sender<T, R>: Send {
|
pub(super) trait Sender<T, R>: Send {
|
||||||
/// Asynchronously send the event or request.
|
/// Asynchronously send the event or request.
|
||||||
fn send(&mut self, arg: T) -> SenderFuture<R>;
|
fn send(&mut self, arg: T) -> Option<SenderFuture<R>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An object that can send events to an input port.
|
/// An object that can send events to an input port.
|
||||||
pub(super) struct InputSender<M: 'static, F, T, S> {
|
pub(super) struct InputSender<M, F, T, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
func: F,
|
func: F,
|
||||||
sender: channel::Sender<M>,
|
sender: channel::Sender<M>,
|
||||||
_phantom_closure: PhantomData<fn(&mut M, T)>,
|
_phantom_closure: PhantomData<fn(&mut M, T)>,
|
||||||
_phantom_closure_marker: PhantomData<S>,
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send, F, T, S> InputSender<M, F, T, S>
|
impl<M, F, T, S> InputSender<M, F, T, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: 'static,
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + 'static,
|
|
||||||
{
|
{
|
||||||
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -43,14 +45,126 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Send, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
|
impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
S: Send + 'static,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> SenderFuture<()> {
|
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let sender = self.sender.clone();
|
||||||
|
|
||||||
|
Some(Box::pin(async move {
|
||||||
|
sender
|
||||||
|
.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = func.call(model, arg, scheduler);
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can send mapped events to an input port.
|
||||||
|
pub(super) struct MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
map: C,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
_phantom_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, T)>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
pub(super) fn new(map: C, func: F, sender: channel::Sender<M>) -> Self {
|
||||||
|
Self {
|
||||||
|
map,
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
_phantom_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let arg = (self.map)(arg);
|
||||||
|
let sender = self.sender.clone();
|
||||||
|
|
||||||
|
Some(Box::pin(async move {
|
||||||
|
sender
|
||||||
|
.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = func.call(model, arg, scheduler);
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can filter and send mapped events to an input port.
|
||||||
|
pub(super) struct FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
filter_map: C,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
_phantom_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, T)>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
pub(super) fn new(filter_map: C, func: F, sender: channel::Sender<M>) -> Self {
|
||||||
|
Self {
|
||||||
|
filter_map,
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
_phantom_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send,
|
||||||
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
||||||
|
(self.filter_map)(arg).map(|arg| {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
|
|
||||||
@ -63,12 +177,16 @@ where
|
|||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.map_err(|_| SendError {})
|
.map_err(|_| SendError {})
|
||||||
|
}) as SenderFuture<()>
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An object that can send a request to a replier port and retrieve a response.
|
/// An object that can send a request to a replier port and retrieve a response.
|
||||||
pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
|
pub(super) struct ReplierSender<M, F, T, R, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
func: F,
|
func: F,
|
||||||
sender: channel::Sender<M>,
|
sender: channel::Sender<M>,
|
||||||
_phantom_closure: PhantomData<fn(&mut M, T) -> R>,
|
_phantom_closure: PhantomData<fn(&mut M, T) -> R>,
|
||||||
@ -77,10 +195,7 @@ pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
|
|||||||
|
|
||||||
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
|
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, T, R, S>,
|
|
||||||
T: Send + 'static,
|
|
||||||
R: Send + 'static,
|
|
||||||
{
|
{
|
||||||
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -100,11 +215,161 @@ where
|
|||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> SenderFuture<R> {
|
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
let (reply_sender, reply_receiver) = oneshot::channel();
|
let (reply_sender, reply_receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
Some(Box::pin(async move {
|
||||||
|
sender
|
||||||
|
.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = async move {
|
||||||
|
let reply = func.call(model, arg, scheduler).await;
|
||||||
|
let _ = reply_sender.send(reply);
|
||||||
|
};
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
|
reply_receiver.await.map_err(|_| SendError {})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can send a mapped request to a replier port and retrieve a
|
||||||
|
/// mapped response.
|
||||||
|
pub(super) struct MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
query_map: C,
|
||||||
|
reply_map: Arc<D>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
_phantom_query_map: PhantomData<fn(T) -> U>,
|
||||||
|
_phantom_reply_map: PhantomData<fn(Q) -> R>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender<M>) -> Self {
|
||||||
|
Self {
|
||||||
|
query_map,
|
||||||
|
reply_map: Arc::new(reply_map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> U + Send,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let arg = (self.query_map)(arg);
|
||||||
|
let sender = self.sender.clone();
|
||||||
|
let reply_map = self.reply_map.clone();
|
||||||
|
let (reply_sender, reply_receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
Some(Box::pin(async move {
|
||||||
|
sender
|
||||||
|
.send(move |model, scheduler, recycle_box| {
|
||||||
|
let fut = async move {
|
||||||
|
let reply = func.call(model, arg, scheduler).await;
|
||||||
|
let _ = reply_sender.send(reply);
|
||||||
|
};
|
||||||
|
|
||||||
|
coerce_box!(RecycleBox::recycle(recycle_box, fut))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
|
reply_receiver
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
.map(&*reply_map)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An object that can filter and send a mapped request to a replier port and
|
||||||
|
/// retrieve a mapped response.
|
||||||
|
pub(super) struct FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
query_filter_map: C,
|
||||||
|
reply_map: Arc<D>,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
_phantom_query_map: PhantomData<fn(T) -> Option<U>>,
|
||||||
|
_phantom_reply_map: PhantomData<fn(Q) -> R>,
|
||||||
|
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
|
||||||
|
_phantom_closure_marker: PhantomData<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: 'static,
|
||||||
|
{
|
||||||
|
pub(super) fn new(
|
||||||
|
query_filter_map: C,
|
||||||
|
reply_map: D,
|
||||||
|
func: F,
|
||||||
|
sender: channel::Sender<M>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
query_filter_map,
|
||||||
|
reply_map: Arc::new(reply_map),
|
||||||
|
func,
|
||||||
|
sender,
|
||||||
|
_phantom_query_map: PhantomData,
|
||||||
|
_phantom_reply_map: PhantomData,
|
||||||
|
_phantom_closure: PhantomData,
|
||||||
|
_phantom_closure_marker: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
C: Fn(T) -> Option<U> + Send,
|
||||||
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
|
T: Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
U: Send + 'static,
|
||||||
|
Q: Send + 'static,
|
||||||
|
S: Send,
|
||||||
|
{
|
||||||
|
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
||||||
|
(self.query_filter_map)(arg).map(|arg| {
|
||||||
|
let func = self.func.clone();
|
||||||
|
let sender = self.sender.clone();
|
||||||
|
let reply_map = self.reply_map.clone();
|
||||||
|
let (reply_sender, reply_receiver) = oneshot::channel();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
sender
|
sender
|
||||||
.send(move |model, scheduler, recycle_box| {
|
.send(move |model, scheduler, recycle_box| {
|
||||||
@ -118,7 +383,11 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(|_| SendError {})?;
|
.map_err(|_| SendError {})?;
|
||||||
|
|
||||||
reply_receiver.await.map_err(|_| SendError {})
|
reply_receiver
|
||||||
|
.await
|
||||||
|
.map_err(|_| SendError {})
|
||||||
|
.map(&*reply_map)
|
||||||
|
}) as SenderFuture<R>
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -196,60 +196,12 @@ impl TaskSet {
|
|||||||
next: AtomicU32::new(SLEEPING),
|
next: AtomicU32::new(SLEEPING),
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to shrink the vector of tasks.
|
// The vector of tasks is never shrunk as this is a fairly costly
|
||||||
//
|
// operation and is not strictly necessary. Typically, inactive tasks
|
||||||
// The main issue when shrinking the vector of tasks is that stale
|
// left at the back of the vector are never waken anyway, and if they
|
||||||
// wakers may still be around and may at any moment be scheduled and
|
// are, they are filtered out by the task iterator.
|
||||||
// insert their task index in the list of scheduled tasks. If it cannot
|
|
||||||
// be guaranteed that this will not happen, then the vector of tasks
|
|
||||||
// cannot be shrunk further, otherwise the iterator for scheduled tasks
|
|
||||||
// will later fail when reaching a task with an invalid index.
|
|
||||||
//
|
|
||||||
// We follow a 2-steps strategy:
|
|
||||||
//
|
|
||||||
// 1) remove all tasks currently in the list of scheduled task and set
|
|
||||||
// them to `SLEEPING` state in case some of them might have an index
|
|
||||||
// that will be invalidated when the vector of tasks is shrunk;
|
|
||||||
//
|
|
||||||
// 2) attempt to iteratively shrink the vector of tasks by removing
|
|
||||||
// tasks starting from the back of the vector:
|
|
||||||
// - If a task is in the `SLEEPING` state, then its `next` pointer is
|
|
||||||
// changed to an arbitrary value other than`SLEEPING`, but the task
|
|
||||||
// is not inserted in the list of scheduled tasks; this way, the
|
|
||||||
// task will be effectively rendered inactive. The task can now be
|
|
||||||
// removed from the vector.
|
|
||||||
// - If a task is found in a non-`SLEEPING` state (meaning that there
|
|
||||||
// was a race and the task was scheduled after step 1) then abandon
|
|
||||||
// further shrinking and leave this task in the vector; the iterator
|
|
||||||
// for scheduled tasks mitigates such situation by only yielding
|
|
||||||
// task indices that are within the expected range.
|
|
||||||
|
|
||||||
// Step 1: unscheduled tasks that may be scheduled.
|
|
||||||
self.discard_scheduled();
|
|
||||||
|
|
||||||
// Step 2: attempt to remove tasks starting at the back of the vector.
|
|
||||||
while self.tasks.len() > len {
|
|
||||||
// There is at least one task since `len()` was non-zero.
|
|
||||||
let task = self.tasks.last().unwrap();
|
|
||||||
|
|
||||||
// Ordering: Relaxed ordering is sufficient since the task is
|
|
||||||
// effectively discarded.
|
|
||||||
if task
|
|
||||||
.next
|
|
||||||
.compare_exchange(SLEEPING, EMPTY, Ordering::Relaxed, Ordering::Relaxed)
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
// The task could not be removed for now so the set of tasks cannot
|
|
||||||
// be shrunk further.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.tasks.pop();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `true` if one or more sub-tasks are currently scheduled.
|
/// Returns `true` if one or more sub-tasks are currently scheduled.
|
||||||
@ -271,10 +223,6 @@ impl TaskSet {
|
|||||||
|
|
||||||
waker_ref(&self.tasks[idx])
|
waker_ref(&self.tasks[idx])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn len(&self) -> usize {
|
|
||||||
self.task_count
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Internals shared between a `TaskSet` and its associated `Task`s.
|
/// Internals shared between a `TaskSet` and its associated `Task`s.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user