diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index 9d88269..c98d60f 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -12,8 +12,8 @@ use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; use self::sender::{ - EventSinkSender, InputSender, MapEventSinkSender, MapInputSender, MapReplierSender, - ReplierSender, + EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender, + MapEventSinkSender, MapInputSender, MapReplierSender, ReplierSender, }; /// An output port. @@ -63,8 +63,8 @@ impl Output { /// Adds an auto-converting connection to an input port of the model /// specified by the address. /// - /// Events are mapped to the type expected by the input, using the closure - /// provided in argument. + /// Events are mapped to another type using the closure provided in + /// argument. /// /// The input port must be an asynchronous method of a model of type `M` /// taking as argument a value of the type returned by the mapping @@ -90,8 +90,8 @@ impl Output { /// [`EventSlot`](crate::ports::EventSlot) or /// [`EventBuffer`](crate::ports::EventBuffer). /// - /// Events are mapped to the type expected by the sink, using the closure - /// provided in argument. + /// Events are mapped to another type using the closure provided in + /// argument. pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId where C: Fn(T) -> U + Send + Sync + 'static, @@ -102,6 +102,52 @@ impl Output { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting, filtered connection to an input port of the + /// model specified by the address. + /// + /// Events are mapped to another type using the closure provided in + /// argument, or ignored if the closure returns `None`. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping + /// closure plus, optionally, a scheduler reference. + pub fn filter_map_connect( + &mut self, + filter_map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> Option + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapInputSender::new( + filter_map, + input, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting connection to an event sink such as an + /// [`EventSlot`](crate::ports::EventSlot) or + /// [`EventBuffer`](crate::ports::EventBuffer). + /// + /// Events are mapped to another type using the closure provided in + /// argument. + pub fn filter_map_connect_sink(&mut self, filter_map: C, sink: &S) -> LineId + where + C: Fn(T) -> Option + Send + Sync + 'static, + U: Send + 'static, + S: EventSink, + { + let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer())); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -184,8 +230,8 @@ impl Requestor { /// Adds an auto-converting connection to a replier port of the model /// specified by the address. /// - /// Queries and replies are mapped to the types expected by the replier port, - /// using the closures provided in argument. + /// Queries and replies are mapped to other types using the closures + /// provided in argument. /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of the type returned by the second mapping closure and diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 48dda09..66b45e5 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -1,6 +1,6 @@ use std::error::Error; use std::fmt; -use std::future::Future; +use std::future::{ready, Future}; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; @@ -180,6 +180,283 @@ where } } +/// An object that can filter and send mapped events to an input port. +pub(super) struct FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + filter_map: Arc, + func: F, + sender: channel::Sender, + fut_storage: Option>, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + pub(super) fn new(filter_map: C, func: F, sender: channel::Sender) -> Self { + Self { + filter_map: Arc::new(filter_map), + func, + sender, + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let func = self.func.clone(); + + match (self.filter_map)(arg) { + Some(arg) => { + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + } + None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), + } + } +} + +impl Clone for FilterMapInputSender +where + M: Model, + C: Fn(T) -> Option, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + filter_map: self.filter_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send an event to an event sink. +pub(super) struct EventSinkSender> +where + T: Send + 'static, + W: EventSinkWriter, +{ + writer: W, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl> EventSinkSender { + pub(super) fn new(writer: W) -> Self { + Self { + writer, + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for EventSinkSender +where + T: Send + 'static, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + } +} + +impl Clone for EventSinkSender +where + T: Send + 'static, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can send mapped events to an event sink. +pub(super) struct MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + writer: W, + map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + pub(super) fn new(map: C, writer: W) -> Self { + Self { + writer, + map: Arc::new(map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + let arg = (self.map)(arg); + + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + } +} + +impl Clone for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + map: self.map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +/// An object that can filter and send mapped events to an event sink. +pub(super) struct FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option, + W: EventSinkWriter, +{ + writer: W, + filter_map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option, + W: EventSinkWriter, +{ + pub(super) fn new(filter_map: C, writer: W) -> Self { + Self { + writer, + filter_map: Arc::new(filter_map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + + match (self.filter_map)(arg) { + Some(arg) => RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }), + None => RecycledFuture::new(&mut self.fut_storage, ready(Ok(()))), + } + } +} + +impl Clone for FilterMapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> Option + Send + Sync, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + filter_map: self.filter_map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + /// An object that can send requests to a replier port and retrieve responses. pub(super) struct ReplierSender { func: F, @@ -386,124 +663,6 @@ where } } -/// An object that can send an event to an event sink. -pub(super) struct EventSinkSender> -where - T: Send + 'static, - W: EventSinkWriter, -{ - writer: W, - fut_storage: Option>, - _phantom_event: PhantomData, -} - -impl> EventSinkSender { - pub(super) fn new(writer: W) -> Self { - Self { - writer, - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - -impl Sender for EventSinkSender -where - T: Send + 'static, - W: EventSinkWriter, -{ - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let writer = &mut self.writer; - - RecycledFuture::new(&mut self.fut_storage, async move { - writer.write(arg); - - Ok(()) - }) - } -} - -impl Clone for EventSinkSender -where - T: Send + 'static, - W: EventSinkWriter, -{ - fn clone(&self) -> Self { - Self { - writer: self.writer.clone(), - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - -/// An object that can send mapped events to an event sink. -pub(super) struct MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U, - W: EventSinkWriter, -{ - writer: W, - map: Arc, - fut_storage: Option>, - _phantom_event: PhantomData, -} - -impl MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U, - W: EventSinkWriter, -{ - pub(super) fn new(map: C, writer: W) -> Self { - Self { - writer, - map: Arc::new(map), - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - -impl Sender for MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U + Send + Sync, - W: EventSinkWriter, -{ - fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { - let writer = &mut self.writer; - let arg = (self.map)(arg); - - RecycledFuture::new(&mut self.fut_storage, async move { - writer.write(arg); - - Ok(()) - }) - } -} - -impl Clone for MapEventSinkSender -where - T: Send + 'static, - U: Send + 'static, - C: Fn(T) -> U + Send + Sync, - W: EventSinkWriter, -{ - fn clone(&self) -> Self { - Self { - writer: self.writer.clone(), - map: self.map.clone(), - fut_storage: None, - _phantom_event: PhantomData, - } - } -} - /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {}