diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index f2c04d7..25aed9e 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -78,7 +78,7 @@ impl Output { ) -> LineId where M: Model, - C: Fn(T) -> U + Send + Sync + 'static, + C: Fn(&T) -> U + Send + Sync + 'static, F: for<'a> InputFn<'a, M, U, S> + Clone, U: Send + 'static, S: Send + 'static, @@ -95,7 +95,7 @@ impl Output { /// argument. pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId where - C: Fn(T) -> U + Send + Sync + 'static, + C: Fn(&T) -> U + Send + Sync + 'static, U: Send + 'static, S: EventSink, { @@ -120,7 +120,7 @@ impl Output { ) -> LineId where M: Model, - C: Fn(T) -> Option + Send + Sync + 'static, + C: Fn(&T) -> Option + Send + Sync + 'static, F: for<'a> InputFn<'a, M, U, S> + Clone, U: Send + 'static, S: Send + 'static, @@ -141,7 +141,7 @@ impl Output { /// argument. pub fn filter_map_connect_sink(&mut self, filter_map: C, sink: &S) -> LineId where - C: Fn(T) -> Option + Send + Sync + 'static, + C: Fn(&T) -> Option + Send + Sync + 'static, U: Send + 'static, S: EventSink, { @@ -247,7 +247,7 @@ impl Requestor { ) -> LineId where M: Model, - C: Fn(T) -> U + Send + Sync + 'static, + C: Fn(&T) -> U + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, U: Send + 'static, @@ -282,7 +282,7 @@ impl Requestor { ) -> LineId where M: Model, - C: Fn(T) -> Option + Send + Sync + 'static, + C: Fn(&T) -> Option + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, U: Send + 'static, diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index 9611c8f..5592406 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -98,13 +98,13 @@ impl BroadcasterInner { while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - if let Some(fut) = sender.1.send(arg) { + if let Some(fut) = sender.1.send_owned(arg) { futures.push(fut); } break; } - if let Some(fut) = sender.1.send(arg.clone()) { + if let Some(fut) = sender.1.send(&arg) { futures.push(fut); } } @@ -190,7 +190,7 @@ impl EventBroadcaster { [] => Ok(()), // One sender at most. - [sender] => match sender.1.send(arg) { + [sender] => match sender.1.send_owned(arg) { None => Ok(()), Some(fut) => fut.await.map_err(|_| BroadcastError {}), }, @@ -267,7 +267,7 @@ impl QueryBroadcaster { // One sender at most. [sender] => { - if let Some(fut) = sender.1.send(arg) { + if let Some(fut) = sender.1.send_owned(arg) { let output = fut.await.map_err(|_| BroadcastError {})?; self.inner.shared.outputs[0] = Some(output); @@ -667,7 +667,7 @@ mod tests { let mailbox = Receiver::new(10); let address = mailbox.sender(); let id_filter_sender = Box::new(FilterMapInputSender::new( - move |x| (x == id || x == BROADCAST_ALL).then_some(x), + move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x), SumModel::increment, address, )); @@ -802,7 +802,7 @@ mod tests { let mailbox = Receiver::new(10); let address = mailbox.sender(); let sender = Box::new(FilterMapReplierSender::new( - move |x| (x == id || x == BROADCAST_ALL).then_some(x), + move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x), |x| 3 * x, DoubleModel::double, address, @@ -917,7 +917,7 @@ mod tests { fut_storage: Option>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> Option>> { + fn send(&mut self, _arg: &()) -> Option>> { let fut_storage = &mut self.fut_storage; let receiver = &mut self.receiver; diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 9ddb9c7..b6eb963 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -17,8 +17,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; /// An event or query sender abstracting over the target model and input or /// replier method. pub(super) trait Sender: DynClone + Send { - /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> Option>>; + /// Asynchronously sends a message using a reference to the message. + fn send(&mut self, arg: &T) -> Option>>; + + /// Asynchronously sends an owned message. + fn send_owned(&mut self, arg: T) -> Option>> { + self.send(&arg) + } } dyn_clone::clone_trait_object!( Sender); @@ -54,10 +59,14 @@ impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + 'static, + T: Clone + Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { + self.send_owned(arg.clone()) + } + + fn send_owned(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let fut = self.sender.send(move |model, scheduler, recycle_box| { @@ -122,13 +131,13 @@ where impl Sender for MapInputSender where M: Model, - C: Fn(T) -> U + Send + Sync, + C: Fn(&T) -> U + Send + Sync, F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { let func = self.func.clone(); let arg = (self.map)(arg); @@ -196,13 +205,13 @@ where impl Sender for FilterMapInputSender where M: Model, - C: Fn(T) -> Option + Send + Sync, + C: Fn(&T) -> Option + Send + Sync, F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { (self.filter_map)(arg).map(|arg| { let func = self.func.clone(); @@ -256,10 +265,14 @@ impl EventSinkSender { impl Sender for EventSinkSender where - T: Send + 'static, + T: Clone + Send + 'static, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { + self.send_owned(arg.clone()) + } + + fn send_owned(&mut self, arg: T) -> Option>> { let writer = &mut self.writer; Some(RecycledFuture::new(&mut self.fut_storage, async move { @@ -283,7 +296,7 @@ impl Clone for EventSinkSender { /// An object that can send mapped events to an event sink. pub(super) struct MapEventSinkSender where - C: Fn(T) -> U, + C: Fn(&T) -> U, { writer: W, map: Arc, @@ -293,7 +306,7 @@ where impl MapEventSinkSender where - C: Fn(T) -> U, + C: Fn(&T) -> U, { pub(super) fn new(map: C, writer: W) -> Self { Self { @@ -309,10 +322,10 @@ impl Sender for MapEventSinkSender where T: Send + 'static, U: Send + 'static, - C: Fn(T) -> U + Send + Sync, + C: Fn(&T) -> U + Send + Sync, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { let writer = &mut self.writer; let arg = (self.map)(arg); @@ -326,7 +339,7 @@ where impl Clone for MapEventSinkSender where - C: Fn(T) -> U, + C: Fn(&T) -> U, W: Clone, { fn clone(&self) -> Self { @@ -342,7 +355,7 @@ where /// An object that can filter and send mapped events to an event sink. pub(super) struct FilterMapEventSinkSender where - C: Fn(T) -> Option, + C: Fn(&T) -> Option, { writer: W, filter_map: Arc, @@ -352,7 +365,7 @@ where impl FilterMapEventSinkSender where - C: Fn(T) -> Option, + C: Fn(&T) -> Option, { pub(super) fn new(filter_map: C, writer: W) -> Self { Self { @@ -368,10 +381,10 @@ impl Sender for FilterMapEventSinkSender where T: Send + 'static, U: Send + 'static, - C: Fn(T) -> Option + Send + Sync, + C: Fn(&T) -> Option + Send + Sync, W: EventSinkWriter, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { let writer = &mut self.writer; (self.filter_map)(arg).map(|arg| { @@ -386,7 +399,7 @@ where impl Clone for FilterMapEventSinkSender where - C: Fn(T) -> Option, + C: Fn(&T) -> Option, W: Clone, { fn clone(&self) -> Self { @@ -432,11 +445,15 @@ impl Sender for ReplierSender where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, - T: Send + 'static, + T: Clone + Send + 'static, R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { + self.send_owned(arg.clone()) + } + + fn send_owned(&mut self, arg: T) -> Option>> { let func = self.func.clone(); let sender = &mut self.sender; let reply_receiver = &mut self.receiver; @@ -525,7 +542,7 @@ where impl Sender for MapReplierSender where M: Model, - C: Fn(T) -> U + Send + Sync, + C: Fn(&T) -> U + Send + Sync, D: Fn(Q) -> R + Send + Sync, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, T: Send + 'static, @@ -534,7 +551,7 @@ where Q: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { let func = self.func.clone(); let arg = (self.query_map)(arg); let sender = &mut self.sender; @@ -638,7 +655,7 @@ where impl Sender for FilterMapReplierSender where M: Model, - C: Fn(T) -> Option + Send + Sync, + C: Fn(&T) -> Option + Send + Sync, D: Fn(Q) -> R + Send + Sync, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, T: Send + 'static, @@ -647,7 +664,7 @@ where Q: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option>> { + fn send(&mut self, arg: &T) -> Option>> { (self.query_filter_map)(arg).map(|arg| { let func = self.func.clone(); let sender = &mut self.sender; diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 08398b9..faab6e5 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -70,7 +70,7 @@ impl EventSource { ) -> LineId where M: Model, - C: Fn(T) -> U + Send + 'static, + C: for<'a> Fn(&'a T) -> U + Send + 'static, F: for<'a> InputFn<'a, M, U, S> + Clone, U: Send + 'static, S: Send + 'static, @@ -96,7 +96,7 @@ impl EventSource { ) -> LineId where M: Model, - C: Fn(T) -> Option + Send + 'static, + C: for<'a> Fn(&'a T) -> Option + Send + 'static, F: for<'a> InputFn<'a, M, U, S> + Clone, U: Send + 'static, S: Send + 'static, @@ -277,7 +277,7 @@ impl QuerySource { ) -> LineId where M: Model, - C: Fn(T) -> U + Send + 'static, + C: for<'a> Fn(&'a T) -> U + Send + 'static, D: Fn(Q) -> R + Send + Sync + 'static, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, U: Send + 'static, @@ -312,7 +312,7 @@ impl QuerySource { ) -> LineId where M: Model, - C: Fn(T) -> Option + Send + 'static, + C: for<'a> Fn(&'a T) -> Option + Send + 'static, D: Fn(Q) -> R + Send + Sync + 'static, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, U: Send + 'static, diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 4291a54..63576d8 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -79,14 +79,14 @@ impl BroadcasterInner { // Broadcast the message and collect all futures. let mut iter = self.senders.iter_mut(); while let Some(sender) = iter.next() { - // Move the argument rather than clone it for the last future. + // Move the argument for the last future to avoid undue cloning. if iter.len() == 0 { - if let Some(fut) = sender.1.send(arg) { + if let Some(fut) = sender.1.send_owned(arg) { future_states.push(SenderFutureState::Pending(fut)); } break; } - if let Some(fut) = sender.1.send(arg.clone()) { + if let Some(fut) = sender.1.send(&arg) { future_states.push(SenderFutureState::Pending(fut)); } } @@ -159,7 +159,7 @@ impl EventBroadcaster { // No sender. [] => Fut::Empty, // One sender at most. - [sender] => Fut::Single(sender.1.send(arg)), + [sender] => Fut::Single(sender.1.send_owned(arg)), // Possibly multiple senders. _ => Fut::Multiple(self.inner.futures(arg)), }; @@ -247,7 +247,7 @@ impl QueryBroadcaster { // No sender. [] => Fut::Empty, // One sender at most. - [sender] => Fut::Single(sender.1.send(arg)), + [sender] => Fut::Single(sender.1.send_owned(arg)), // Possibly multiple senders. _ => Fut::Multiple(self.inner.futures(arg)), }; @@ -569,7 +569,7 @@ mod tests { let mailbox = Receiver::new(10); let address = mailbox.sender(); let id_filter_sender = Box::new(FilterMapInputSender::new( - move |x| (x == id || x == BROADCAST_ALL).then_some(x), + move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x), SumModel::increment, address, )); @@ -704,7 +704,7 @@ mod tests { let mailbox = Receiver::new(10); let address = mailbox.sender(); let sender = Box::new(FilterMapReplierSender::new( - move |x| (x == id || x == BROADCAST_ALL).then_some(x), + move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x), |x| 3 * x, DoubleModel::double, address, @@ -821,7 +821,7 @@ mod tests { impl Sender<(), R> for TestEvent { fn send( &mut self, - _arg: (), + _arg: &(), ) -> Option> + Send>>> { let receiver = self.receiver.take().unwrap(); diff --git a/asynchronix/src/ports/source/sender.rs b/asynchronix/src/ports/source/sender.rs index 66c7080..0edc865 100644 --- a/asynchronix/src/ports/source/sender.rs +++ b/asynchronix/src/ports/source/sender.rs @@ -16,8 +16,13 @@ pub(super) type SenderFuture = Pin: Send { - /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> Option>; + /// Asynchronously sends a message using a reference to the message. + fn send(&mut self, arg: &T) -> Option>; + + /// Asynchronously sends an owned message. + fn send_owned(&mut self, arg: T) -> Option> { + self.send(&arg) + } } /// An object that can send events to an input port. @@ -49,10 +54,14 @@ impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + 'static, + T: Clone + Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option> { + fn send(&mut self, arg: &T) -> Option> { + self.send_owned(arg.clone()) + } + + fn send_owned(&mut self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); @@ -101,13 +110,13 @@ where impl Sender for MapInputSender where M: Model, - C: Fn(T) -> U + Send, + C: Fn(&T) -> U + Send, F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option> { + fn send(&mut self, arg: &T) -> Option> { let func = self.func.clone(); let arg = (self.map)(arg); let sender = self.sender.clone(); @@ -157,13 +166,13 @@ where impl Sender for FilterMapInputSender where M: Model, - C: Fn(T) -> Option + Send, + C: Fn(&T) -> Option + Send, F: for<'a> InputFn<'a, M, U, S> + Clone, T: Send + 'static, U: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option> { + fn send(&mut self, arg: &T) -> Option> { (self.filter_map)(arg).map(|arg| { let func = self.func.clone(); let sender = self.sender.clone(); @@ -211,11 +220,15 @@ impl Sender for ReplierSender where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, - T: Send + 'static, + T: Clone + Send + 'static, R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option> { + fn send(&mut self, arg: &T) -> Option> { + self.send_owned(arg.clone()) + } + + fn send_owned(&mut self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); let (reply_sender, reply_receiver) = oneshot::channel(); @@ -275,7 +288,7 @@ where impl Sender for MapReplierSender where M: Model, - C: Fn(T) -> U + Send, + C: Fn(&T) -> U + Send, D: Fn(Q) -> R + Send + Sync + 'static, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, T: Send + 'static, @@ -284,7 +297,7 @@ where Q: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option> { + fn send(&mut self, arg: &T) -> Option> { let func = self.func.clone(); let arg = (self.query_map)(arg); let sender = self.sender.clone(); @@ -354,7 +367,7 @@ where impl Sender for FilterMapReplierSender where M: Model, - C: Fn(T) -> Option + Send, + C: Fn(&T) -> Option + Send, D: Fn(Q) -> R + Send + Sync + 'static, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, T: Send + 'static, @@ -363,7 +376,7 @@ where Q: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> Option> { + fn send(&mut self, arg: &T) -> Option> { (self.query_filter_map)(arg).map(|arg| { let func = self.func.clone(); let sender = self.sender.clone();