1
0
forked from ROMEO/nexosim

Small changes and cleanups to prepare the RPC work

This commit is contained in:
Serge Barral
2024-02-19 12:20:28 +01:00
parent 9d78e4f72a
commit 863f995f1b
25 changed files with 572 additions and 869 deletions

View File

@ -14,18 +14,17 @@
//! ports should generally be preferred over requestor ports when possible.
use std::fmt;
use std::sync::{Arc, Mutex};
mod broadcaster;
mod sender;
use crate::model::ports::sender::EventSinkSender;
use crate::model::{InputFn, Model, ReplierFn};
use crate::simulation::{Address, EventSlot, EventStream};
use crate::util::spsc_queue;
use crate::simulation::{Address, EventSink};
use broadcaster::Broadcaster;
use broadcaster::{EventBroadcaster, QueryBroadcaster};
use self::sender::{EventSender, EventSlotSender, EventStreamSender, QuerySender};
use self::sender::{InputSender, ReplierSender};
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
/// Unique identifier for a connection between two ports.
@ -37,7 +36,7 @@ pub struct LineId(u64);
/// methods that return no value. They broadcast events to all connected input
/// ports.
pub struct Output<T: Clone + Send + 'static> {
broadcaster: Broadcaster<T, ()>,
broadcaster: EventBroadcaster<T>,
next_line_id: u64,
}
@ -62,40 +61,23 @@ impl<T: Clone + Send + 'static> Output<T> {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(EventSender::new(input, address.into().0));
let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.add(sender, line_id);
line_id
}
/// Adds a connection to an event stream iterator.
pub fn connect_stream(&mut self) -> (EventStream<T>, LineId) {
/// Adds a connection to an event sink such as an
/// [`EventSlot`](crate::simulation::EventSlot) or
/// [`EventQueue`](crate::simulation::EventQueue).
pub fn connect_sink<S: EventSink<T>>(&mut self, sink: &S) -> LineId {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let (producer, consumer) = spsc_queue::spsc_queue();
let sender = Box::new(EventStreamSender::new(producer));
let event_stream = EventStream::new(consumer);
let sender = Box::new(EventSinkSender::new(sink.writer()));
self.broadcaster.add(sender, line_id);
(event_stream, line_id)
}
/// Adds a connection to an event slot.
pub fn connect_slot(&mut self) -> (EventSlot<T>, LineId) {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let slot = Arc::new(Mutex::new(None));
let sender = Box::new(EventSlotSender::new(slot.clone()));
let event_slot = EventSlot::new(slot);
self.broadcaster.add(sender, line_id);
(event_slot, line_id)
line_id
}
/// Removes the connection specified by the `LineId` parameter.
@ -118,14 +100,14 @@ impl<T: Clone + Send + 'static> Output<T> {
/// Broadcasts an event to all connected input ports.
pub async fn send(&mut self, arg: T) {
self.broadcaster.broadcast_event(arg).await.unwrap();
self.broadcaster.broadcast(arg).await.unwrap();
}
}
impl<T: Clone + Send + 'static> Default for Output<T> {
fn default() -> Self {
Self {
broadcaster: Broadcaster::default(),
broadcaster: EventBroadcaster::default(),
next_line_id: 0,
}
}
@ -143,7 +125,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
/// model methods that return a value. They broadcast queries to all connected
/// replier ports.
pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: Broadcaster<T, R>,
broadcaster: QueryBroadcaster<T, R>,
next_line_id: u64,
}
@ -168,7 +150,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(QuerySender::new(replier, address.into().0));
let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.add(sender, line_id);
line_id
@ -194,14 +176,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// Broadcasts a query to all connected replier ports.
pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ {
self.broadcaster.broadcast_query(arg).await.unwrap()
self.broadcaster.broadcast(arg).await.unwrap()
}
}
impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> {
fn default() -> Self {
Self {
broadcaster: Broadcaster::default(),
broadcaster: QueryBroadcaster::default(),
next_line_id: 0,
}
}

View File

@ -1,4 +1,5 @@
use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -26,28 +27,17 @@ mod task_set;
/// exploits this behavior by waking the main broadcast future only when all
/// sender futures have been awaken, which strongly reduces overhead since
/// waking a sender task does not actually schedule it on the executor.
pub(super) struct Broadcaster<T: Clone + 'static, R: 'static> {
pub(super) struct BroadcasterInner<T: Clone + 'static, R: 'static> {
/// The list of senders with their associated line identifier.
senders: Vec<(LineId, Box<dyn Sender<T, R>>)>,
/// Fields explicitly borrowed by the `BroadcastFuture`.
shared: Shared<R>,
/// Phantom types.
_phantom_event: PhantomData<T>,
_phantom_reply: PhantomData<R>,
}
impl<T: Clone + 'static> Broadcaster<T, ()> {
/// Broadcasts an event to all addresses.
pub(super) async fn broadcast_event(&mut self, arg: T) -> Result<(), BroadcastError> {
match self.senders.as_mut_slice() {
// No sender.
[] => Ok(()),
// One sender.
[sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}),
// Multiple senders.
_ => self.broadcast(arg).await,
}
}
}
impl<T: Clone + 'static, R> Broadcaster<T, R> {
impl<T: Clone + 'static, R> BroadcasterInner<T, R> {
/// Adds a new sender associated to the specified identifier.
///
/// # Panics
@ -93,55 +83,25 @@ impl<T: Clone + 'static, R> Broadcaster<T, R> {
self.senders.len()
}
/// Broadcasts a query to all addresses and collect all responses.
pub(super) async fn broadcast_query(
&mut self,
arg: T,
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
match self.senders.as_mut_slice() {
// No sender.
[] => {}
// One sender.
[sender] => {
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
self.shared.futures_env[0].output = Some(output);
}
// Multiple senders.
_ => self.broadcast(arg).await?,
};
// At this point all outputs should be available so `unwrap` can be
// called on the output of each future.
let outputs = self
.shared
.futures_env
.iter_mut()
.map(|t| t.output.take().unwrap());
Ok(outputs)
}
/// Efficiently broadcasts a message or a query to multiple addresses.
///
/// This method does not collect the responses from queries.
fn broadcast(&mut self, arg: T) -> BroadcastFuture<'_, R> {
let futures_count = self.senders.len();
let mut futures = recycle_vec(self.shared.storage.take().unwrap_or_default());
// Broadcast the message and collect all futures.
for (i, (sender, futures_env)) in self
let mut iter = self
.senders
.iter_mut()
.zip(self.shared.futures_env.iter_mut())
.enumerate()
{
.zip(self.shared.futures_env.iter_mut());
while let Some((sender, futures_env)) = iter.next() {
let future_cache = futures_env
.storage
.take()
.unwrap_or_else(|| RecycleBox::new(()));
// Move the argument rather than clone it for the last future.
if i + 1 == futures_count {
if iter.len() == 0 {
let future: RecycleBox<dyn Future<Output = Result<R, SendError>> + Send + '_> =
coerce_box!(RecycleBox::recycle(future_cache, sender.1.send(arg)));
@ -161,7 +121,7 @@ impl<T: Clone + 'static, R> Broadcaster<T, R> {
}
}
impl<T: Clone + 'static, R> Default for Broadcaster<T, R> {
impl<T: Clone + 'static, R> Default for BroadcasterInner<T, R> {
/// Creates an empty `Broadcaster` object.
fn default() -> Self {
let wake_sink = WakeSink::new();
@ -175,6 +135,141 @@ impl<T: Clone + 'static, R> Default for Broadcaster<T, R> {
futures_env: Vec::new(),
storage: None,
},
_phantom_event: PhantomData,
_phantom_reply: PhantomData,
}
}
}
/// An object that can efficiently broadcast events to several input ports.
///
/// See `BroadcasterInner` for implementation details.
pub(super) struct EventBroadcaster<T: Clone + 'static> {
/// The broadcaster core object.
inner: BroadcasterInner<T, ()>,
}
impl<T: Clone + 'static> EventBroadcaster<T> {
/// Adds a new sender associated to the specified identifier.
///
/// # Panics
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>, id: LineId) {
self.inner.add(sender, id);
}
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
}
/// Returns the number of connected senders.
pub(super) fn len(&self) -> usize {
self.inner.len()
}
/// Broadcasts an event to all addresses.
pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), BroadcastError> {
match self.inner.senders.as_mut_slice() {
// No sender.
[] => Ok(()),
// One sender.
[sender] => sender.1.send(arg).await.map_err(|_| BroadcastError {}),
// Multiple senders.
_ => self.inner.broadcast(arg).await,
}
}
}
impl<T: Clone + 'static> Default for EventBroadcaster<T> {
fn default() -> Self {
Self {
inner: BroadcasterInner::default(),
}
}
}
/// An object that can efficiently broadcast queries to several replier ports.
///
/// See `BroadcasterInner` for implementation details.
pub(super) struct QueryBroadcaster<T: Clone + 'static, R: 'static> {
/// The broadcaster core object.
inner: BroadcasterInner<T, R>,
}
impl<T: Clone + 'static, R: 'static> QueryBroadcaster<T, R> {
/// Adds a new sender associated to the specified identifier.
///
/// # Panics
///
/// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>, id: LineId) {
self.inner.add(sender, id);
}
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
}
/// Returns the number of connected senders.
pub(super) fn len(&self) -> usize {
self.inner.len()
}
/// Broadcasts a query to all addresses and collect all responses.
pub(super) async fn broadcast(
&mut self,
arg: T,
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> {
match self.inner.senders.as_mut_slice() {
// No sender.
[] => {}
// One sender.
[sender] => {
let output = sender.1.send(arg).await.map_err(|_| BroadcastError {})?;
self.inner.shared.futures_env[0].output = Some(output);
}
// Multiple senders.
_ => self.inner.broadcast(arg).await?,
};
// At this point all outputs should be available so `unwrap` can be
// called on the output of each future.
let outputs = self
.inner
.shared
.futures_env
.iter_mut()
.map(|t| t.output.take().unwrap());
Ok(outputs)
}
}
impl<T: Clone + 'static, R: 'static> Default for QueryBroadcaster<T, R> {
fn default() -> Self {
Self {
inner: BroadcasterInner::default(),
}
}
}
@ -323,7 +418,7 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
let scheduled_tasks = match this
.shared
.task_set
.steal_scheduled(this.pending_futures_count)
.take_scheduled(this.pending_futures_count)
{
Some(st) => st,
None => return Poll::Pending,
@ -408,9 +503,7 @@ mod tests {
use futures_executor::block_on;
use super::super::sender::QuerySender;
use crate::channel::Receiver;
use crate::model::Model;
use crate::time::Scheduler;
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
@ -441,18 +534,18 @@ mod tests {
const N_RECV: usize = 4;
let mut mailboxes = Vec::new();
let mut broadcaster = Broadcaster::default();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(EventSender::new(Counter::inc, address));
let sender = Box::new(InputSender::new(Counter::inc, address));
broadcaster.add(sender, LineId(id as u64));
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(broadcaster.broadcast_event(1)).unwrap();
block_on(broadcaster.broadcast(1)).unwrap();
});
let counter = Arc::new(AtomicUsize::new(0));
@ -489,18 +582,18 @@ mod tests {
const N_RECV: usize = 4;
let mut mailboxes = Vec::new();
let mut broadcaster = Broadcaster::default();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(QuerySender::new(Counter::fetch_inc, address));
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
broadcaster.add(sender, LineId(id as u64));
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
let iter = block_on(broadcaster.broadcast_query(1)).unwrap();
let iter = block_on(broadcaster.broadcast(1)).unwrap();
let sum = iter.fold(0, |acc, val| acc + val);
assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)}
@ -609,12 +702,12 @@ mod tests {
let (test_event2, waker2) = test_event::<usize>();
let (test_event3, waker3) = test_event::<usize>();
let mut broadcaster = Broadcaster::default();
let mut broadcaster = QueryBroadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2));
broadcaster.add(Box::new(test_event3), LineId(3));
let mut fut = Box::pin(broadcaster.broadcast_query(()));
let mut fut = Box::pin(broadcaster.broadcast(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));
let is_scheduled_waker = is_scheduled.clone();
@ -684,11 +777,11 @@ mod tests {
let (test_event1, waker1) = test_event::<usize>();
let (test_event2, waker2) = test_event::<usize>();
let mut broadcaster = Broadcaster::default();
let mut broadcaster = QueryBroadcaster::default();
broadcaster.add(Box::new(test_event1), LineId(1));
broadcaster.add(Box::new(test_event2), LineId(2));
let mut fut = Box::pin(broadcaster.broadcast_query(()));
let mut fut = Box::pin(broadcaster.broadcast(()));
let is_scheduled = loom::sync::Arc::new(AtomicBool::new(false));
let is_scheduled_waker = is_scheduled.clone();

View File

@ -21,31 +21,37 @@ const COUNTDOWN_MASK: u64 = !INDEX_MASK;
/// scheduled tasks.
const COUNTDOWN_ONE: u64 = 1 << 32;
/// A set of tasks that may be scheduled cheaply and can be requested to wake a
/// parent task only when a given amount of tasks have been scheduled.
/// An object for the efficient management of a set of tasks scheduled
/// concurrently.
///
/// This object maintains both a list of all active tasks and a list of the
/// subset of active tasks currently scheduled. The latter is stored in a
/// Treiber stack which links tasks through indices rather than pointers. Using
/// indices has two advantages: (i) it enables a fully safe implementation and
/// (ii) it makes it possible to use a single CAS to simultaneously move the
/// head and decrement the outstanding amount of tasks to be scheduled before
/// the parent task is notified.
/// The algorithm used by `TaskSet` is designed to wake up the parent task as
/// seldom as possible, ideally only when all non-completed sub-tasks have been
/// scheduled (awaken).
///
/// A `TaskSet` maintains both a vector-based list of tasks (or more accurately,
/// task waker handles) and a linked list of the subset of tasks that are
/// currently scheduled. The latter is stored in a vector-based Treiber stack
/// which links tasks through indices rather than pointers. Using indices has
/// two advantages: (i) it makes a fully safe implementation possible and (ii)
/// it can take advantage of a single CAS to simultaneously move the head and
/// decrement the outstanding amount of tasks to be scheduled before the parent
/// task is notified.
pub(super) struct TaskSet {
/// Set of all active tasks, scheduled or not.
/// Set of all tasks, scheduled or not.
///
/// In some rare cases, the back of the vector can also contain inactive
/// (retired) tasks.
/// In some cases, the use of `resize()` to shrink the task set may leave
/// inactive tasks at the back of the vector, in which case the length of
/// the vector will exceed `task_count`.
tasks: Vec<Arc<Task>>,
/// Head of the Treiber stack for scheduled tasks.
///
/// The lower bits specify the index of the last scheduled task, if any,
/// whereas the upper bits specify the countdown of tasks still to be
/// scheduled before the parent task is notified.
/// The lower 32 bits specify the index of the last scheduled task (head),
/// if any, whereas the upper 32 bits specify the countdown of tasks still
/// to be scheduled before the parent task is notified.
head: Arc<AtomicU64>,
/// A notifier used to wake the parent task.
notifier: WakeSource,
/// Count of all active tasks, scheduled or not.
/// Count of all tasks, scheduled or not.
task_count: usize,
}
@ -65,21 +71,25 @@ impl TaskSet {
}
}
/// Steals scheduled tasks if any and returns an iterator over their
/// indices, otherwise returns `None` and requests a notification to be sent
/// after `notify_count` tasks have been scheduled.
/// Take all scheduled tasks and returns an iterator over their indices, or
/// if there are no currently scheduled tasks returns `None` and requests a
/// notification to be sent after `pending_task_count` tasks have been
/// scheduled.
///
/// In all cases, the list of scheduled tasks is guaranteed to be empty
/// after this call.
/// In all cases, the list of scheduled tasks will be empty right after this
/// call.
///
/// If some tasks were stolen, no notification is requested.
/// If there were scheduled tasks, no notification is requested because this
/// method is expected to be called repeatedly until it returns `None`.
/// Failure to do so will result in missed notifications.
///
/// If no tasks were stolen, the notification is guaranteed to be triggered
/// no later than after `notify_count` tasks have been scheduled, though it
/// may in some cases be triggered earlier. If the specified `notify_count`
/// is zero then no notification is requested.
pub(super) fn steal_scheduled(&self, notify_count: usize) -> Option<TaskIterator<'_>> {
let countdown = u32::try_from(notify_count).unwrap();
/// If no tasks were scheduled, the notification is guaranteed to be
/// triggered no later than after `pending_task_count` tasks have been
/// scheduled, though it may in some cases be triggered earlier. If the
/// specified `pending_task_count` is zero then no notification is
/// requested.
pub(super) fn take_scheduled(&self, pending_task_count: usize) -> Option<TaskIterator<'_>> {
let countdown = u32::try_from(pending_task_count).unwrap();
let mut head = self.head.load(Ordering::Relaxed);
loop {
@ -126,13 +136,13 @@ impl TaskSet {
if self.head.load(Ordering::Relaxed) != EMPTY as u64 {
// Dropping the iterator ensures that all tasks are put in the
// sleeping state.
let _ = self.steal_scheduled(0);
let _ = self.take_scheduled(0);
}
}
/// Modify the number of active tasks.
/// Set the number of active tasks.
///
/// Note that this method may discard all scheduled tasks.
/// Note that this method may discard already scheduled tasks.
///
/// # Panic
///
@ -200,7 +210,7 @@ impl TaskSet {
}
}
/// Returns `true` if one or more tasks are currently scheduled.
/// Returns `true` if one or more sub-tasks are currently scheduled.
pub(super) fn has_scheduled(&self) -> bool {
// Ordering: the content of the head is only used as an advisory flag so
// Relaxed ordering is sufficient.

View File

@ -4,22 +4,23 @@ use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use recycle_box::{coerce_box, RecycleBox};
use crate::channel;
use crate::model::{InputFn, Model, ReplierFn};
use crate::util::spsc_queue;
use crate::simulation::EventSinkWriter;
/// Abstraction over `EventSender` and `QuerySender`.
/// An event or query sender abstracting over the target model and input or
/// replier method.
pub(super) trait Sender<T, R>: Send {
/// Asynchronously send the event or request.
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>>;
}
/// An object that can send a payload to a model.
pub(super) struct EventSender<M: 'static, F, T, S> {
/// An object that can send events to an input port.
pub(super) struct InputSender<M: 'static, F, T, S> {
func: F,
sender: channel::Sender<M>,
fut_storage: Option<RecycleBox<()>>,
@ -27,7 +28,7 @@ pub(super) struct EventSender<M: 'static, F, T, S> {
_phantom_closure_marker: PhantomData<S>,
}
impl<M: Send, F, T, S> EventSender<M, F, T, S>
impl<M: Send, F, T, S> InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
@ -44,15 +45,15 @@ where
}
}
impl<M: Send, F, T, S> Sender<T, ()> for EventSender<M, F, T, S>
impl<M: Send, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Copy,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + 'static,
S: Send,
S: Send + 'static,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let func = self.func;
let func = self.func.clone();
let fut = self.sender.send(move |model, scheduler, recycle_box| {
let fut = func.call(model, arg, scheduler);
@ -66,8 +67,8 @@ where
}
}
/// An object that can send a payload to a model and retrieve a response.
pub(super) struct QuerySender<M: 'static, F, T, R, S> {
/// An object that can send a request to a replier port and retrieve a response.
pub(super) struct ReplierSender<M: 'static, F, T, R, S> {
func: F,
sender: channel::Sender<M>,
receiver: multishot::Receiver<R>,
@ -76,7 +77,7 @@ pub(super) struct QuerySender<M: 'static, F, T, R, S> {
_phantom_closure_marker: PhantomData<S>,
}
impl<M, F, T, R, S> QuerySender<M, F, T, R, S>
impl<M, F, T, R, S> ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S>,
@ -95,16 +96,16 @@ where
}
}
impl<M, F, T, R, S> Sender<T, R> for QuerySender<M, F, T, R, S>
impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Copy,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
T: Send + 'static,
R: Send + 'static,
S: Send,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<R, SendError>> {
let func = self.func;
let func = self.func.clone();
let sender = &mut self.sender;
let reply_receiver = &mut self.receiver;
let fut_storage = &mut self.fut_storage;
@ -134,59 +135,32 @@ where
}
}
/// An object that can send a payload to an unbounded queue.
pub(super) struct EventStreamSender<T> {
producer: spsc_queue::Producer<T>,
fut_storage: Option<RecycleBox<()>>,
}
impl<T> EventStreamSender<T> {
pub(super) fn new(producer: spsc_queue::Producer<T>) -> Self {
Self {
producer,
fut_storage: None,
}
}
}
impl<T> Sender<T, ()> for EventStreamSender<T>
where
T: Send + 'static,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let producer = &mut self.producer;
RecycledFuture::new(&mut self.fut_storage, async move {
producer.push(arg).map_err(|_| SendError {})
})
}
}
/// An object that can send a payload to a mutex-protected slot.
pub(super) struct EventSlotSender<T> {
slot: Arc<Mutex<Option<T>>>,
pub(super) struct EventSinkSender<T: Send + 'static, W: EventSinkWriter<T>> {
writer: W,
fut_storage: Option<RecycleBox<()>>,
_phantom_event: PhantomData<T>,
}
impl<T> EventSlotSender<T> {
pub(super) fn new(slot: Arc<Mutex<Option<T>>>) -> Self {
impl<T: Send + 'static, W: EventSinkWriter<T>> EventSinkSender<T, W> {
pub(super) fn new(writer: W) -> Self {
Self {
slot,
writer,
fut_storage: None,
_phantom_event: PhantomData,
}
}
}
impl<T> Sender<T, ()> for EventSlotSender<T>
impl<T, W: EventSinkWriter<T>> Sender<T, ()> for EventSinkSender<T, W>
where
T: Send + 'static,
{
fn send(&mut self, arg: T) -> RecycledFuture<'_, Result<(), SendError>> {
let slot = &*self.slot;
let writer = &self.writer;
RecycledFuture::new(&mut self.fut_storage, async move {
let mut slot = slot.lock().unwrap();
*slot = Some(arg);
writer.write(arg);
Ok(())
})