1
0
forked from ROMEO/nexosim

Remove the LineId API

This was probably never used.
This commit is contained in:
Serge Barral 2024-11-15 17:15:25 +01:00
parent 0f1d876aed
commit 3c1056d699
6 changed files with 64 additions and 311 deletions

View File

@ -91,11 +91,3 @@ pub use sink::{
event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter, event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter,
}; };
pub use source::{EventSource, QuerySource, ReplyReceiver}; pub use source::{EventSource, QuerySource, ReplyReceiver};
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
/// Unique identifier for a connection between two ports.
pub struct LineId(u64);
/// Error raised when the specified line cannot be found.
#[derive(Copy, Clone, Debug)]
pub struct LineError {}

View File

@ -4,7 +4,7 @@ mod sender;
use std::fmt; use std::fmt;
use crate::model::Model; use crate::model::Model;
use crate::ports::{EventSink, LineError, LineId}; use crate::ports::EventSink;
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
use crate::simulation::Address; use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock; use crate::util::cached_rw_lock::CachedRwLock;
@ -43,20 +43,20 @@ impl<T: Clone + Send + 'static> Output<T> {
/// The input port must be an asynchronous method of a model of type `M` /// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of type `T` plus, optionally, a scheduler /// taking as argument a value of type `T` plus, optionally, a scheduler
/// reference. /// reference.
pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>) -> LineId pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone, F: for<'a> InputFn<'a, M, T, S> + Clone,
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(InputSender::new(input, address.into().0)); let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
} }
/// Adds a connection to an event sink such as an /// Adds a connection to an event sink such as an
/// [`EventSlot`](crate::ports::EventSlot) or /// [`EventSlot`](crate::ports::EventSlot) or
/// [`EventBuffer`](crate::ports::EventBuffer). /// [`EventBuffer`](crate::ports::EventBuffer).
pub fn connect_sink<S: EventSink<T>>(&mut self, sink: &S) -> LineId { pub fn connect_sink<S: EventSink<T>>(&mut self, sink: &S) {
let sender = Box::new(EventSinkSender::new(sink.writer())); let sender = Box::new(EventSinkSender::new(sink.writer()));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender)
} }
@ -70,12 +70,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// The input port must be an asynchronous method of a model of type `M` /// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping /// taking as argument a value of the type returned by the mapping
/// closure plus, optionally, a context reference. /// closure plus, optionally, a context reference.
pub fn map_connect<M, C, F, U, S>( pub fn map_connect<M, C, F, U, S>(&mut self, map: C, input: F, address: impl Into<Address<M>>)
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where where
M: Model, M: Model,
C: Fn(&T) -> U + Send + Sync + 'static, C: Fn(&T) -> U + Send + Sync + 'static,
@ -84,7 +79,7 @@ impl<T: Clone + Send + 'static> Output<T> {
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(MapInputSender::new(map, input, address.into().0)); let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
} }
/// Adds an auto-converting connection to an event sink such as an /// Adds an auto-converting connection to an event sink such as an
@ -93,14 +88,14 @@ impl<T: Clone + Send + 'static> Output<T> {
/// ///
/// Events are mapped to another type using the closure provided in /// Events are mapped to another type using the closure provided in
/// argument. /// argument.
pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S) -> LineId pub fn map_connect_sink<C, U, S>(&mut self, map: C, sink: &S)
where where
C: Fn(&T) -> U + Send + Sync + 'static, C: Fn(&T) -> U + Send + Sync + 'static,
U: Send + 'static, U: Send + 'static,
S: EventSink<U>, S: EventSink<U>,
{ {
let sender = Box::new(MapEventSinkSender::new(map, sink.writer())); let sender = Box::new(MapEventSinkSender::new(map, sink.writer()));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
} }
/// Adds an auto-converting, filtered connection to an input port of the /// Adds an auto-converting, filtered connection to an input port of the
@ -117,8 +112,7 @@ impl<T: Clone + Send + 'static> Output<T> {
filter_map: C, filter_map: C,
input: F, input: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> LineId ) where
where
M: Model, M: Model,
C: Fn(&T) -> Option<U> + Send + Sync + 'static, C: Fn(&T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone, F: for<'a> InputFn<'a, M, U, S> + Clone,
@ -130,7 +124,7 @@ impl<T: Clone + Send + 'static> Output<T> {
input, input,
address.into().0, address.into().0,
)); ));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
} }
/// Adds an auto-converting connection to an event sink such as an /// Adds an auto-converting connection to an event sink such as an
@ -139,33 +133,14 @@ impl<T: Clone + Send + 'static> Output<T> {
/// ///
/// Events are mapped to another type using the closure provided in /// Events are mapped to another type using the closure provided in
/// argument. /// argument.
pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S) -> LineId pub fn filter_map_connect_sink<C, U, S>(&mut self, filter_map: C, sink: &S)
where where
C: Fn(&T) -> Option<U> + Send + Sync + 'static, C: Fn(&T) -> Option<U> + Send + Sync + 'static,
U: Send + 'static, U: Send + 'static,
S: EventSink<U>, S: EventSink<U>,
{ {
let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer())); let sender = Box::new(FilterMapEventSinkSender::new(filter_map, sink.writer()));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
}
/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
/// [`Output`], [`Requestor`], [`EventSource`](crate::ports::EventSource) or
/// [`QuerySource`](crate::ports::QuerySource) instance and may result in
/// the disconnection of an arbitrary endpoint.
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.write().unwrap().remove(line_id) {
Ok(())
} else {
Err(LineError {})
}
}
/// Removes all connections.
pub fn disconnect_all(&mut self) {
self.broadcaster.write().unwrap().clear();
} }
/// Broadcasts an event to all connected input ports. /// Broadcasts an event to all connected input ports.
@ -219,14 +194,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// The replier port must be an asynchronous method of a model of type `M` /// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of type `R` and taking as argument a value of type `T` /// returning a value of type `R` and taking as argument a value of type `T`
/// plus, optionally, a context reference. /// plus, optionally, a context reference.
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(ReplierSender::new(replier, address.into().0)); let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
} }
/// Adds an auto-converting connection to a replier port of the model /// Adds an auto-converting connection to a replier port of the model
@ -245,8 +220,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
reply_map: D, reply_map: D,
replier: F, replier: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> LineId ) where
where
M: Model, M: Model,
C: Fn(&T) -> U + Send + Sync + 'static, C: Fn(&T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
@ -261,7 +235,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
replier, replier,
address.into().0, address.into().0,
)); ));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
} }
/// Adds an auto-converting, filtered connection to a replier port of the /// Adds an auto-converting, filtered connection to a replier port of the
@ -280,8 +254,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
reply_map: D, reply_map: D,
replier: F, replier: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> LineId ) where
where
M: Model, M: Model,
C: Fn(&T) -> Option<U> + Send + Sync + 'static, C: Fn(&T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
@ -296,26 +269,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
replier, replier,
address.into().0, address.into().0,
)); ));
self.broadcaster.write().unwrap().add(sender) self.broadcaster.write().unwrap().add(sender);
}
/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
/// [`Requestor`], [`Output`], [`EventSource`](crate::ports::EventSource) or
/// [`QuerySource`](crate::ports::QuerySource) instance and may result in
/// the disconnection of an arbitrary endpoint.
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.write().unwrap().remove(line_id) {
Ok(())
} else {
Err(LineError {})
}
}
/// Removes all connections.
pub fn disconnect_all(&mut self) {
self.broadcaster.write().unwrap().clear();
} }
/// Broadcasts a query to all connected replier ports. /// Broadcasts a query to all connected replier ports.

