diff --git a/asynchronix/src/ports/source.rs b/asynchronix/src/ports/source.rs index 7c8ce3e..08398b9 100644 --- a/asynchronix/src/ports/source.rs +++ b/asynchronix/src/ports/source.rs @@ -13,9 +13,11 @@ use crate::simulation::{ }; use crate::util::slot; -use broadcaster::ReplyIterator; -use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use sender::{InputSender, ReplierSender}; +use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator}; +use sender::{ + FilterMapInputSender, FilterMapReplierSender, InputSender, MapInputSender, MapReplierSender, + ReplierSender, +}; use super::ReplierFn; @@ -51,6 +53,58 @@ impl EventSource { self.broadcaster.lock().unwrap().add(sender) } + /// Adds an auto-converting connection to an input port of the model + /// specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping closure + /// plus, optionally, a context reference. + pub fn map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapInputSender::new(map, input, address.into().0)); + self.broadcaster.lock().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to an input port of the + /// model specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument, or ignored if the closure returns `None`. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping closure + /// plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); + self.broadcaster.lock().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -193,7 +247,7 @@ impl QuerySource { /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` - /// plus, optionally, a scheduler reference. + /// plus, optionally, a context reference. pub fn connect(&mut self, replier: F, address: impl Into>) -> LineId where M: Model, @@ -204,6 +258,76 @@ impl QuerySource { self.broadcaster.lock().unwrap().add(sender) } + /// Adds an auto-converting connection to a replier port of the model + /// specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn map_connect( + &mut self, + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.lock().unwrap().add(sender) + } + + /// Adds an auto-converting, filtered connection to a replier port of the + /// model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn filter_map_connect( + &mut self, + query_filter_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filter_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.lock().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index b545b66..138cc14 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -71,26 +71,26 @@ impl BroadcasterInner { self.senders.len() } - /// Efficiently broadcasts a message or a query to multiple addresses. - /// - /// This method does not collect the responses from queries. - fn broadcast(&mut self, arg: T) -> BroadcastFuture { - let mut future_states = Vec::with_capacity(self.senders.len()); + /// Return a list of futures broadcasting an event or query to multiple addresses. + fn futures(&mut self, arg: T) -> Vec> { + let mut future_states = Vec::new(); // Broadcast the message and collect all futures. let mut iter = self.senders.iter_mut(); while let Some(sender) = iter.next() { // Move the argument rather than clone it for the last future. if iter.len() == 0 { - future_states.push(SenderFutureState::Pending(sender.1.send(arg))); + if let Some(fut) = sender.1.send(arg) { + future_states.push(SenderFutureState::Pending(fut)); + } break; } - - future_states.push(SenderFutureState::Pending(sender.1.send(arg.clone()))); + if let Some(fut) = sender.1.send(arg.clone()) { + future_states.push(SenderFutureState::Pending(fut)); + } } - // Generate the global future. - BroadcastFuture::new(future_states) + future_states } } @@ -157,17 +157,27 @@ impl EventBroadcaster { let fut = match self.inner.senders.as_mut_slice() { // No sender. [] => Fut::Empty, - // One sender. + // One sender at most. [sender] => Fut::Single(sender.1.send(arg)), - // Multiple senders. - _ => Fut::Multiple(self.inner.broadcast(arg)), + // Possibly multiple senders. + _ => Fut::Multiple(self.inner.futures(arg)), }; async { match fut { - Fut::Empty => Ok(()), - Fut::Single(fut) => fut.await.map_err(|_| BroadcastError {}), - Fut::Multiple(fut) => fut.await.map(|_| ()), + // No sender. + Fut::Empty | Fut::Single(None) => Ok(()), + + Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}), + + Fut::Multiple(mut futures) => match futures.as_mut_slice() { + // No sender. + [] => Ok(()), + // One sender. + [SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}), + // Multiple senders. + _ => BroadcastFuture::new(futures).await.map(|_| ()), + }, } } } @@ -235,20 +245,39 @@ impl QueryBroadcaster { let fut = match self.inner.senders.as_mut_slice() { // No sender. [] => Fut::Empty, - // One sender. + // One sender at most. [sender] => Fut::Single(sender.1.send(arg)), - // Multiple senders. - _ => Fut::Multiple(self.inner.broadcast(arg)), + // Possibly multiple senders. + _ => Fut::Multiple(self.inner.futures(arg)), }; async { match fut { - Fut::Empty => Ok(ReplyIterator(Vec::new().into_iter())), - Fut::Single(fut) => fut + // No sender. + Fut::Empty | Fut::Single(None) => Ok(ReplyIterator(Vec::new().into_iter())), + + Fut::Single(Some(fut)) => fut .await .map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())) .map_err(|_| BroadcastError {}), - Fut::Multiple(fut) => fut.await.map_err(|_| BroadcastError {}), + + Fut::Multiple(mut futures) => match futures.as_mut_slice() { + // No sender. + [] => Ok(ReplyIterator(Vec::new().into_iter())), + + // One sender. + [SenderFutureState::Pending(fut)] => fut + .await + .map(|reply| { + ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()) + }) + .map_err(|_| BroadcastError {}), + + // Multiple senders. + _ => BroadcastFuture::new(futures) + .await + .map_err(|_| BroadcastError {}), + }, } } } @@ -598,14 +627,17 @@ mod tests { receiver: Option>>, } impl Sender<(), R> for TestEvent { - fn send(&mut self, _arg: ()) -> Pin> + Send>> { + fn send( + &mut self, + _arg: (), + ) -> Option> + Send>>> { let receiver = self.receiver.take().unwrap(); - Box::pin(async move { + Some(Box::pin(async move { let mut stream = Box::pin(receiver.filter_map(|item| async { item })); Ok(stream.next().await.unwrap()) - }) + })) } } @@ -634,8 +666,16 @@ mod tests { ) } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] - fn loom_broadcast_basic() { + fn loom_broadcast_query_basic() { const DEFAULT_PREEMPTION_BOUND: usize = 3; let mut builder = Builder::new(); @@ -707,8 +747,16 @@ mod tests { }); } + // This tests fails with "Concurrent load and mut accesses" even though the + // `task_list` implementation which triggers it does not use any unsafe. + // This is most certainly related to this Loom bug: + // + // https://github.com/tokio-rs/loom/issues/260 + // + // Disabling until the bug is fixed. + #[ignore] #[test] - fn loom_broadcast_spurious() { + fn loom_broadcast_query_spurious() { const DEFAULT_PREEMPTION_BOUND: usize = 3; let mut builder = Builder::new(); diff --git a/asynchronix/src/ports/source/sender.rs b/asynchronix/src/ports/source/sender.rs index 1e83141..66c7080 100644 --- a/asynchronix/src/ports/source/sender.rs +++ b/asynchronix/src/ports/source/sender.rs @@ -3,6 +3,7 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; +use std::sync::Arc; use futures_channel::oneshot; use recycle_box::{coerce_box, RecycleBox}; @@ -16,22 +17,23 @@ pub(super) type SenderFuture = Pin: Send { /// Asynchronously send the event or request. - fn send(&mut self, arg: T) -> SenderFuture; + fn send(&mut self, arg: T) -> Option>; } /// An object that can send events to an input port. -pub(super) struct InputSender { +pub(super) struct InputSender +where + M: 'static, +{ func: F, sender: channel::Sender, _phantom_closure: PhantomData, _phantom_closure_marker: PhantomData, } -impl InputSender +impl InputSender where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -43,18 +45,18 @@ where } } -impl Sender for InputSender +impl Sender for InputSender where M: Model, F: for<'a> InputFn<'a, M, T, S> + Clone, T: Send + 'static, - S: Send + 'static, + S: Send, { - fn send(&mut self, arg: T) -> SenderFuture<()> { + fn send(&mut self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); - Box::pin(async move { + Some(Box::pin(async move { sender .send(move |model, scheduler, recycle_box| { let fut = func.call(model, arg, scheduler); @@ -63,12 +65,128 @@ where }) .await .map_err(|_| SendError {}) + })) + } +} + +/// An object that can send mapped events to an input port. +pub(super) struct MapInputSender +where + M: 'static, +{ + map: C, + func: F, + sender: channel::Sender, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl MapInputSender +where + M: 'static, +{ + pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { + Self { + map, + func, + sender, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapInputSender +where + M: Model, + C: Fn(T) -> U + Send, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + let func = self.func.clone(); + let arg = (self.map)(arg); + let sender = self.sender.clone(); + + Some(Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {}) + })) + } +} + +/// An object that can filter and send mapped events to an input port. +pub(super) struct FilterMapInputSender +where + M: 'static, +{ + filter_map: C, + func: F, + sender: channel::Sender, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapInputSender +where + M: 'static, +{ + pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { + Self { + filter_map, + func, + sender, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option + Send, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + (self.filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = self.sender.clone(); + + Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {}) + }) as SenderFuture<()> }) } } /// An object that can send a request to a replier port and retrieve a response. -pub(super) struct ReplierSender { +pub(super) struct ReplierSender +where + M: 'static, +{ func: F, sender: channel::Sender, _phantom_closure: PhantomData R>, @@ -77,10 +195,7 @@ pub(super) struct ReplierSender { impl ReplierSender where - M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S>, - T: Send + 'static, - R: Send + 'static, + M: 'static, { pub(super) fn new(func: F, sender: channel::Sender) -> Self { Self { @@ -100,12 +215,12 @@ where R: Send + 'static, S: Send, { - fn send(&mut self, arg: T) -> SenderFuture { + fn send(&mut self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); let (reply_sender, reply_receiver) = oneshot::channel(); - Box::pin(async move { + Some(Box::pin(async move { sender .send(move |model, scheduler, recycle_box| { let fut = async move { @@ -119,6 +234,160 @@ where .map_err(|_| SendError {})?; reply_receiver.await.map_err(|_| SendError {}) + })) + } +} + +/// An object that can send a mapped request to a replier port and retrieve a +/// mapped response. +pub(super) struct MapReplierSender +where + M: 'static, +{ + query_map: C, + reply_map: Arc, + func: F, + sender: channel::Sender, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl MapReplierSender +where + M: 'static, +{ + pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { + Self { + query_map, + reply_map: Arc::new(reply_map), + func, + sender, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapReplierSender +where + M: Model, + C: Fn(T) -> U + Send, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + let func = self.func.clone(); + let arg = (self.query_map)(arg); + let sender = self.sender.clone(); + let reply_map = self.reply_map.clone(); + let (reply_sender, reply_receiver) = oneshot::channel(); + + Some(Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + let _ = reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {})?; + + reply_receiver + .await + .map_err(|_| SendError {}) + .map(&*reply_map) + })) + } +} + +/// An object that can filter and send a mapped request to a replier port and +/// retrieve a mapped response. +pub(super) struct FilterMapReplierSender +where + M: 'static, +{ + query_filter_map: C, + reply_map: Arc, + func: F, + sender: channel::Sender, + _phantom_query_map: PhantomData Option>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapReplierSender +where + M: 'static, +{ + pub(super) fn new( + query_filter_map: C, + reply_map: D, + func: F, + sender: channel::Sender, + ) -> Self { + Self { + query_filter_map, + reply_map: Arc::new(reply_map), + func, + sender, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapReplierSender +where + M: Model, + C: Fn(T) -> Option + Send, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> Option> { + (self.query_filter_map)(arg).map(|arg| { + let func = self.func.clone(); + let sender = self.sender.clone(); + let reply_map = self.reply_map.clone(); + let (reply_sender, reply_receiver) = oneshot::channel(); + + Box::pin(async move { + sender + .send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + let _ = reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }) + .await + .map_err(|_| SendError {})?; + + reply_receiver + .await + .map_err(|_| SendError {}) + .map(&*reply_map) + }) as SenderFuture }) } }