1
0
forked from ROMEO/nexosim

Add map/filter_map variants for source connection

This commit is contained in:
Serge Barral 2024-08-03 19:43:10 +02:00
parent 3527d62b41
commit 7f244d2334
3 changed files with 489 additions and 48 deletions

View File

@ -13,9 +13,11 @@ use crate::simulation::{
};
use crate::util::slot;
use broadcaster::ReplyIterator;
use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::{InputSender, ReplierSender};
use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator};
use sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, MapInputSender, MapReplierSender,
ReplierSender,
};
use super::ReplierFn;
@ -51,6 +53,58 @@ impl<T: Clone + Send + 'static> EventSource<T> {
self.broadcaster.lock().unwrap().add(sender)
}
/// Adds an auto-converting connection to an input port of the model
/// specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping closure
/// plus, optionally, a context reference.
pub fn map_connect<M, C, F, U, S>(
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender)
}
/// Adds an auto-converting, filtered connection to an input port of the
/// model specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument, or ignored if the closure returns `None`.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping closure
/// plus, optionally, a context reference.
pub fn filter_map_connect<M, C, F, U, S>(
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
@ -193,7 +247,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of type `R` and taking as argument a value of type `T`
/// plus, optionally, a scheduler reference.
/// plus, optionally, a context reference.
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId
where
M: Model,
@ -204,6 +258,76 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
self.broadcaster.lock().unwrap().add(sender)
}
/// Adds an auto-converting connection to a replier port of the model
/// specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapReplierSender::new(
query_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.lock().unwrap().add(sender)
}
/// Adds an auto-converting, filtered connection to a replier port of the
/// model specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument, or ignored if the query closure returns `None`.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn filter_map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_filter_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapReplierSender::new(
query_filter_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.lock().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another

View File

