1
0
forked from ROMEO/nexosim

Add filter_map variants for output port connection

This commit is contained in:
Serge Barral 2024-08-03 11:47:57 +02:00
parent 9a2cfe8e77
commit 0ec781e18b
2 changed files with 332 additions and 127 deletions

View File

@ -12,8 +12,8 @@ use crate::util::cached_rw_lock::CachedRwLock;
use broadcaster::{EventBroadcaster, QueryBroadcaster}; use broadcaster::{EventBroadcaster, QueryBroadcaster};
use self::sender::{ use self::sender::{
EventSinkSender, InputSender, MapEventSinkSender, MapInputSender, MapReplierSender, EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
ReplierSender, MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender,
}; };
/// An output port. /// An output port.
@ -63,8 +63,8 @@ impl<T: Clone + Send + 'static> Output<T> {
/// Adds an auto-converting connection to an input port of the model /// Adds an auto-converting connection to an input port of the model
/// specified by the address. /// specified by the address.
/// ///
/// Events are mapped to the type expected by the input, using the closure /// Events are mapped to another type using the closure provided in
/// provided in argument. /// argument.
/// ///
/// The input port must be an asynchronous method of a model of type `M` /// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping /// taking as argument a value of the type returned by the mapping
@ -90,8 +90,8 @@ impl<T: Clone + Send + 'static> Output<T> {
/// [`EventSlot`](crate::ports::EventSlot) or /// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer). /// [`EventBuffer`](crate::ports::EventBuffer).
/// ///
/// Events are mapped to the type expected by the sink, using the closure /// Events are mapped to another type using the closure provided in
/// provided in argument. /// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId
where where
C: Fn(T) -> U + Send + Sync + 'static, C: Fn(T) -> U + Send + Sync + 'static,
@ -102,6 +102,52 @@ impl<T: Clone + Send + 'static> Output<T> {
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender)
} }
/// Adds an auto-converting, filtered connection to an input port of the
/// model specified by the address.
///
/// Events are mapped to another type using the closure provided in
/// argument, or ignored if the closure returns `None`.
///
/// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a scheduler reference.
pub fn filter_map_connect<M, C, F, U, S>(
&mut self,
filter_map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
U: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapInputSender::new(
filter_map,
input,
address.into().0,
));
self.broadcaster.write().unwrap().add(sender)
}
/// Adds an auto-converting connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer).
///
/// Events are mapped to another type using the closure provided in
/// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId
where
C: Fn(T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static,
S: EventSink<U>,
{
let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer()));
self.broadcaster.write().unwrap().add(sender)
}
/// Removes the connection specified by the `LineId` parameter. /// Removes the connection specified by the `LineId` parameter.
/// ///
/// It is a logic error to specify a line identifier from another /// It is a logic error to specify a line identifier from another
@ -184,8 +230,8 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// Adds an auto-converting connection to a replier port of the model /// Adds an auto-converting connection to a replier port of the model
/// specified by the address. /// specified by the address.
/// ///
/// Queries and replies are mapped to the types expected by the replier port, /// Queries and replies are mapped to other types using the closures
/// using the closures provided in argument. /// provided in argument.
/// ///
/// The replier port must be an asynchronous method of a model of type `M` /// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the second mapping closure and /// returning a value of the type returned by the second mapping closure and

View File