View File

@ -6,7 +6,6 @@ use std::task::{Context, Poll};
use diatomic_waker::WakeSink; use diatomic_waker::WakeSink;
use super::sender::{RecycledFuture, SendError, Sender}; use super::sender::{RecycledFuture, SendError, Sender};
use super::LineId;
use crate::util::task_set::TaskSet; use crate::util::task_set::TaskSet;
/// An object that can efficiently broadcast messages to several addresses. /// An object that can efficiently broadcast messages to several addresses.
@ -24,10 +23,8 @@ use crate::util::task_set::TaskSet;
/// - the outputs of all sender futures are returned all at once rather than /// - the outputs of all sender futures are returned all at once rather than
/// with an asynchronous iterator (a.k.a. async stream). /// with an asynchronous iterator (a.k.a. async stream).
pub(super) struct BroadcasterInner<T: Clone, R> { pub(super) struct BroadcasterInner<T: Clone, R> {
/// Line identifier for the next port to be connected.
next_line_id: u64,
/// The list of senders with their associated line identifier. /// The list of senders with their associated line identifier.
senders: Vec<(LineId, Box<dyn Sender<T, R>>)>, senders: Vec<Box<dyn Sender<T, R>>>,
/// Fields explicitly borrowed by the `BroadcastFuture`. /// Fields explicitly borrowed by the `BroadcastFuture`.
shared: Shared<R>, shared: Shared<R>,
} }
@ -38,42 +35,17 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
/// # Panics /// # Panics
/// ///
/// This method will panic if the total count of senders would reach /// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`. /// `u32::MAX - 1` due to limitations inherent to the task set
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId { /// implementation.
assert!(self.next_line_id != u64::MAX); pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) {
let line_id = LineId(self.next_line_id); assert!(self.senders.len() < (u32::MAX as usize - 2));
self.next_line_id += 1; self.senders.push(sender);
self.senders.push((line_id, sender));
self.shared.outputs.push(None); self.shared.outputs.push(None);
// The storage is alway an empty vector so we just book some capacity. // The storage is alway an empty vector so we just book some capacity.
if let Some(storage) = self.shared.storage.as_mut() { if let Some(storage) = self.shared.storage.as_mut() {
let _ = storage.try_reserve(self.senders.len()); let _ = storage.try_reserve(self.senders.len());
}; };
line_id
}
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
self.senders.swap_remove(pos);
self.shared.outputs.truncate(self.senders.len());
return true;
}
false
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.senders.clear();
self.shared.outputs.clear();
} }
/// Returns the number of connected senders. /// Returns the number of connected senders.
@ -98,13 +70,13 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
while let Some(sender) = iter.next() { while let Some(sender) = iter.next() {
// Move the argument rather than clone it for the last future. // Move the argument rather than clone it for the last future.
if iter.len() == 0 { if iter.len() == 0 {
if let Some(fut) = sender.1.send_owned(arg) { if let Some(fut) = sender.send_owned(arg) {
futures.push(fut); futures.push(fut);
} }
break; break;
} }
if let Some(fut) = sender.1.send(&arg) { if let Some(fut) = sender.send(&arg) {
futures.push(fut); futures.push(fut);
} }
} }
@ -120,7 +92,6 @@ impl<T: Clone, R> Default for BroadcasterInner<T, R> {
let wake_src = wake_sink.source(); let wake_src = wake_sink.source();
Self { Self {
next_line_id: 0,
senders: Vec::new(), senders: Vec::new(),
shared: Shared { shared: Shared {
wake_sink, wake_sink,
@ -135,7 +106,6 @@ impl<T: Clone, R> Default for BroadcasterInner<T, R> {
impl<T: Clone, R> Clone for BroadcasterInner<T, R> { impl<T: Clone, R> Clone for BroadcasterInner<T, R> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
next_line_id: self.next_line_id,
senders: self.senders.clone(), senders: self.senders.clone(),
shared: self.shared.clone(), shared: self.shared.clone(),
} }
@ -160,24 +130,12 @@ impl<T: Clone> EventBroadcaster<T> {
/// # Panics /// # Panics
/// ///
/// This method will panic if the total count of senders would reach /// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`. /// `u32::MAX - 1` due to limitations inherent to the task set
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>) -> LineId { /// implementation.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>) {
self.inner.add(sender) self.inner.add(sender)
} }
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
}
/// Returns the number of connected senders. /// Returns the number of connected senders.
pub(super) fn len(&self) -> usize { pub(super) fn len(&self) -> usize {
self.inner.len() self.inner.len()
@ -190,7 +148,7 @@ impl<T: Clone> EventBroadcaster<T> {
[] => Ok(()), [] => Ok(()),
// One sender at most. // One sender at most.
[sender] => match sender.1.send_owned(arg) { [sender] => match sender.send_owned(arg) {
None => Ok(()), None => Ok(()),
Some(fut) => fut.await.map_err(|_| BroadcastError {}), Some(fut) => fut.await.map_err(|_| BroadcastError {}),
}, },
@ -233,24 +191,12 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
/// # Panics /// # Panics
/// ///
/// This method will panic if the total count of senders would reach /// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`. /// `u32::MAX - 1` due to limitations inherent to the task set
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId { /// implementation.
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) {
self.inner.add(sender) self.inner.add(sender)
} }
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
}
/// Returns the number of connected senders. /// Returns the number of connected senders.
pub(super) fn len(&self) -> usize { pub(super) fn len(&self) -> usize {
self.inner.len() self.inner.len()
@ -267,7 +213,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
// One sender at most. // One sender at most.
[sender] => { [sender] => {
if let Some(fut) = sender.1.send_owned(arg) { if let Some(fut) = sender.send_owned(arg) {
let output = fut.await.map_err(|_| BroadcastError {})?; let output = fut.await.map_err(|_| BroadcastError {})?;
self.inner.shared.outputs[0] = Some(output); self.inner.shared.outputs[0] = Some(output);

View File

@ -7,7 +7,6 @@ use std::time::Duration;
use crate::model::Model; use crate::model::Model;
use crate::ports::InputFn; use crate::ports::InputFn;
use crate::ports::{LineError, LineId};
use crate::simulation::{ use crate::simulation::{
Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction,
}; };
@ -43,14 +42,14 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// The input port must be an asynchronous method of a model of type `M` /// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of type `T` plus, optionally, a scheduler /// taking as argument a value of type `T` plus, optionally, a scheduler
/// reference. /// reference.
pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>) -> LineId pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone, F: for<'a> InputFn<'a, M, T, S> + Clone,
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(InputSender::new(input, address.into().0)); let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.lock().unwrap().add(sender) self.broadcaster.lock().unwrap().add(sender);
} }
/// Adds an auto-converting connection to an input port of the model /// Adds an auto-converting connection to an input port of the model
@ -62,12 +61,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// The input port must be an asynchronous method of a model of type `M` /// The input port must be an asynchronous method of a model of type `M`
/// taking as argument a value of the type returned by the mapping closure /// taking as argument a value of the type returned by the mapping closure
/// plus, optionally, a context reference. /// plus, optionally, a context reference.
pub fn map_connect<M, C, F, U, S>( pub fn map_connect<M, C, F, U, S>(&mut self, map: C, input: F, address: impl Into<Address<M>>)
&mut self,
map: C,
input: F,
address: impl Into<Address<M>>,
) -> LineId
where where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> U + Send + 'static, C: for<'a> Fn(&'a T) -> U + Send + 'static,
@ -76,7 +70,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(MapInputSender::new(map, input, address.into().0)); let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender) self.broadcaster.lock().unwrap().add(sender);
} }
/// Adds an auto-converting, filtered connection to an input port of the /// Adds an auto-converting, filtered connection to an input port of the
@ -93,8 +87,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
map: C, map: C,
input: F, input: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> LineId ) where
where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static, C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone, F: for<'a> InputFn<'a, M, U, S> + Clone,
@ -102,26 +95,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender) self.broadcaster.lock().unwrap().add(sender);
}
/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
/// [`EventSource`], [`QuerySource`], [`Output`](crate::ports::Output) or
/// [`Requestor`](crate::ports::Requestor) instance and may result in the
/// disconnection of an arbitrary endpoint.
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.lock().unwrap().remove(line_id) {
Ok(())
} else {
Err(LineError {})
}
}
/// Removes all connections.
pub fn disconnect_all(&mut self) {
self.broadcaster.lock().unwrap().clear();
} }
/// Returns an action which, when processed, broadcasts an event to all /// Returns an action which, when processed, broadcasts an event to all
@ -248,14 +222,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
/// The replier port must be an asynchronous method of a model of type `M` /// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of type `R` and taking as argument a value of type `T` /// returning a value of type `R` and taking as argument a value of type `T`
/// plus, optionally, a context reference. /// plus, optionally, a context reference.
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) -> LineId pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(ReplierSender::new(replier, address.into().0)); let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.lock().unwrap().add(sender) self.broadcaster.lock().unwrap().add(sender);
} }
/// Adds an auto-converting connection to a replier port of the model /// Adds an auto-converting connection to a replier port of the model
@ -274,8 +248,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
reply_map: D, reply_map: D,
replier: F, replier: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> LineId ) where
where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> U + Send + 'static, C: for<'a> Fn(&'a T) -> U + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
@ -290,7 +263,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
replier, replier,
address.into().0, address.into().0,
)); ));
self.broadcaster.lock().unwrap().add(sender) self.broadcaster.lock().unwrap().add(sender);
} }
/// Adds an auto-converting, filtered connection to a replier port of the /// Adds an auto-converting, filtered connection to a replier port of the
@ -309,8 +282,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
reply_map: D, reply_map: D,
replier: F, replier: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) -> LineId ) where
where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static, C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
@ -325,26 +297,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
replier, replier,
address.into().0, address.into().0,
)); ));
self.broadcaster.lock().unwrap().add(sender) self.broadcaster.lock().unwrap().add(sender);
}
/// Removes the connection specified by the `LineId` parameter.
///
/// It is a logic error to specify a line identifier from another
/// [`QuerySource`], [`EventSource`], [`Output`](crate::ports::Output) or
/// [`Requestor`](crate::ports::Requestor) instance and may result in the
/// disconnection of an arbitrary endpoint.
pub fn disconnect(&mut self, line_id: LineId) -> Result<(), LineError> {
if self.broadcaster.lock().unwrap().remove(line_id) {
Ok(())
} else {
Err(LineError {})
}
}
/// Removes all connections.
pub fn disconnect_all(&mut self) {
self.broadcaster.lock().unwrap().clear();
} }
/// Returns an action which, when processed, broadcasts a query to all /// Returns an action which, when processed, broadcasts a query to all

