From 9a2cfe8e77da780c6ffa3cbdac070c8bf815c951 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Fri, 2 Aug 2024 18:53:07 +0200 Subject: [PATCH] Add support for mapped connections from ports --- asynchronix/src/ports/output.rs | 82 ++++++- asynchronix/src/ports/output/sender.rs | 282 ++++++++++++++++++++++++- 2 files changed, 360 insertions(+), 4 deletions(-) diff --git a/asynchronix/src/ports/output.rs b/asynchronix/src/ports/output.rs index d5599fd..9d88269 100644 --- a/asynchronix/src/ports/output.rs +++ b/asynchronix/src/ports/output.rs @@ -11,7 +11,10 @@ use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use self::sender::{EventSinkSender, InputSender, ReplierSender}; +use self::sender::{ + EventSinkSender, InputSender, MapEventSinkSender, MapInputSender, MapReplierSender, + ReplierSender, +}; /// An output port. /// @@ -57,6 +60,48 @@ impl Output { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting connection to an input port of the model + /// specified by the address. + /// + /// Events are mapped to the type expected by the input, using the closure + /// provided in argument. + /// + /// The input port must be an asynchronous method of a model of type `M` + /// taking as argument a value of the type returned by the mapping + /// closure plus, optionally, a scheduler reference. + pub fn map_connect( + &mut self, + map: C, + input: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone, + U: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapInputSender::new(map, input, address.into().0)); + self.broadcaster.write().unwrap().add(sender) + } + + /// Adds an auto-converting connection to an event sink such as an + /// [`EventSlot`](crate::ports::EventSlot) or + /// [`EventBuffer`](crate::ports::EventBuffer). + /// + /// Events are mapped to the type expected by the sink, using the closure + /// provided in argument. + pub fn map_connect_sink(&mut self, map: C, sink: &S) -> LineId + where + C: Fn(T) -> U + Send + Sync + 'static, + U: Send + 'static, + S: EventSink, + { + let sender = Box::new(MapEventSinkSender::new(map, sink.writer())); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another @@ -136,6 +181,41 @@ impl Requestor { self.broadcaster.write().unwrap().add(sender) } + /// Adds an auto-converting connection to a replier port of the model + /// specified by the address. + /// + /// Queries and replies are mapped to the types expected by the replier port, + /// using the closures provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the second mapping closure and + /// taking as argument a value of the type returned by the first mapping + /// closure plus, optionally, a scheduler reference. + pub fn map_connect( + &mut self, + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> LineId + where + M: Model, + C: Fn(T) -> U + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + self.broadcaster.write().unwrap().add(sender) + } + /// Removes the connection specified by the `LineId` parameter. /// /// It is a logic error to specify a line identifier from another diff --git a/asynchronix/src/ports/output/sender.rs b/asynchronix/src/ports/output/sender.rs index 1c9ab02..48dda09 100644 --- a/asynchronix/src/ports/output/sender.rs +++ b/asynchronix/src/ports/output/sender.rs @@ -4,6 +4,7 @@ use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use dyn_clone::DynClone; @@ -93,7 +94,93 @@ where } } -/// An object that can send a request to a replier port and retrieve a response. +/// An object that can send mapped events to an input port. +pub(super) struct MapInputSender +where + M: Model, + C: Fn(T) -> U, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + map: Arc, + func: F, + sender: channel::Sender, + fut_storage: Option>, + _phantom_map: PhantomData U>, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, +} + +impl MapInputSender +where + M: Model, + C: Fn(T) -> U, + F: for<'a> InputFn<'a, M, U, S>, + T: Send + 'static, + U: Send + 'static, +{ + pub(super) fn new(map: C, func: F, sender: channel::Sender) -> Self { + Self { + map: Arc::new(map), + func, + sender, + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapInputSender +where + M: Model, + C: Fn(T) -> U + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let func = self.func.clone(); + let arg = (self.map)(arg); + + let fut = self.sender.send(move |model, scheduler, recycle_box| { + let fut = func.call(model, arg, scheduler); + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(&mut self.fut_storage, async move { + fut.await.map_err(|_| SendError {}) + }) + } +} + +impl Clone for MapInputSender +where + M: Model, + C: Fn(T) -> U, + F: for<'a> InputFn<'a, M, U, S> + Clone, + T: Send + 'static, + U: Send + 'static, + S: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + map: self.map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + fut_storage: None, + _phantom_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send requests to a replier port and retrieve responses. pub(super) struct ReplierSender { func: F, sender: channel::Sender, @@ -181,8 +268,130 @@ where } } -/// An object that can send a payload to an event sink. -pub(super) struct EventSinkSender> { +/// An object that can send mapped requests to a replier port and retrieve +/// mapped responses. +pub(super) struct MapReplierSender { + query_map: Arc, + reply_map: Arc, + func: F, + sender: channel::Sender, + receiver: multishot::Receiver, + fut_storage: Option>, + _phantom_query_map: PhantomData U>, + _phantom_reply_map: PhantomData R>, + _phantom_closure: PhantomData Q>, + _phantom_closure_marker: PhantomData, +} + +impl MapReplierSender +where + M: Model, + C: Fn(T) -> U, + D: Fn(Q) -> R, + F: for<'a> ReplierFn<'a, M, U, Q, S>, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, +{ + pub(super) fn new(query_map: C, reply_map: D, func: F, sender: channel::Sender) -> Self { + Self { + query_map: Arc::new(query_map), + reply_map: Arc::new(reply_map), + func, + sender, + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +impl Sender for MapReplierSender +where + M: Model, + C: Fn(T) -> U + Send + Sync, + D: Fn(Q) -> R + Send + Sync, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, + S: Send, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result> { + let func = self.func.clone(); + let arg = (self.query_map)(arg); + let sender = &mut self.sender; + let reply_receiver = &mut self.receiver; + let fut_storage = &mut self.fut_storage; + let reply_map = &*self.reply_map; + + // The previous future generated by this method should have been polled + // to completion so a new sender should be readily available. + let reply_sender = reply_receiver.sender().unwrap(); + + let send_fut = sender.send(move |model, scheduler, recycle_box| { + let fut = async move { + let reply = func.call(model, arg, scheduler).await; + reply_sender.send(reply); + }; + + coerce_box!(RecycleBox::recycle(recycle_box, fut)) + }); + + RecycledFuture::new(fut_storage, async move { + // Send the message. + send_fut.await.map_err(|_| SendError {})?; + + // Wait until the message is processed and the reply is sent back. + // If an error is received, it most likely means the mailbox was + // dropped before the message was processed. + reply_receiver + .recv() + .await + .map_err(|_| SendError {}) + .map(reply_map) + }) + } +} + +impl Clone for MapReplierSender +where + M: Model, + C: Fn(T) -> U, + D: Fn(Q) -> R, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + T: Send + 'static, + R: Send + 'static, + U: Send + 'static, + Q: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + query_map: self.query_map.clone(), + reply_map: self.reply_map.clone(), + func: self.func.clone(), + sender: self.sender.clone(), + receiver: multishot::Receiver::new(), + fut_storage: None, + _phantom_query_map: PhantomData, + _phantom_reply_map: PhantomData, + _phantom_closure: PhantomData, + _phantom_closure_marker: PhantomData, + } + } +} + +/// An object that can send an event to an event sink. +pub(super) struct EventSinkSender> +where + T: Send + 'static, + W: EventSinkWriter, +{ writer: W, fut_storage: Option>, _phantom_event: PhantomData, @@ -228,6 +437,73 @@ where } } +/// An object that can send mapped events to an event sink. +pub(super) struct MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + writer: W, + map: Arc, + fut_storage: Option>, + _phantom_event: PhantomData, +} + +impl MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U, + W: EventSinkWriter, +{ + pub(super) fn new(map: C, writer: W) -> Self { + Self { + writer, + map: Arc::new(map), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + +impl Sender for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> { + let writer = &mut self.writer; + let arg = (self.map)(arg); + + RecycledFuture::new(&mut self.fut_storage, async move { + writer.write(arg); + + Ok(()) + }) + } +} + +impl Clone for MapEventSinkSender +where + T: Send + 'static, + U: Send + 'static, + C: Fn(T) -> U + Send + Sync, + W: EventSinkWriter, +{ + fn clone(&self) -> Self { + Self { + writer: self.writer.clone(), + map: self.map.clone(), + fut_storage: None, + _phantom_event: PhantomData, + } + } +} + /// Error returned when the mailbox was closed or dropped. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) struct SendError {}