forked from ROMEO/nexosim
Take message ref in (filter)map_connect closures
This avoids preemptive cloning when the closures don't consume the message, which is common when the filtering closure returns `None`.
This commit is contained in:
parent
1f3e04e796
commit
1b1db5e0b8
@ -78,7 +78,7 @@ impl<T: Clone + Send + 'static> Output<T> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send + Sync + 'static,
|
C: Fn(&T) -> U + Send + Sync + 'static,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
@ -95,7 +95,7 @@ impl<T: Clone + Send + 'static> Output<T> {
|
|||||||
/// argument.
|
/// argument.
|
||||||
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
|
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
|
||||||
where
|
where
|
||||||
C: Fn(T) -> U + Send + Sync + 'static,
|
C: Fn(&T) -> U + Send + Sync + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: EventSink<U>,
|
S: EventSink<U>,
|
||||||
{
|
{
|
||||||
@ -120,7 +120,7 @@ impl<T: Clone + Send + 'static> Output<T> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
@ -141,7 +141,7 @@ impl<T: Clone + Send + 'static> Output<T> {
|
|||||||
/// argument.
|
/// argument.
|
||||||
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
|
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
|
||||||
where
|
where
|
||||||
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: EventSink<U>,
|
S: EventSink<U>,
|
||||||
{
|
{
|
||||||
@ -247,7 +247,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send + Sync + 'static,
|
C: Fn(&T) -> U + Send + Sync + 'static,
|
||||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
@ -282,7 +282,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send + Sync + 'static,
|
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
|
||||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
|
@ -98,13 +98,13 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
while let Some(sender) = iter.next() {
|
while let Some(sender) = iter.next() {
|
||||||
// Move the argument rather than clone it for the last future.
|
// Move the argument rather than clone it for the last future.
|
||||||
if iter.len() == 0 {
|
if iter.len() == 0 {
|
||||||
if let Some(fut) = sender.1.send(arg) {
|
if let Some(fut) = sender.1.send_owned(arg) {
|
||||||
futures.push(fut);
|
futures.push(fut);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(fut) = sender.1.send(arg.clone()) {
|
if let Some(fut) = sender.1.send(&arg) {
|
||||||
futures.push(fut);
|
futures.push(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -190,7 +190,7 @@ impl<T: Clone> EventBroadcaster<T> {
|
|||||||
[] => Ok(()),
|
[] => Ok(()),
|
||||||
|
|
||||||
// One sender at most.
|
// One sender at most.
|
||||||
[sender] => match sender.1.send(arg) {
|
[sender] => match sender.1.send_owned(arg) {
|
||||||
None => Ok(()),
|
None => Ok(()),
|
||||||
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
|
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
|
||||||
},
|
},
|
||||||
@ -267,7 +267,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
|
|||||||
|
|
||||||
// One sender at most.
|
// One sender at most.
|
||||||
[sender] => {
|
[sender] => {
|
||||||
if let Some(fut) = sender.1.send(arg) {
|
if let Some(fut) = sender.1.send_owned(arg) {
|
||||||
let output = fut.await.map_err(|_| BroadcastError {})?;
|
let output = fut.await.map_err(|_| BroadcastError {})?;
|
||||||
self.inner.shared.outputs[0] = Some(output);
|
self.inner.shared.outputs[0] = Some(output);
|
||||||
|
|
||||||
@ -667,7 +667,7 @@ mod tests {
|
|||||||
let mailbox = Receiver::new(10);
|
let mailbox = Receiver::new(10);
|
||||||
let address = mailbox.sender();
|
let address = mailbox.sender();
|
||||||
let id_filter_sender = Box::new(FilterMapInputSender::new(
|
let id_filter_sender = Box::new(FilterMapInputSender::new(
|
||||||
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|
||||||
SumModel::increment,
|
SumModel::increment,
|
||||||
address,
|
address,
|
||||||
));
|
));
|
||||||
@ -802,7 +802,7 @@ mod tests {
|
|||||||
let mailbox = Receiver::new(10);
|
let mailbox = Receiver::new(10);
|
||||||
let address = mailbox.sender();
|
let address = mailbox.sender();
|
||||||
let sender = Box::new(FilterMapReplierSender::new(
|
let sender = Box::new(FilterMapReplierSender::new(
|
||||||
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|
||||||
|x| 3 * x,
|
|x| 3 * x,
|
||||||
DoubleModel::double,
|
DoubleModel::double,
|
||||||
address,
|
address,
|
||||||
@ -917,7 +917,7 @@ mod tests {
|
|||||||
fut_storage: Option<RecycleBox<()>>,
|
fut_storage: Option<RecycleBox<()>>,
|
||||||
}
|
}
|
||||||
impl<R: Send> Sender<(), R> for TestEvent<R> {
|
impl<R: Send> Sender<(), R> for TestEvent<R> {
|
||||||
fn send(&mut self, _arg: ()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
fn send(&mut self, _arg: &()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let fut_storage = &mut self.fut_storage;
|
let fut_storage = &mut self.fut_storage;
|
||||||
let receiver = &mut self.receiver;
|
let receiver = &mut self.receiver;
|
||||||
|
|
||||||
|
@ -17,8 +17,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
|
|||||||
/// An event or query sender abstracting over the target model and input or
|
/// An event or query sender abstracting over the target model and input or
|
||||||
/// replier method.
|
/// replier method.
|
||||||
pub(super) trait Sender<T, R>: DynClone + Send {
|
pub(super) trait Sender<T, R>: DynClone + Send {
|
||||||
/// Asynchronously send the event or request.
|
/// Asynchronously sends a message using a reference to the message.
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
|
||||||
|
|
||||||
|
/// Asynchronously sends an owned message.
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
|
self.send(&arg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
|
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
|
||||||
@ -54,10 +59,14 @@ impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
|
|||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Clone + Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
self.send_owned(arg.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
|
|
||||||
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
let fut = self.sender.send(move |model, scheduler, recycle_box| {
|
||||||
@ -122,13 +131,13 @@ where
|
|||||||
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
|
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send + Sync,
|
C: Fn(&T) -> U + Send + Sync,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let arg = (self.map)(arg);
|
let arg = (self.map)(arg);
|
||||||
|
|
||||||
@ -196,13 +205,13 @@ where
|
|||||||
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
|
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send + Sync,
|
C: Fn(&T) -> Option<U> + Send + Sync,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
(self.filter_map)(arg).map(|arg| {
|
(self.filter_map)(arg).map(|arg| {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
|
|
||||||
@ -256,10 +265,14 @@ impl<T, W> EventSinkSender<T, W> {
|
|||||||
|
|
||||||
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
|
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
|
||||||
where
|
where
|
||||||
T: Send + 'static,
|
T: Clone + Send + 'static,
|
||||||
W: EventSinkWriter<T>,
|
W: EventSinkWriter<T>,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
|
self.send_owned(arg.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let writer = &mut self.writer;
|
let writer = &mut self.writer;
|
||||||
|
|
||||||
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
Some(RecycledFuture::new(&mut self.fut_storage, async move {
|
||||||
@ -283,7 +296,7 @@ impl<T, W: Clone> Clone for EventSinkSender<T, W> {
|
|||||||
/// An object that can send mapped events to an event sink.
|
/// An object that can send mapped events to an event sink.
|
||||||
pub(super) struct MapEventSinkSender<T, U, W, C>
|
pub(super) struct MapEventSinkSender<T, U, W, C>
|
||||||
where
|
where
|
||||||
C: Fn(T) -> U,
|
C: Fn(&T) -> U,
|
||||||
{
|
{
|
||||||
writer: W,
|
writer: W,
|
||||||
map: Arc<C>,
|
map: Arc<C>,
|
||||||
@ -293,7 +306,7 @@ where
|
|||||||
|
|
||||||
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
|
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
|
||||||
where
|
where
|
||||||
C: Fn(T) -> U,
|
C: Fn(&T) -> U,
|
||||||
{
|
{
|
||||||
pub(super) fn new(map: C, writer: W) -> Self {
|
pub(super) fn new(map: C, writer: W) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -309,10 +322,10 @@ impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
|
|||||||
where
|
where
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
C: Fn(T) -> U + Send + Sync,
|
C: Fn(&T) -> U + Send + Sync,
|
||||||
W: EventSinkWriter<U>,
|
W: EventSinkWriter<U>,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let writer = &mut self.writer;
|
let writer = &mut self.writer;
|
||||||
let arg = (self.map)(arg);
|
let arg = (self.map)(arg);
|
||||||
|
|
||||||
@ -326,7 +339,7 @@ where
|
|||||||
|
|
||||||
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
|
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
|
||||||
where
|
where
|
||||||
C: Fn(T) -> U,
|
C: Fn(&T) -> U,
|
||||||
W: Clone,
|
W: Clone,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
@ -342,7 +355,7 @@ where
|
|||||||
/// An object that can filter and send mapped events to an event sink.
|
/// An object that can filter and send mapped events to an event sink.
|
||||||
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
|
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
|
||||||
where
|
where
|
||||||
C: Fn(T) -> Option<U>,
|
C: Fn(&T) -> Option<U>,
|
||||||
{
|
{
|
||||||
writer: W,
|
writer: W,
|
||||||
filter_map: Arc<C>,
|
filter_map: Arc<C>,
|
||||||
@ -352,7 +365,7 @@ where
|
|||||||
|
|
||||||
impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
|
impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
|
||||||
where
|
where
|
||||||
C: Fn(T) -> Option<U>,
|
C: Fn(&T) -> Option<U>,
|
||||||
{
|
{
|
||||||
pub(super) fn new(filter_map: C, writer: W) -> Self {
|
pub(super) fn new(filter_map: C, writer: W) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -368,10 +381,10 @@ impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
|
|||||||
where
|
where
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
C: Fn(T) -> Option<U> + Send + Sync,
|
C: Fn(&T) -> Option<U> + Send + Sync,
|
||||||
W: EventSinkWriter<U>,
|
W: EventSinkWriter<U>,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
|
||||||
let writer = &mut self.writer;
|
let writer = &mut self.writer;
|
||||||
|
|
||||||
(self.filter_map)(arg).map(|arg| {
|
(self.filter_map)(arg).map(|arg| {
|
||||||
@ -386,7 +399,7 @@ where
|
|||||||
|
|
||||||
impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
|
impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
|
||||||
where
|
where
|
||||||
C: Fn(T) -> Option<U>,
|
C: Fn(&T) -> Option<U>,
|
||||||
W: Clone,
|
W: Clone,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
@ -432,11 +445,15 @@ impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
|
|||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Clone + Send + 'static,
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
|
self.send_owned(arg.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = &mut self.sender;
|
let sender = &mut self.sender;
|
||||||
let reply_receiver = &mut self.receiver;
|
let reply_receiver = &mut self.receiver;
|
||||||
@ -525,7 +542,7 @@ where
|
|||||||
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send + Sync,
|
C: Fn(&T) -> U + Send + Sync,
|
||||||
D: Fn(Q) -> R + Send + Sync,
|
D: Fn(Q) -> R + Send + Sync,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
@ -534,7 +551,7 @@ where
|
|||||||
Q: Send + 'static,
|
Q: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let arg = (self.query_map)(arg);
|
let arg = (self.query_map)(arg);
|
||||||
let sender = &mut self.sender;
|
let sender = &mut self.sender;
|
||||||
@ -638,7 +655,7 @@ where
|
|||||||
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send + Sync,
|
C: Fn(&T) -> Option<U> + Send + Sync,
|
||||||
D: Fn(Q) -> R + Send + Sync,
|
D: Fn(Q) -> R + Send + Sync,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
@ -647,7 +664,7 @@ where
|
|||||||
Q: Send + 'static,
|
Q: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
|
||||||
(self.query_filter_map)(arg).map(|arg| {
|
(self.query_filter_map)(arg).map(|arg| {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = &mut self.sender;
|
let sender = &mut self.sender;
|
||||||
|
@ -70,7 +70,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send + 'static,
|
C: for<'a> Fn(&'a T) -> U + Send + 'static,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
@ -96,7 +96,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send + 'static,
|
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
@ -277,7 +277,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send + 'static,
|
C: for<'a> Fn(&'a T) -> U + Send + 'static,
|
||||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
@ -312,7 +312,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
|
|||||||
) -> LineId
|
) -> LineId
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send + 'static,
|
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
|
||||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
|
@ -79,14 +79,14 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
|
|||||||
// Broadcast the message and collect all futures.
|
// Broadcast the message and collect all futures.
|
||||||
let mut iter = self.senders.iter_mut();
|
let mut iter = self.senders.iter_mut();
|
||||||
while let Some(sender) = iter.next() {
|
while let Some(sender) = iter.next() {
|
||||||
// Move the argument rather than clone it for the last future.
|
// Move the argument for the last future to avoid undue cloning.
|
||||||
if iter.len() == 0 {
|
if iter.len() == 0 {
|
||||||
if let Some(fut) = sender.1.send(arg) {
|
if let Some(fut) = sender.1.send_owned(arg) {
|
||||||
future_states.push(SenderFutureState::Pending(fut));
|
future_states.push(SenderFutureState::Pending(fut));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if let Some(fut) = sender.1.send(arg.clone()) {
|
if let Some(fut) = sender.1.send(&arg) {
|
||||||
future_states.push(SenderFutureState::Pending(fut));
|
future_states.push(SenderFutureState::Pending(fut));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -159,7 +159,7 @@ impl<T: Clone + Send> EventBroadcaster<T> {
|
|||||||
// No sender.
|
// No sender.
|
||||||
[] => Fut::Empty,
|
[] => Fut::Empty,
|
||||||
// One sender at most.
|
// One sender at most.
|
||||||
[sender] => Fut::Single(sender.1.send(arg)),
|
[sender] => Fut::Single(sender.1.send_owned(arg)),
|
||||||
// Possibly multiple senders.
|
// Possibly multiple senders.
|
||||||
_ => Fut::Multiple(self.inner.futures(arg)),
|
_ => Fut::Multiple(self.inner.futures(arg)),
|
||||||
};
|
};
|
||||||
@ -247,7 +247,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
|
|||||||
// No sender.
|
// No sender.
|
||||||
[] => Fut::Empty,
|
[] => Fut::Empty,
|
||||||
// One sender at most.
|
// One sender at most.
|
||||||
[sender] => Fut::Single(sender.1.send(arg)),
|
[sender] => Fut::Single(sender.1.send_owned(arg)),
|
||||||
// Possibly multiple senders.
|
// Possibly multiple senders.
|
||||||
_ => Fut::Multiple(self.inner.futures(arg)),
|
_ => Fut::Multiple(self.inner.futures(arg)),
|
||||||
};
|
};
|
||||||
@ -569,7 +569,7 @@ mod tests {
|
|||||||
let mailbox = Receiver::new(10);
|
let mailbox = Receiver::new(10);
|
||||||
let address = mailbox.sender();
|
let address = mailbox.sender();
|
||||||
let id_filter_sender = Box::new(FilterMapInputSender::new(
|
let id_filter_sender = Box::new(FilterMapInputSender::new(
|
||||||
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|
||||||
SumModel::increment,
|
SumModel::increment,
|
||||||
address,
|
address,
|
||||||
));
|
));
|
||||||
@ -704,7 +704,7 @@ mod tests {
|
|||||||
let mailbox = Receiver::new(10);
|
let mailbox = Receiver::new(10);
|
||||||
let address = mailbox.sender();
|
let address = mailbox.sender();
|
||||||
let sender = Box::new(FilterMapReplierSender::new(
|
let sender = Box::new(FilterMapReplierSender::new(
|
||||||
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|
||||||
|x| 3 * x,
|
|x| 3 * x,
|
||||||
DoubleModel::double,
|
DoubleModel::double,
|
||||||
address,
|
address,
|
||||||
@ -821,7 +821,7 @@ mod tests {
|
|||||||
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
|
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
|
||||||
fn send(
|
fn send(
|
||||||
&mut self,
|
&mut self,
|
||||||
_arg: (),
|
_arg: &(),
|
||||||
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
|
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
|
||||||
let receiver = self.receiver.take().unwrap();
|
let receiver = self.receiver.take().unwrap();
|
||||||
|
|
||||||
|
@ -16,8 +16,13 @@ pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendErro
|
|||||||
|
|
||||||
/// An event or query sender abstracting over the target model and input method.
|
/// An event or query sender abstracting over the target model and input method.
|
||||||
pub(super) trait Sender<T, R>: Send {
|
pub(super) trait Sender<T, R>: Send {
|
||||||
/// Asynchronously send the event or request.
|
/// Asynchronously sends a message using a reference to the message.
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<R>>;
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>>;
|
||||||
|
|
||||||
|
/// Asynchronously sends an owned message.
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
||||||
|
self.send(&arg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An object that can send events to an input port.
|
/// An object that can send events to an input port.
|
||||||
@ -49,10 +54,14 @@ impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
|
|||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Clone + Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
|
||||||
|
self.send_owned(arg.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
|
|
||||||
@ -101,13 +110,13 @@ where
|
|||||||
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
|
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send,
|
C: Fn(&T) -> U + Send,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let arg = (self.map)(arg);
|
let arg = (self.map)(arg);
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
@ -157,13 +166,13 @@ where
|
|||||||
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
|
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send,
|
C: Fn(&T) -> Option<U> + Send,
|
||||||
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
F: for<'a> InputFn<'a, M, U, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
U: Send + 'static,
|
U: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
|
||||||
(self.filter_map)(arg).map(|arg| {
|
(self.filter_map)(arg).map(|arg| {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
@ -211,11 +220,15 @@ impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
|
|||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Clone + Send + 'static,
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
|
||||||
|
self.send_owned(arg.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
let (reply_sender, reply_receiver) = oneshot::channel();
|
let (reply_sender, reply_receiver) = oneshot::channel();
|
||||||
@ -275,7 +288,7 @@ where
|
|||||||
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> U + Send,
|
C: Fn(&T) -> U + Send,
|
||||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
@ -284,7 +297,7 @@ where
|
|||||||
Q: Send + 'static,
|
Q: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let arg = (self.query_map)(arg);
|
let arg = (self.query_map)(arg);
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
@ -354,7 +367,7 @@ where
|
|||||||
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
|
||||||
where
|
where
|
||||||
M: Model,
|
M: Model,
|
||||||
C: Fn(T) -> Option<U> + Send,
|
C: Fn(&T) -> Option<U> + Send,
|
||||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
@ -363,7 +376,7 @@ where
|
|||||||
Q: Send + 'static,
|
Q: Send + 'static,
|
||||||
S: Send,
|
S: Send,
|
||||||
{
|
{
|
||||||
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
|
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
|
||||||
(self.query_filter_map)(arg).map(|arg| {
|
(self.query_filter_map)(arg).map(|arg| {
|
||||||
let func = self.func.clone();
|
let func = self.func.clone();
|
||||||
let sender = self.sender.clone();
|
let sender = self.sender.clone();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user