View File

@ -10,7 +10,6 @@ use diatomic_waker::WakeSink;
use super::sender::{Sender, SenderFuture}; use super::sender::{Sender, SenderFuture};
use crate::ports::LineId;
use crate::util::task_set::TaskSet; use crate::util::task_set::TaskSet;
/// An object that can efficiently broadcast messages to several addresses. /// An object that can efficiently broadcast messages to several addresses.
@ -24,10 +23,8 @@ use crate::util::task_set::TaskSet;
/// does, but the outputs of all sender futures are returned all at once rather /// does, but the outputs of all sender futures are returned all at once rather
/// than with an asynchronous iterator (a.k.a. async stream). /// than with an asynchronous iterator (a.k.a. async stream).
pub(super) struct BroadcasterInner<T: Clone, R> { pub(super) struct BroadcasterInner<T: Clone, R> {
/// Line identifier for the next port to be connected.
next_line_id: u64,
/// The list of senders with their associated line identifier. /// The list of senders with their associated line identifier.
senders: Vec<(LineId, Box<dyn Sender<T, R>>)>, senders: Vec<Box<dyn Sender<T, R>>>,
} }
impl<T: Clone, R> BroadcasterInner<T, R> { impl<T: Clone, R> BroadcasterInner<T, R> {
@ -36,34 +33,11 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
/// # Panics /// # Panics
/// ///
/// This method will panic if the total count of senders would reach /// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`. /// `u32::MAX - 1` due to limitations inherent to the task set
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId { /// implementation.
assert!(self.next_line_id != u64::MAX); pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) {
let line_id = LineId(self.next_line_id); assert!(self.senders.len() < (u32::MAX as usize - 2));
self.next_line_id += 1; self.senders.push(sender);
self.senders.push((line_id, sender));
line_id
}
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
if let Some(pos) = self.senders.iter().position(|s| s.0 == id) {
self.senders.swap_remove(pos);
return true;
}
false
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.senders.clear();
} }
/// Returns the number of connected senders. /// Returns the number of connected senders.
@ -81,12 +55,12 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
while let Some(sender) = iter.next() { while let Some(sender) = iter.next() {
// Move the argument for the last future to avoid undue cloning. // Move the argument for the last future to avoid undue cloning.
if iter.len() == 0 { if iter.len() == 0 {
if let Some(fut) = sender.1.send_owned(arg) { if let Some(fut) = sender.send_owned(arg) {
future_states.push(SenderFutureState::Pending(fut)); future_states.push(SenderFutureState::Pending(fut));
} }
break; break;
} }
if let Some(fut) = sender.1.send(&arg) { if let Some(fut) = sender.send(&arg) {
future_states.push(SenderFutureState::Pending(fut)); future_states.push(SenderFutureState::Pending(fut));
} }
} }
@ -98,7 +72,6 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
impl<T: Clone, R> Default for BroadcasterInner<T, R> { impl<T: Clone, R> Default for BroadcasterInner<T, R> {
fn default() -> Self { fn default() -> Self {
Self { Self {
next_line_id: 0,
senders: Vec::new(), senders: Vec::new(),
} }
} }
@ -121,22 +94,10 @@ impl<T: Clone + Send> EventBroadcaster<T> {
/// # Panics /// # Panics
/// ///
/// This method will panic if the total count of senders would reach /// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`. /// `u32::MAX - 1` due to limitations inherent to the task set
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>) -> LineId { /// implementation.
self.inner.add(sender) pub(super) fn add(&mut self, sender: Box<dyn Sender<T, ()>>) {
} self.inner.add(sender);
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
} }
/// Returns the number of connected senders. /// Returns the number of connected senders.
@ -159,7 +120,7 @@ impl<T: Clone + Send> EventBroadcaster<T> {
// No sender. // No sender.
[] => Fut::Empty, [] => Fut::Empty,
// One sender at most. // One sender at most.
[sender] => Fut::Single(sender.1.send_owned(arg)), [sender] => Fut::Single(sender.send_owned(arg)),
// Possibly multiple senders. // Possibly multiple senders.
_ => Fut::Multiple(self.inner.futures(arg)), _ => Fut::Multiple(self.inner.futures(arg)),
}; };
@ -209,22 +170,10 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
/// # Panics /// # Panics
/// ///
/// This method will panic if the total count of senders would reach /// This method will panic if the total count of senders would reach
/// `u32::MAX - 1`. /// `u32::MAX - 1` due to limitations inherent to the task set
pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) -> LineId { /// implementation.
self.inner.add(sender) pub(super) fn add(&mut self, sender: Box<dyn Sender<T, R>>) {
} self.inner.add(sender);
/// Removes the first sender with the specified identifier, if any.
///
/// Returns `true` if there was indeed a sender associated to the specified
/// identifier.
pub(super) fn remove(&mut self, id: LineId) -> bool {
self.inner.remove(id)
}
/// Removes all senders.
pub(super) fn clear(&mut self) {
self.inner.clear();
} }
/// Returns the number of connected senders. /// Returns the number of connected senders.
@ -247,7 +196,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
// No sender. // No sender.
[] => Fut::Empty, [] => Fut::Empty,
// One sender at most. // One sender at most.
[sender] => Fut::Single(sender.1.send_owned(arg)), [sender] => Fut::Single(sender.send_owned(arg)),
// Possibly multiple senders. // Possibly multiple senders.
_ => Fut::Multiple(self.inner.futures(arg)), _ => Fut::Multiple(self.inner.futures(arg)),
}; };

