1
0
forked from ROMEO/nexosim

Make source event/query creation methods immutable

This commit is contained in:
Serge Barral 2024-12-02 18:46:45 +01:00
parent a87bf493b3
commit 8de53aff1f
6 changed files with 93 additions and 109 deletions

View File

@ -2,7 +2,7 @@ mod broadcaster;
mod sender;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use crate::model::Model;
@ -28,7 +28,7 @@ use super::ReplierFn;
/// however, to be instantiated as a member of a model, but rather as a
/// simulation control endpoint instantiated during bench assembly.
pub struct EventSource<T: Clone + Send + 'static> {
broadcaster: Arc<Mutex<EventBroadcaster<T>>>,
broadcaster: EventBroadcaster<T>,
}
impl<T: Clone + Send + 'static> EventSource<T> {
@ -46,11 +46,11 @@ impl<T: Clone + Send + 'static> EventSource<T> {
pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>)
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
S: Send + 'static,
F: for<'a> InputFn<'a, M, T, S> + Clone + Sync,
S: Send + Sync + 'static,
{
let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.lock().unwrap().add(sender);
self.broadcaster.add(sender);
}
/// Adds an auto-converting connection to an input port of the model
@ -65,13 +65,13 @@ impl<T: Clone + Send + 'static> EventSource<T> {
pub fn map_connect<M, C, F, U, S>(&mut self, map: C, input: F, address: impl Into<Address<M>>)
where
M: Model,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Sync + Clone,
U: Send + 'static,
S: Send + 'static,
S: Send + Sync + 'static,
{
let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender);
self.broadcaster.add(sender);
}
/// Adds an auto-converting, filtered connection to an input port of the
@ -90,22 +90,19 @@ impl<T: Clone + Send + 'static> EventSource<T> {
address: impl Into<Address<M>>,
) where
M: Model,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone,
C: for<'a> Fn(&'a T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
U: Send + 'static,
S: Send + 'static,
S: Send + Sync + 'static,
{
let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender);
self.broadcaster.add(sender);
}
/// Returns an action which, when processed, broadcasts an event to all
/// connected input ports.
///
/// Note that the action broadcasts the event to those models that are
/// connected to the event source at the time the action is processed.
pub fn event(&mut self, arg: T) -> Action {
let fut = self.broadcaster.lock().unwrap().broadcast(arg);
pub fn event(&self, arg: T) -> Action {
let fut = self.broadcaster.broadcast(arg);
let fut = async {
fut.await.unwrap_or_throw();
};
@ -115,12 +112,9 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports.
///
/// Note that the action broadcasts the event to those models that are
/// connected to the event source at the time the action is processed.
pub fn keyed_event(&mut self, arg: T) -> (Action, ActionKey) {
pub fn keyed_event(&self, arg: T) -> (Action, ActionKey) {
let action_key = ActionKey::new();
let fut = self.broadcaster.lock().unwrap().broadcast(arg);
let fut = self.broadcaster.broadcast(arg);
let action = Action::new(KeyedOnceAction::new(
// Cancellation is ignored once the action is already spawned on the
@ -139,15 +133,12 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// Returns a periodically recurring action which, when processed,
/// broadcasts an event to all connected input ports.
///
/// Note that the action broadcasts the event to those models that are
/// connected to the event source at the time the action is processed.
pub fn periodic_event(&mut self, period: Duration, arg: T) -> Action {
let broadcaster = self.broadcaster.clone();
pub fn periodic_event(self: &Arc<Self>, period: Duration, arg: T) -> Action {
let source = self.clone();
Action::new(PeriodicAction::new(
|| async move {
let fut = broadcaster.lock().unwrap().broadcast(arg);
let fut = source.broadcaster.broadcast(arg);
fut.await.unwrap_or_throw();
},
period,
@ -157,12 +148,9 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// Returns a cancellable, periodically recurring action and a cancellation
/// key; when processed, the action broadcasts an event to all connected
/// input ports.
///
/// Note that the action broadcasts the event to those models that are
/// connected to the event source at the time the action is processed.
pub fn keyed_periodic_event(&mut self, period: Duration, arg: T) -> (Action, ActionKey) {
pub fn keyed_periodic_event(self: &Arc<Self>, period: Duration, arg: T) -> (Action, ActionKey) {
let action_key = ActionKey::new();
let broadcaster = self.broadcaster.clone();
let source = self.clone();
let action = Action::new(KeyedPeriodicAction::new(
// Cancellation is ignored once the action is already spawned on the
@ -171,7 +159,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
// used outside the simulator, this shouldn't be an issue in
// practice.
|_| async move {
let fut = broadcaster.lock().unwrap().broadcast(arg);
let fut = source.broadcaster.broadcast(arg);
fut.await.unwrap_or_throw();
},
period,
@ -185,7 +173,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
impl<T: Clone + Send + 'static> Default for EventSource<T> {
fn default() -> Self {
Self {
broadcaster: Arc::new(Mutex::new(EventBroadcaster::default())),
broadcaster: EventBroadcaster::default(),
}
}
}
@ -195,7 +183,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for EventSource<T> {
write!(
f,
"Event source ({} connected ports)",
self.broadcaster.lock().unwrap().len()
self.broadcaster.len()
)
}
}
@ -208,7 +196,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for EventSource<T> {
/// member of a model, but rather as a simulation monitoring endpoint
/// instantiated during bench assembly.
pub struct QuerySource<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: Arc<Mutex<QueryBroadcaster<T, R>>>,
broadcaster: QueryBroadcaster<T, R>,
}
impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
@ -226,11 +214,11 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>)
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
S: Send + 'static,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync,
S: Send + Sync + 'static,
{
let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.lock().unwrap().add(sender);
self.broadcaster.add(sender);
}
/// Adds an auto-converting connection to a replier port of the model
@ -251,12 +239,12 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
address: impl Into<Address<M>>,
) where
M: Model,
C: for<'a> Fn(&'a T) -> U + Send + 'static,
C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
S: Send + Sync + 'static,
{
let sender = Box::new(MapReplierSender::new(
query_map,
@ -264,7 +252,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
replier,
address.into().0,
));
self.broadcaster.lock().unwrap().add(sender);
self.broadcaster.add(sender);
}
/// Adds an auto-converting, filtered connection to a replier port of the
@ -285,12 +273,12 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
address: impl Into<Address<M>>,
) where
M: Model,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static,
C: for<'a> Fn(&'a T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
S: Send + Sync + 'static,
{
let sender = Box::new(FilterMapReplierSender::new(
query_filter_map,
@ -298,17 +286,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
replier,
address.into().0,
));
self.broadcaster.lock().unwrap().add(sender);
self.broadcaster.add(sender);
}
/// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports.
///
/// Note that the action broadcasts the query to those models that are
/// connected to the query source at the time the action is processed.
pub fn query(&mut self, arg: T) -> (Action, ReplyReceiver<R>) {
pub fn query(&self, arg: T) -> (Action, ReplyReceiver<R>) {
let (writer, reader) = slot::slot();
let fut = self.broadcaster.lock().unwrap().broadcast(arg);
let fut = self.broadcaster.broadcast(arg);
let fut = async move {
let replies = fut.await.unwrap_or_throw();
let _ = writer.write(replies);
@ -323,7 +308,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
impl<T: Clone + Send + 'static, R: Send + 'static> Default for QuerySource<T, R> {
fn default() -> Self {
Self {
broadcaster: Arc::new(Mutex::new(QueryBroadcaster::default())),
broadcaster: QueryBroadcaster::default(),
}
}
}
@ -333,7 +318,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for QuerySource<T,
write!(
f,
"Query source ({} connected ports)",
self.broadcaster.lock().unwrap().len()
self.broadcaster.len()
)
}
}

View File

@ -48,11 +48,11 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
/// Return a list of futures broadcasting an event or query to multiple
/// addresses.
fn futures(&mut self, arg: T) -> Vec<SenderFutureState<R>> {
fn futures(&self, arg: T) -> Vec<SenderFutureState<R>> {
let mut future_states = Vec::new();
// Broadcast the message and collect all futures.
let mut iter = self.senders.iter_mut();
let mut iter = self.senders.iter();
while let Some(sender) = iter.next() {
// Move the argument for the last future to avoid undue cloning.
if iter.len() == 0 {
@ -107,17 +107,14 @@ impl<T: Clone + Send> EventBroadcaster<T> {
}
/// Broadcasts an event to all addresses.
pub(super) fn broadcast(
&mut self,
arg: T,
) -> impl Future<Output = Result<(), SendError>> + Send {
pub(super) fn broadcast(&self, arg: T) -> impl Future<Output = Result<(), SendError>> + Send {
enum Fut<F1, F2> {
Empty,
Single(F1),
Multiple(F2),
}
let fut = match self.inner.senders.as_mut_slice() {
let fut = match self.inner.senders.as_slice() {
// No sender.
[] => Fut::Empty,
// One sender at most.
@ -184,7 +181,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
/// Broadcasts an event to all addresses.
pub(super) fn broadcast(
&mut self,
&self,
arg: T,
) -> impl Future<Output = Result<ReplyIterator<R>, SendError>> + Send {
enum Fut<F1, F2> {
@ -193,7 +190,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
Multiple(F2),
}
let fut = match self.inner.senders.as_mut_slice() {
let fut = match self.inner.senders.as_slice() {
// No sender.
[] => Fut::Empty,
// One sender at most.

View File

@ -14,12 +14,12 @@ use crate::ports::{InputFn, ReplierFn};
pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>;
/// An event or query sender abstracting over the target model and input method.
pub(super) trait Sender<T, R>: Send {
pub(super) trait Sender<T, R>: Send + Sync {
/// Asynchronously sends a message using a reference to the message.
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>>;
fn send(&self, arg: &T) -> Option<SenderFuture<R>>;
/// Asynchronously sends an owned message.
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> {
fn send_owned(&self, arg: T) -> Option<SenderFuture<R>> {
self.send(&arg)
}
}
@ -52,15 +52,15 @@ where
impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
F: for<'a> InputFn<'a, M, T, S> + Clone + Sync,
T: Clone + Send + 'static,
S: Send,
S: Send + Sync,
{
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
fn send(&self, arg: &T) -> Option<SenderFuture<()>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<()>> {
fn send_owned(&self, arg: T) -> Option<SenderFuture<()>> {
let func = self.func.clone();
let sender = self.sender.clone();
@ -108,13 +108,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(&T) -> U + Send,
F: for<'a> InputFn<'a, M, U, S> + Clone,
C: Fn(&T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
T: Send + 'static,
U: Send + 'static,
S: Send,
S: Send + Sync,
{
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
fn send(&self, arg: &T) -> Option<SenderFuture<()>> {
let func = self.func.clone();
let arg = (self.map)(arg);
let sender = self.sender.clone();
@ -163,13 +163,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where
M: Model,
C: Fn(&T) -> Option<U> + Send,
F: for<'a> InputFn<'a, M, U, S> + Clone,
C: Fn(&T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
T: Send + 'static,
U: Send + 'static,
S: Send,
S: Send + Sync,
{
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> {
fn send(&self, arg: &T) -> Option<SenderFuture<()>> {
(self.filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = self.sender.clone();
@ -215,16 +215,16 @@ where
impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync,
T: Clone + Send + 'static,
R: Send + 'static,
S: Send,
S: Send + Sync,
{
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
fn send(&self, arg: &T) -> Option<SenderFuture<R>> {
self.send_owned(arg.clone())
}
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> {
fn send_owned(&self, arg: T) -> Option<SenderFuture<R>> {
let func = self.func.clone();
let sender = self.sender.clone();
let (reply_sender, reply_receiver) = oneshot::channel();
@ -283,16 +283,16 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(&T) -> U + Send,
C: Fn(&T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
S: Send + Sync,
{
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
fn send(&self, arg: &T) -> Option<SenderFuture<R>> {
let func = self.func.clone();
let arg = (self.query_map)(arg);
let sender = self.sender.clone();
@ -358,16 +358,16 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where
M: Model,
C: Fn(&T) -> Option<U> + Send,
C: Fn(&T) -> Option<U> + Send + Sync,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
T: Send + 'static,
R: Send + 'static,
U: Send + 'static,
Q: Send + 'static,
S: Send,
S: Send + Sync,
{
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> {
fn send(&self, arg: &T) -> Option<SenderFuture<R>> {
(self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone();
let sender = self.sender.clone();

View File

@ -9,8 +9,8 @@ use crate::ports::EventSinkStream;
type SerializationError = ciborium::ser::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
/// A registry that holds all sinks meant to be accessed through remote
/// procedure calls.
#[derive(Default)]
pub(crate) struct EventSinkRegistry(HashMap<String, Box<dyn EventSinkStreamAny>>);

View File

@ -1,6 +1,7 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use ciborium;
@ -31,7 +32,7 @@ impl EventSourceRegistry {
{
match self.0.entry(name.into()) {
Entry::Vacant(s) => {
s.insert(Box::new(source));
s.insert(Box::new(Arc::new(source)));
Ok(())
}
@ -58,14 +59,14 @@ pub(crate) trait EventSourceAny: Send + 'static {
/// connected input ports.
///
/// The argument is expected to conform to the serde CBOR encoding.
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>;
fn event(&self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>;
/// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde CBOR encoding.
fn keyed_event(
&mut self,
&self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;
@ -74,7 +75,7 @@ pub(crate) trait EventSourceAny: Send + 'static {
///
/// The argument is expected to conform to the serde CBOR encoding.
fn periodic_event(
&mut self,
&self,
period: Duration,
serialized_arg: &[u8],
) -> Result<Action, DeserializationError>;
@ -85,7 +86,7 @@ pub(crate) trait EventSourceAny: Send + 'static {
///
/// The argument is expected to conform to the serde CBOR encoding.
fn keyed_periodic_event(
&mut self,
&self,
period: Duration,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;
@ -95,28 +96,29 @@ pub(crate) trait EventSourceAny: Send + 'static {
fn event_type_name(&self) -> &'static str;
}
impl<T> EventSourceAny for EventSource<T>
impl<T> EventSourceAny for Arc<EventSource<T>>
where
T: DeserializeOwned + Clone + Send + 'static,
{
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.event(arg))
fn event(&self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| EventSource::event(self, arg))
}
fn keyed_event(
&mut self,
&self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg))
ciborium::from_reader(serialized_arg).map(|arg| EventSource::keyed_event(self, arg))
}
fn periodic_event(
&mut self,
&self,
period: Duration,
serialized_arg: &[u8],
) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg))
ciborium::from_reader(serialized_arg)
.map(|arg| EventSource::periodic_event(self, period, arg))
}
fn keyed_periodic_event(
&mut self,
&self,
period: Duration,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {

View File

@ -63,7 +63,7 @@ pub(crate) trait QuerySourceAny: Send + 'static {
///
/// The argument is expected to conform to the serde CBOR encoding.
fn query(
&mut self,
&self,
arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError>;
@ -82,7 +82,7 @@ where
R: Serialize + Send + 'static,
{
fn query(
&mut self,
&self,
arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError> {
ciborium::from_reader(arg).map(|arg| {