@ -1,6 +1,6 @@
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::{ready, Future};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
use std::pin::Pin; use std::pin::Pin;
@ -180,6 +180,283 @@ where
} }
} }
/// An object that can filter and send mapped events to an input port.
pub(super) struct FilterMapInputSender<M: 'static, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U>,
F: for<'a> InputFn<'a, M, U, S>,
T: Send + 'static,
U: Send + 'static,
{
filter_map: Arc<C>,
func: F,
sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>,
_phantom_map: PhantomData<fn(T) -> U>,
_phantom_closure: PhantomData<fn(&mut M, U)>,
_phantom_closure_marker: PhantomData<S>,
}
impl<M: Send, C, F, T, U, S> FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U>,
F: for<'a> InputFn<'a, M, U, S>,
T: Send + 'static,
U: Send + 'static,
{
pub(super) fn new(filter_map: C, func: F, sender: channel::Sender<M>) -> Self {
Self {
filter_map: Arc::new(filter_map),
func,
sender,
fut_storage: None,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
impl<M: Send, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send + 'static,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let func = self.func.clone();
match (self.filter_map)(arg) {
Some(arg) => {
let fut = self.sender.send(move |model, scheduler, recycle_box| {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
});
RecycledFuture::new(&mut self.fut_storage, async move {
fut.await.map_err(|_| SendError {})
})
}
None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))),
}
}
}
impl<M: Send, C, F, T, U, S> Clone for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(T) -> Option<U>,
F: for<'a> InputFn<'a, M, U, S> + Clone,
T: Send + 'static,
U: Send + 'static,
S: Send + 'static,
{
fn clone(&self) -> Self {
Self {
filter_map: self.filter_map.clone(),
func: self.func.clone(),
sender: self.sender.clone(),
fut_storage: None,
_phantom_map: PhantomData,
_phantom_closure: PhantomData,
_phantom_closure_marker: PhantomData,
}
}
}
/// An object that can send an event to an event sink.
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
writer: W,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T: Send + 'static, W: EventSinkWriter<T>> EventSinkSender<T, W> {
pub(super) fn new(writer: W) -> Self {
Self {
writer,
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, W> Clone for EventSinkSender<T, W>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// An object that can send mapped events to an event sink.
pub(super) struct MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U,
W: EventSinkWriter<U>,
{
writer: W,
map: Arc<C>,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U,
W: EventSinkWriter<U>,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
writer,
map: Arc::new(map),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
map: self.map.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// An object that can filter and send mapped events to an event sink.
pub(super) struct FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U>,
W: EventSinkWriter<U>,
{
writer: W,
filter_map: Arc<C>,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, U, W, C> FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U>,
W: EventSinkWriter<U>,
{
pub(super) fn new(filter_map: C, writer: W) -> Self {
Self {
writer,
filter_map: Arc::new(filter_map),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, U, W, C> Sender<T, ()> for FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U> + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
match (self.filter_map)(arg) {
Some(arg) => RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
}),
None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))),
}
}
}
impl<T, U, W, C> Clone for FilterMapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> Option<U> + Send + Sync,
W: EventSinkWriter<U>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
filter_map: self.filter_map.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// An object that can send requests to a replier port and retrieve responses. /// An object that can send requests to a replier port and retrieve responses.
pub(super) struct ReplierSender<M: 'static, F, T, R, S> { pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
func: F, func: F,
@ -386,124 +663,6 @@ where
} }
} }
/// An object that can send an event to an event sink.
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
writer: W,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T: Send + 'static, W: EventSinkWriter<T>> EventSinkSender<T, W> {
pub(super) fn new(writer: W) -> Self {
Self {
writer,
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, W> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, W> Clone for EventSinkSender<T, W>
where
T: Send + 'static,
W: EventSinkWriter<T>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// An object that can send mapped events to an event sink.
pub(super) struct MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U,
W: EventSinkWriter<U>,
{
writer: W,
map: Arc<C>,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T, U, W, C> MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U,
W: EventSinkWriter<U>,
{
pub(super) fn new(map: C, writer: W) -> Self {
Self {
writer,
map: Arc::new(map),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T, U, W, C> Sender<T, ()> for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let writer = &mut self.writer;
let arg = (self.map)(arg);
RecycledFuture::new(&mut self.fut_storage, async move {
writer.write(arg);
Ok(())
})
}
}
impl<T, U, W, C> Clone for MapEventSinkSender<T, U, W, C>
where
T: Send + 'static,
U: Send + 'static,
C: Fn(T) -> U + Send + Sync,
W: EventSinkWriter<U>,
{
fn clone(&self) -> Self {
Self {
writer: self.writer.clone(),
map: self.map.clone(),
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
/// Error returned when the mailbox was closed or dropped. /// Error returned when the mailbox was closed or dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)] #[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) struct SendError {} pub(super) struct SendError {}