1
0
forked from ROMEO/nexosim

Add support for mapped connections from ports

This commit is contained in:
Serge Barral 2024-08-02 18:53:07 +02:00
parent d9099c4bfa
commit 9a2cfe8e77
2 changed files with 360 additions and 4 deletions

View File

@ -11,7 +11,10 @@ use crate::util::cached_rw_lock::CachedRwLock;
use broadcaster::{EventBroadcaster, QueryBroadcaster}; use broadcaster::{EventBroadcaster, QueryBroadcaster};
use self::sender::{EventSinkSender, InputSender, ReplierSender}; use self::sender::{
EventSinkSender, InputSender, MapEventSinkSender, MapInputSender, MapReplierSender,
ReplierSender,
};
/// An output port. /// An output port.
/// ///
@ -57,6 +60,48 @@ impl<T: Clone + Send + 'static> Output<T> {
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender)
} }
/// Adds an auto-converting connection to an input port of the model
/// specified by the address.
///
/// Events are mapped to the type expected by the input, using the closure
/// provided in argument.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a scheduler reference.
pub fn map_connect<M, C, F, U, S>(
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.write().unwrap().add(sender)
}
/// Adds an auto-converting connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
///
/// Events are mapped to the type expected by the sink, using the closure
/// provided in argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where
C: Fn(T) -> U + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
let sender = Box::new(MapEventSinkSender::new(map, sink.writer()));
self.broadcaster.write().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter. /// Removes the connection specified by the `LineId` parameter.
/// ///
/// It is a logic error to specify a line identifier from another /// It is a logic error to specify a line identifier from another
@ -136,6 +181,41 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender)
} }
/// Adds an auto-converting connection to a replier port of the model
/// specified by the address.
///
/// Queries and replies are mapped to the types expected by the replier port,
/// using the closures provided in argument.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the second mapping closure and
/// taking as argument a value of the type returned by the first mapping
/// closure plus, optionally, a scheduler reference.
pub fn map_connect<M, C, D, F, U, Q, S>(
&mut self,
query_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapReplierSender::new(
query_map,
reply_map,
replier,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter. /// Removes the connection specified by the `LineId` parameter.
/// ///
/// It is a logic error to specify a line identifier from another /// It is a logic error to specify a line identifier from another

View File

@ -4,6 +4,7 @@ use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use dyn_clone::DynClone; use dyn_clone::DynClone;
@ -93,7 +94,93 @@ where
} }
} }
/// An object that can send a request to a replier port and retrieve a response. /// An object that can send mapped events to an input port.
pub(super) struct MapInputSender<M: 'static, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U,
F: for<'a> InputFn<'a, M, U, S>,
T: Send + 'static,
U: Send + 'static,
{
map: Arc<C>,
func: F,
sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>,
_phantom_map: PhantomData<fn(T) -> U>,
_phantom_closure: PhantomData<fn(&mut M, U)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M: Send, C, F, T, U, S> MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U,
F: for<'a> InputFn<'a, M, U, S>,
T: Send + 'static,
U: Send + 'static,
{
pub(super) fn new(map: C, func: F, sender: channel::Sender<M>) -> Self {
Self {
map: Arc::new(map),
func,
sender,
fut_storage: None,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M: Send, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send + 'static,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let func = self.func.clone();
let arg = (self.map)(arg);
let fut = self.sender.send(move |model, scheduler, recycle_box| {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(&mut self.fut_storage, async move {
fut.await.map_err(|_| SendError {})
})
}
}
impl<M: Send, C, F, T, U, S> Clone for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> U,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send + 'static,
{
fn clone(&self) -> Self {
Self {
map: self.map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
fut_storage: None,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
/// An object that can send requests to a replier port and retrieve responses.
pub(super) struct ReplierSender<M: 'static, F, T, R, S> { pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
func: F, func: F,
sender: channel::Sender<M>, sender: channel::Sender<M>,
@ -181,8 +268,130 @@ where
} }
} }
/// An object that can send a payload to an event sink. /// An object that can send mapped requests to a replier port and retrieve
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>> { /// mapped responses.
pub(super) struct MapReplierSender<M: 'static, C, D, F, T, R, U, Q, S> {
query_map: Arc<C>,
reply_map: Arc<D>,
func: F,
sender: channel::Sender<M>,
receiver: multishot::Receiver<Q>,
fut_storage: Option<RecycleBox<()>>,
_phantom_query_map: PhantomData<fn(T) -> U>,
_phantom_reply_map: PhantomData<fn(Q) -> R>,
_phantom_closure: PhantomData<fn(&mut M, U) -> Q>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M, C, D, F, T, R, U, Q, S> MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U,
D: Fn(Q) -> R,
F: for<'a> ReplierFn<'a, M, U, Q, S>,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
{
pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender<M>) -> Self {
Self {
query_map: Arc::new(query_map),
reply_map: Arc::new(reply_map),
func,
sender,
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
let fut_storage = &mut self.fut_storage;
let reply_map = &*self.reply_map;
// The previous future generated by this method should have been polled
// to completion so a new sender should be readily available.
let reply_sender = reply_receiver.sender().unwrap();
let send_fut = sender.send(move |model, scheduler, recycle_box| {
let fut = async move {
let reply = func.call(model, arg, scheduler).await;
reply_sender.send(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(fut_storage, async move {
// Send the message.
send_fut.await.map_err(|_| SendError {})?;
// Wait until the message is processed and the reply is sent back.
// If an error is received, it most likely means the mailbox was
// dropped before the message was processed.
reply_receiver
.recv()
.await
.map_err(|_| SendError {})
.map(reply_map)
})
}
}
impl<M, C, D, F, T, R, U, Q, S> Clone for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(T) -> U,
D: Fn(Q) -> R,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
{
fn clone(&self) -> Self {
Self {
query_map: self.query_map.clone(),
reply_map: self.reply_map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
receiver: multishot::Receiver::new(),
fut_storage: None,
_phantom_query_map: PhantomData,
_phantom_reply_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
/// An object that can send an event to an event sink.
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
writer: W, writer: W,
fut_storage: Option<RecycleBox<()>>, fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>, _phantom_event: PhantomData<T>,
@ -228,6 +437,73 @@ where
} }
} }
/// An object that can send mapped events to an event sink.
pub(super) struct MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U,
W: EventSinkWriter<U>,
{
writer: W,
map: Arc<C>,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U,
W: EventSinkWriter<U>,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
writer,
map: Arc::new(map),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
map: self.map.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// Error returned when the mailbox was closed or dropped. /// Error returned when the mailbox was closed or dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)] #[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) struct SendError {} pub(super) struct SendError {}