@ -71,26 +71,26 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
self.senders.len()
}
/// Efficiently broadcasts a message or a query to multiple addresses.
///
/// This method does not collect the responses from queries.
fn broadcast(&mut self, arg: T) -> BroadcastFuture<R> {
let mut future_states = Vec::with_capacity(self.senders.len());
/// Return a list of futures broadcasting an event or query to multiple addresses.
fn futures(&mut self, arg: T) -> Vec<SenderFutureState<R>> {
let mut future_states = Vec::new();
// Broadcast the message and collect all futures.
let mut iter = self.senders.iter_mut();
while let Some(sender) = iter.next() {
// Move the argument rather than clone it for the last future.
if iter.len() == 0 {
future_states.push(SenderFutureState::Pending(sender.1.send(arg)));
if let Some(fut) = sender.1.send(arg) {
future_states.push(SenderFutureState::Pending(fut));
}
break;
}
future_states.push(SenderFutureState::Pending(sender.1.send(arg.clone())));
if let Some(fut) = sender.1.send(arg.clone()) {
future_states.push(SenderFutureState::Pending(fut));
}
}
// Generate the global future.
BroadcastFuture::new(future_states)
future_states
}
}
@ -157,17 +157,27 @@ impl<T: Clone + Send> EventBroadcaster<T> {
let fut = match self.inner.senders.as_mut_slice() {
// No sender.
[] => Fut::Empty,
// One sender.
// One sender at most.
[sender] => Fut::Single(sender.1.send(arg)),
// Multiple senders.
_ => Fut::Multiple(self.inner.broadcast(arg)),
// Possibly multiple senders.
_ => Fut::Multiple(self.inner.futures(arg)),
};
async {
match fut {
Fut::Empty => Ok(()),
Fut::Single(fut) => fut.await.map_err(|_| BroadcastError {}),
Fut::Multiple(fut) => fut.await.map(|_| ()),
// No sender.
Fut::Empty | Fut::Single(None) => Ok(()),
Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}),
Fut::Multiple(mut futures) => match futures.as_mut_slice() {
// No sender.
[] => Ok(()),
// One sender.
[SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}),
// Multiple senders.
_ => BroadcastFuture::new(futures).await.map(|_| ()),
},
}
}
}
@ -235,20 +245,39 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
let fut = match self.inner.senders.as_mut_slice() {
// No sender.
[] => Fut::Empty,
// One sender.
// One sender at most.
[sender] => Fut::Single(sender.1.send(arg)),
// Multiple senders.
_ => Fut::Multiple(self.inner.broadcast(arg)),
// Possibly multiple senders.
_ => Fut::Multiple(self.inner.futures(arg)),
};
async {
match fut {
Fut::Empty => Ok(ReplyIterator(Vec::new().into_iter())),
Fut::Single(fut) => fut
// No sender.
Fut::Empty | Fut::Single(None) => Ok(ReplyIterator(Vec::new().into_iter())),
Fut::Single(Some(fut)) => fut
.await
.map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()))
.map_err(|_| BroadcastError {}),
Fut::Multiple(fut) => fut.await.map_err(|_| BroadcastError {}),
Fut::Multiple(mut futures) => match futures.as_mut_slice() {
// No sender.
[] => Ok(ReplyIterator(Vec::new().into_iter())),
// One sender.
[SenderFutureState::Pending(fut)] => fut
.await
.map(|reply| {
ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())
})
.map_err(|_| BroadcastError {}),
// Multiple senders.
_ => BroadcastFuture::new(futures)
.await
.map_err(|_| BroadcastError {}),
},
}
}
}
@ -598,14 +627,17 @@ mod tests {
receiver: Option<mpsc::UnboundedReceiver<Option<R>>>,
}
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
fn send(&mut self, _arg: ()) -> Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>> {
fn send(
&mut self,
_arg: (),
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
let receiver = self.receiver.take().unwrap();
Box::pin(async move {
Some(Box::pin(async move {
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
Ok(stream.next().await.unwrap())
})
}))
}
}
@ -634,8 +666,16 @@ mod tests {
)
}
// This tests fails with "Concurrent load and mut accesses" even though the
// `task_list` implementation which triggers it does not use any unsafe.
// This is most certainly related to this Loom bug:
//
// https://github.com/tokio-rs/loom/issues/260
//
// Disabling until the bug is fixed.
#[ignore]
#[test]
fn loom_broadcast_basic() {
fn loom_broadcast_query_basic() {
const DEFAULT_PREEMPTION_BOUND: usize = 3;
let mut builder = Builder::new();
@ -707,8 +747,16 @@ mod tests {
});
}
// This tests fails with "Concurrent load and mut accesses" even though the
// `task_list` implementation which triggers it does not use any unsafe.
// This is most certainly related to this Loom bug:
//
// https://github.com/tokio-rs/loom/issues/260
//
// Disabling until the bug is fixed.
#[ignore]
#[test]
fn loom_broadcast_spurious() {
fn loom_broadcast_query_spurious() {
const DEFAULT_PREEMPTION_BOUND: usize = 3;
let mut builder = Builder::new();

View File

@ -3,6 +3,7 @@ use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use futures_channel::oneshot;
use recycle_box::{coerce_box, RecycleBox};
@ -16,22 +17,23 @@ pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendErro
/// An event or query sender abstracting over the target model and input method.
pub(super) trait Sender<T, R>: Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> SenderFuture<R>;
fn send(&mut self, arg: T) -> Option<SenderFuture<R>>;
}
/// An object that can send events to an input port.
pub(super) struct InputSender<M: 'static, F, T, S> {
pub(super) struct InputSender<M, F, T, S>
where
M: 'static,
{
func: F,
sender: channel::Sender<M>,
_phantom_closure: PhantomData<fn(&mut M, T)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M: Send, F, T, S> InputSender<M, F, T, S>
impl<M, F, T, S> InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + 'static,
M: 'static,
{
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
Self {
@ -43,14 +45,126 @@ where
}
}
impl<M: Send, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
S: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> SenderFuture<()> {
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
let func = self.func.clone();
let sender = self.sender.clone();
Some(Box::pin(async move {
sender
.send(move |model, scheduler, recycle_box| {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
})
.await
.map_err(|_| SendError {})
}))
}
}
/// An object that can send mapped events to an input port.
pub(super) struct MapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
map: C,
func: F,
sender: channel::Sender<M>,
_phantom_map: PhantomData<fn(T) -> U>,
_phantom_closure: PhantomData<fn(&mut M, T)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, F, T, U, S> MapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
pub(super) fn new(map: C, func: F, sender: channel::Sender<M>) -> Self {
Self {
map,
func,
sender,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U + Send,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
let func = self.func.clone();
let arg = (self.map)(arg);
let sender = self.sender.clone();
Some(Box::pin(async move {
sender
.send(move |model, scheduler, recycle_box| {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
})
.await
.map_err(|_| SendError {})
}))
}
}
/// An object that can filter and send mapped events to an input port.
pub(super) struct FilterMapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
filter_map: C,
func: F,
sender: channel::Sender<M>,
_phantom_map: PhantomData<fn(T) -> U>,
_phantom_closure: PhantomData<fn(&mut M, T)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, F, T, U, S> FilterMapInputSender<M, C, F, T, U, S>
where
M: 'static,
{
pub(super) fn new(filter_map: C, func: F, sender: channel::Sender<M>) -> Self {
Self {
filter_map,
func,
sender,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<()>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = self.sender.clone();
@ -63,12 +177,16 @@ where
})
.await
.map_err(|_| SendError {})
}) as SenderFuture<()>
})
}
}
/// An object that can send a request to a replier port and retrieve a response.
pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
pub(super) struct ReplierSender<M, F, T, R, S>
where
M: 'static,
{
func: F,
sender: channel::Sender<M>,
_phantom_closure: PhantomData<fn(&mut M, T) -> R>,
@ -77,10 +195,7 @@ pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S>,
T: Send + 'static,
R: Send + 'static,
M: 'static,
{
pub(super) fn new(func: F, sender: channel::Sender<M>) -> Self {
Self {
@ -100,11 +215,161 @@ where
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> SenderFuture<R> {
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
let func = self.func.clone();
let sender = self.sender.clone();
let (reply_sender, reply_receiver) = oneshot::channel();
Some(Box::pin(async move {
sender
.send(move |model, scheduler, recycle_box| {
let fut = async move {
let reply = func.call(model, arg, scheduler).await;
let _ = reply_sender.send(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
})
.await
.map_err(|_| SendError {})?;
reply_receiver.await.map_err(|_| SendError {})
}))
}
}
/// An object that can send a mapped request to a replier port and retrieve a
/// mapped response.
pub(super) struct MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: 'static,
{
query_map: C,
reply_map: Arc<D>,
func: F,
sender: channel::Sender<M>,
_phantom_query_map: PhantomData<fn(T) -> U>,
_phantom_reply_map: PhantomData<fn(Q) -> R>,
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, D, F, T, R, U, Q, S> MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: 'static,
{
pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender<M>) -> Self {
Self {
query_map,
reply_map: Arc::new(reply_map),
func,
sender,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U + Send,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = self.sender.clone();
let reply_map = self.reply_map.clone();
let (reply_sender, reply_receiver) = oneshot::channel();
Some(Box::pin(async move {
sender
.send(move |model, scheduler, recycle_box| {
let fut = async move {
let reply = func.call(model, arg, scheduler).await;
let _ = reply_sender.send(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
})
.await
.map_err(|_| SendError {})?;
reply_receiver
.await
.map_err(|_| SendError {})
.map(&*reply_map)
}))
}
}
/// An object that can filter and send a mapped request to a replier port and
/// retrieve a mapped response.
pub(super) struct FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: 'static,
{
query_filter_map: C,
reply_map: Arc<D>,
func: F,
sender: channel::Sender<M>,
_phantom_query_map: PhantomData<fn(T) -> Option<U>>,
_phantom_reply_map: PhantomData<fn(Q) -> R>,
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, D, F, T, R, U, Q, S> FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: 'static,
{
pub(super) fn new(
query_filter_map: C,
reply_map: D,
func: F,
sender: channel::Sender<M>,
) -> Self {
Self {
query_filter_map,
reply_map: Arc::new(reply_map),
func,
sender,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> Option<SenderFuture<R>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = self.sender.clone();
let reply_map = self.reply_map.clone();
let (reply_sender, reply_receiver) = oneshot::channel();
Box::pin(async move {
sender
.send(move |model, scheduler, recycle_box| {
@ -118,7 +383,11 @@ where
.await
.map_err(|_| SendError {})?;
reply_receiver.await.map_err(|_| SendError {})
reply_receiver
.await
.map_err(|_| SendError {})
.map(&*reply_map)
}) as SenderFuture<R>
})
}
}