View File

@ -76,47 +76,6 @@
//! Any deadlocks will be reported as an [`ExecutionError::Deadlock`] error, //! Any deadlocks will be reported as an [`ExecutionError::Deadlock`] error,
//! which identifies all involved models and the amount of unprocessed messages //! which identifies all involved models and the amount of unprocessed messages
//! (events or requests) in their mailboxes. //! (events or requests) in their mailboxes.
//!
//! ## Modifying connections during simulation
//!
//! Although uncommon, there is sometimes a need for connecting and/or
//! disconnecting models after they have been migrated to the simulation.
//! Likewise, one may want to connect or disconnect an
//! [`EventSlot`](crate::ports::EventSlot) or
//! [`EventBuffer`](crate::ports::EventBuffer) after the simulation has been
//! instantiated.
//!
//! There is actually a very simple solution to this problem: since the
//! [`InputFn`] trait also matches closures of type `FnOnce(&mut impl Model)`,
//! it is enough to invoke [`Simulation::process_event()`] with a closure that
//! connects or disconnects a port, such as:
//!
//! ```
//! # use asynchronix::model::{Context, Model};
//! # use asynchronix::ports::Output;
//! # use asynchronix::time::MonotonicTime;
//! # use asynchronix::simulation::{Mailbox, SimInit};
//! # pub struct ModelA {
//! # pub output: Output<i32>,
//! # }
//! # impl Model for ModelA {};
//! # pub struct ModelB {}
//! # impl ModelB {
//! # pub fn input(&mut self, value: i32) {}
//! # }
//! # impl Model for ModelB {};
//! # let modelA_addr = Mailbox::<ModelA>::new().address();
//! # let modelB_addr = Mailbox::<ModelB>::new().address();
//! # let mut simu = SimInit::new().init(MonotonicTime::EPOCH)?.0;
//! simu.process_event(
//! |m: &mut ModelA| {
//! m.output.connect(ModelB::input, modelB_addr);
//! },
//! (),
//! &modelA_addr
//! )?;
//! # Ok::<(), asynchronix::simulation::SimulationError>(())
//! ```
mod mailbox; mod mailbox;
mod scheduler; mod scheduler;
mod sim_init; mod sim_init;