1
0
forked from ROMEO/nexosim

Merge pull request #35 from asynchronics/feature/connect_map

Take message by ref in (filter)map_connect closures
This commit is contained in:
Jauhien Piatlicki 2024-08-16 11:30:10 +02:00 committed by GitHub
commit e75edcbd33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 95 additions and 65 deletions

View File

@ -78,7 +78,7 @@ impl<T: Clone + Send + 'static> Output<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
@ -95,7 +95,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
@ -120,7 +120,7 @@ impl<T: Clone + Send + 'static> Output<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
@ -141,7 +141,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
where
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
@ -247,7 +247,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
C: Fn(&T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
@ -282,7 +282,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,

View File

@ -98,13 +98,13 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
while let Some(sender) = iter.next() {
// Move the argument rather than clone it for the last future.
if iter.len() == 0 {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
futures.push(fut);
}
break;
}
if let Some(fut) = sender.1.send(arg.clone()) {
if let Some(fut) = sender.1.send(&arg) {
futures.push(fut);
}
}
@ -190,7 +190,7 @@ impl<T: Clone> EventBroadcaster<T> {
[] => Ok(()),
// One sender at most.
[sender] => match sender.1.send(arg) {
[sender] => match sender.1.send_owned(arg) {
None => Ok(()),
Some(fut) => fut.await.map_err(|_| BroadcastError {}),
},
@ -267,7 +267,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
// One sender at most.
[sender] => {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
let output = fut.await.map_err(|_| BroadcastError {})?;
self.inner.shared.outputs[0] = Some(output);
@ -667,7 +667,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
SumModel::increment,
address,
));
@ -802,7 +802,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|x| 3 * x,
DoubleModel::double,
address,
@ -917,7 +917,7 @@ mod tests {
fut_storage: Option<RecycleBox<()>>,
}
impl<R: Send> Sender<(), R> for TestEvent<R> {
fn send(&mut self, _arg: ()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, _arg: &()) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let fut_storage = &mut self.fut_storage;
let receiver = &mut self.receiver;

View File

@ -17,8 +17,13 @@ use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
/// An event or query sender abstracting over the target model and input or
/// replier method.
pub(super) trait Sender<T, R>: DynClone + Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
/// Asynchronously sends a message using a reference to the message.
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>>;
/// Asynchronously sends an owned message.
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send(&arg)
}
}
dyn_clone::clone_trait_object!(<T, R> Sender<T, R>);
@ -54,10 +59,14 @@ impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let func = self.func.clone();
let fut = self.sender.send(move |model, scheduler, recycle_box| {
@ -122,13 +131,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let func = self.func.clone();
let arg = (self.map)(arg);
@ -196,13 +205,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();
@ -256,10 +265,14 @@ impl<T, W> EventSinkSender<T, W> {
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
T: Clone + Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;
Some(RecycledFuture::new(&mut self.fut_storage, async move {
@ -283,7 +296,7 @@ impl<T, W: Clone> Clone for EventSinkSender<T, W> {
/// An object that can send mapped events to an event sink.
pub(super) struct MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
{
writer: W,
map: Arc<C>,
@ -293,7 +306,7 @@ where
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
@ -309,10 +322,10 @@ impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);
@ -326,7 +339,7 @@ where
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> U,
C: Fn(&T) -> U,
W: Clone,
{
fn clone(&self) -> Self {
@ -342,7 +355,7 @@ where
/// An object that can filter and send mapped events to an event sink.
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
{
writer: W,
filter_map: Arc<C>,
@ -352,7 +365,7 @@ where
impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
{
pub(super) fn new(filter_map: C, writer: W) -> Self {
Self {
@ -368,10 +381,10 @@ impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<(), SendError>>> {
let writer = &mut self.writer;
(self.filter_map)(arg).map(|arg| {
@ -386,7 +399,7 @@ where
impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
where
C: Fn(T) -> Option<U>,
C: Fn(&T) -> Option<U>,
W: Clone,
{
fn clone(&self) -> Self {
@ -432,11 +445,15 @@ impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let func = self.func.clone();
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
@ -525,7 +542,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
C: Fn(&T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
@ -534,7 +551,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = &mut self.sender;
@ -638,7 +655,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
C: Fn(&T) -> Option<U> + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
@ -647,7 +664,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
fn send(&mut self, arg: &T) -> Option<RecycledFuture<'_, Result<R, SendError>>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = &mut self.sender;

View File

@ -70,7 +70,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
@ -96,7 +96,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
@ -277,7 +277,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
@ -312,7 +312,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,

View File

@ -79,14 +79,14 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
// Broadcast the message and collect all futures.
let mut iter = self.senders.iter_mut();
while let Some(sender) = iter.next() {
// Move the argument rather than clone it for the last future.
// Move the argument for the last future to avoid undue cloning.
if iter.len() == 0 {
if let Some(fut) = sender.1.send(arg) {
if let Some(fut) = sender.1.send_owned(arg) {
future_states.push(SenderFutureState::Pending(fut));
}
break;
}
if let Some(fut) = sender.1.send(arg.clone()) {
if let Some(fut) = sender.1.send(&arg) {
future_states.push(SenderFutureState::Pending(fut));
}
}
@ -159,7 +159,7 @@ impl<T: Clone + Send> EventBroadcaster<T> {
// No sender.
[] => Fut::Empty,
// One sender at most.
[sender] => Fut::Single(sender.1.send(arg)),
[sender] => Fut::Single(sender.1.send_owned(arg)),
// Possibly multiple senders.
_ => Fut::Multiple(self.inner.futures(arg)),
};
@ -247,7 +247,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
// No sender.
[] => Fut::Empty,
// One sender at most.
[sender] => Fut::Single(sender.1.send(arg)),
[sender] => Fut::Single(sender.1.send_owned(arg)),
// Possibly multiple senders.
_ => Fut::Multiple(self.inner.futures(arg)),
};
@ -569,7 +569,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
SumModel::increment,
address,
));
@ -704,7 +704,7 @@ mod tests {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
move |x: &usize| (*x == id || *x == BROADCAST_ALL).then_some(*x),
|x| 3 * x,
DoubleModel::double,
address,
@ -821,7 +821,7 @@ mod tests {
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
fn send(
&mut self,
_arg: (),
_arg: &(),
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
let receiver = self.receiver.take().unwrap();

View File

@ -16,8 +16,13 @@ pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendErro
/// An event or query sender abstracting over the target model and input method.
pub(super) trait Sender<T, R>: Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> Option<SenderFuture<R>>;
/// Asynchronously sends a message using a reference to the message.
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>>;
/// Asynchronously sends an owned message.
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> {
self.send(&arg)
}
}
/// An object that can send events to an input port.
@ -49,10 +54,14 @@ impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<()>> {
let func = self.func.clone();
let sender = self.sender.clone();
@ -101,13 +110,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U + Send,
C: Fn(&T) -> U + Send,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
let func = self.func.clone();
let arg = (self.map)(arg);
let sender = self.sender.clone();
@ -157,13 +166,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send,
C: Fn(&T) -> Option<U> + Send,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = self.sender.clone();
@ -211,11 +220,15 @@ impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Send + 'static,
T: Clone + Send + 'static,
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> {
let func = self.func.clone();
let sender = self.sender.clone();
let (reply_sender, reply_receiver) = oneshot::channel();
@ -275,7 +288,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U + Send,
C: Fn(&T) -> U + Send,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
@ -284,7 +297,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = self.sender.clone();
@ -354,7 +367,7 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send,
C: Fn(&T) -> Option<U> + Send,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
@ -363,7 +376,7 @@ where
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = self.sender.clone();