From 8de53aff1ffe867114fb0d151f0870f22c9e64f9 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Mon, 2 Dec 2024 18:46:45 +0100 Subject: [PATCH] Make source event/query creation methods immutable --- nexosim/src/ports/source.rs | 97 ++++++++----------- nexosim/src/ports/source/broadcaster.rs | 15 ++- nexosim/src/ports/source/sender.rs | 54 +++++------ nexosim/src/registry/event_sink_registry.rs | 4 +- nexosim/src/registry/event_source_registry.rs | 28 +++--- nexosim/src/registry/query_source_registry.rs | 4 +- 6 files changed, 93 insertions(+), 109 deletions(-) diff --git a/nexosim/src/ports/source.rs b/nexosim/src/ports/source.rs index 3eccb8c..dff1d04 100644 --- a/nexosim/src/ports/source.rs +++ b/nexosim/src/ports/source.rs @@ -2,7 +2,7 @@ mod broadcaster; mod sender; use std::fmt; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use crate::model::Model; @@ -28,7 +28,7 @@ use super::ReplierFn; /// however, to be instantiated as a member of a model, but rather as a /// simulation control endpoint instantiated during bench assembly. pub struct EventSource { - broadcaster: Arc>>, + broadcaster: EventBroadcaster, } impl EventSource { @@ -46,11 +46,11 @@ impl EventSource { pub fn connect(&mut self, input: F, address: impl Into>) where M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - S: Send + 'static, + F: for<'a> InputFn<'a, M, T, S> + Clone + Sync, + S: Send + Sync + 'static, { let sender = Box::new(InputSender::new(input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender); + self.broadcaster.add(sender); } /// Adds an auto-converting connection to an input port of the model @@ -65,13 +65,13 @@ impl EventSource { pub fn map_connect(&mut self, map: C, input: F, address: impl Into>) where M: Model, - C: for<'a> Fn(&'a T) -> U + Send + 'static, - F: for<'a> InputFn<'a, M, U, S> + Clone, + C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Sync + Clone, U: Send + 'static, - S: Send + 'static, + S: Send + Sync + 'static, { let sender = Box::new(MapInputSender::new(map, input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender); + self.broadcaster.add(sender); } /// Adds an auto-converting, filtered connection to an input port of the @@ -90,22 +90,19 @@ impl EventSource { address: impl Into>, ) where M: Model, - C: for<'a> Fn(&'a T) -> Option + Send + 'static, - F: for<'a> InputFn<'a, M, U, S> + Clone, + C: for<'a> Fn(&'a T) -> Option + Send + Sync + 'static, + F: for<'a> InputFn<'a, M, U, S> + Clone + Sync, U: Send + 'static, - S: Send + 'static, + S: Send + Sync + 'static, { let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); - self.broadcaster.lock().unwrap().add(sender); + self.broadcaster.add(sender); } /// Returns an action which, when processed, broadcasts an event to all /// connected input ports. - /// - /// Note that the action broadcasts the event to those models that are - /// connected to the event source at the time the action is processed. - pub fn event(&mut self, arg: T) -> Action { - let fut = self.broadcaster.lock().unwrap().broadcast(arg); + pub fn event(&self, arg: T) -> Action { + let fut = self.broadcaster.broadcast(arg); let fut = async { fut.await.unwrap_or_throw(); }; @@ -115,12 +112,9 @@ impl EventSource { /// Returns a cancellable action and a cancellation key; when processed, the /// action broadcasts an event to all connected input ports. - /// - /// Note that the action broadcasts the event to those models that are - /// connected to the event source at the time the action is processed. - pub fn keyed_event(&mut self, arg: T) -> (Action, ActionKey) { + pub fn keyed_event(&self, arg: T) -> (Action, ActionKey) { let action_key = ActionKey::new(); - let fut = self.broadcaster.lock().unwrap().broadcast(arg); + let fut = self.broadcaster.broadcast(arg); let action = Action::new(KeyedOnceAction::new( // Cancellation is ignored once the action is already spawned on the @@ -139,15 +133,12 @@ impl EventSource { /// Returns a periodically recurring action which, when processed, /// broadcasts an event to all connected input ports. - /// - /// Note that the action broadcasts the event to those models that are - /// connected to the event source at the time the action is processed. - pub fn periodic_event(&mut self, period: Duration, arg: T) -> Action { - let broadcaster = self.broadcaster.clone(); + pub fn periodic_event(self: &Arc, period: Duration, arg: T) -> Action { + let source = self.clone(); Action::new(PeriodicAction::new( || async move { - let fut = broadcaster.lock().unwrap().broadcast(arg); + let fut = source.broadcaster.broadcast(arg); fut.await.unwrap_or_throw(); }, period, @@ -157,12 +148,9 @@ impl EventSource { /// Returns a cancellable, periodically recurring action and a cancellation /// key; when processed, the action broadcasts an event to all connected /// input ports. - /// - /// Note that the action broadcasts the event to those models that are - /// connected to the event source at the time the action is processed. - pub fn keyed_periodic_event(&mut self, period: Duration, arg: T) -> (Action, ActionKey) { + pub fn keyed_periodic_event(self: &Arc, period: Duration, arg: T) -> (Action, ActionKey) { let action_key = ActionKey::new(); - let broadcaster = self.broadcaster.clone(); + let source = self.clone(); let action = Action::new(KeyedPeriodicAction::new( // Cancellation is ignored once the action is already spawned on the @@ -171,7 +159,7 @@ impl EventSource { // used outside the simulator, this shouldn't be an issue in // practice. |_| async move { - let fut = broadcaster.lock().unwrap().broadcast(arg); + let fut = source.broadcaster.broadcast(arg); fut.await.unwrap_or_throw(); }, period, @@ -185,7 +173,7 @@ impl EventSource { impl Default for EventSource { fn default() -> Self { Self { - broadcaster: Arc::new(Mutex::new(EventBroadcaster::default())), + broadcaster: EventBroadcaster::default(), } } } @@ -195,7 +183,7 @@ impl fmt::Debug for EventSource { write!( f, "Event source ({} connected ports)", - self.broadcaster.lock().unwrap().len() + self.broadcaster.len() ) } } @@ -208,7 +196,7 @@ impl fmt::Debug for EventSource { /// member of a model, but rather as a simulation monitoring endpoint /// instantiated during bench assembly. pub struct QuerySource { - broadcaster: Arc>>, + broadcaster: QueryBroadcaster, } impl QuerySource { @@ -226,11 +214,11 @@ impl QuerySource { pub fn connect(&mut self, replier: F, address: impl Into>) where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, - S: Send + 'static, + F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync, + S: Send + Sync + 'static, { let sender = Box::new(ReplierSender::new(replier, address.into().0)); - self.broadcaster.lock().unwrap().add(sender); + self.broadcaster.add(sender); } /// Adds an auto-converting connection to a replier port of the model @@ -251,12 +239,12 @@ impl QuerySource { address: impl Into>, ) where M: Model, - C: for<'a> Fn(&'a T) -> U + Send + 'static, + C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static, - F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync, U: Send + 'static, Q: Send + 'static, - S: Send + 'static, + S: Send + Sync + 'static, { let sender = Box::new(MapReplierSender::new( query_map, @@ -264,7 +252,7 @@ impl QuerySource { replier, address.into().0, )); - self.broadcaster.lock().unwrap().add(sender); + self.broadcaster.add(sender); } /// Adds an auto-converting, filtered connection to a replier port of the @@ -285,12 +273,12 @@ impl QuerySource { address: impl Into>, ) where M: Model, - C: for<'a> Fn(&'a T) -> Option + Send + 'static, + C: for<'a> Fn(&'a T) -> Option + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static, - F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync, U: Send + 'static, Q: Send + 'static, - S: Send + 'static, + S: Send + Sync + 'static, { let sender = Box::new(FilterMapReplierSender::new( query_filter_map, @@ -298,17 +286,14 @@ impl QuerySource { replier, address.into().0, )); - self.broadcaster.lock().unwrap().add(sender); + self.broadcaster.add(sender); } /// Returns an action which, when processed, broadcasts a query to all /// connected replier ports. - /// - /// Note that the action broadcasts the query to those models that are - /// connected to the query source at the time the action is processed. - pub fn query(&mut self, arg: T) -> (Action, ReplyReceiver) { + pub fn query(&self, arg: T) -> (Action, ReplyReceiver) { let (writer, reader) = slot::slot(); - let fut = self.broadcaster.lock().unwrap().broadcast(arg); + let fut = self.broadcaster.broadcast(arg); let fut = async move { let replies = fut.await.unwrap_or_throw(); let _ = writer.write(replies); @@ -323,7 +308,7 @@ impl QuerySource { impl Default for QuerySource { fn default() -> Self { Self { - broadcaster: Arc::new(Mutex::new(QueryBroadcaster::default())), + broadcaster: QueryBroadcaster::default(), } } } @@ -333,7 +318,7 @@ impl fmt::Debug for QuerySource BroadcasterInner { /// Return a list of futures broadcasting an event or query to multiple /// addresses. - fn futures(&mut self, arg: T) -> Vec> { + fn futures(&self, arg: T) -> Vec> { let mut future_states = Vec::new(); // Broadcast the message and collect all futures. - let mut iter = self.senders.iter_mut(); + let mut iter = self.senders.iter(); while let Some(sender) = iter.next() { // Move the argument for the last future to avoid undue cloning. if iter.len() == 0 { @@ -107,17 +107,14 @@ impl EventBroadcaster { } /// Broadcasts an event to all addresses. - pub(super) fn broadcast( - &mut self, - arg: T, - ) -> impl Future> + Send { + pub(super) fn broadcast(&self, arg: T) -> impl Future> + Send { enum Fut { Empty, Single(F1), Multiple(F2), } - let fut = match self.inner.senders.as_mut_slice() { + let fut = match self.inner.senders.as_slice() { // No sender. [] => Fut::Empty, // One sender at most. @@ -184,7 +181,7 @@ impl QueryBroadcaster { /// Broadcasts an event to all addresses. pub(super) fn broadcast( - &mut self, + &self, arg: T, ) -> impl Future, SendError>> + Send { enum Fut { @@ -193,7 +190,7 @@ impl QueryBroadcaster { Multiple(F2), } - let fut = match self.inner.senders.as_mut_slice() { + let fut = match self.inner.senders.as_slice() { // No sender. [] => Fut::Empty, // One sender at most. diff --git a/nexosim/src/ports/source/sender.rs b/nexosim/src/ports/source/sender.rs index 9dec914..7a82a6b 100644 --- a/nexosim/src/ports/source/sender.rs +++ b/nexosim/src/ports/source/sender.rs @@ -14,12 +14,12 @@ use crate::ports::{InputFn, ReplierFn}; pub(super) type SenderFuture = Pin> + Send>>; /// An event or query sender abstracting over the target model and input method. -pub(super) trait Sender: Send { +pub(super) trait Sender: Send + Sync { /// Asynchronously sends a message using a reference to the message. - fn send(&mut self, arg: &T) -> Option>; + fn send(&self, arg: &T) -> Option>; /// Asynchronously sends an owned message. - fn send_owned(&mut self, arg: T) -> Option> { + fn send_owned(&self, arg: T) -> Option> { self.send(&arg) } } @@ -52,15 +52,15 @@ where impl Sender for InputSender where M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, + F: for<'a> InputFn<'a, M, T, S> + Clone + Sync, T: Clone + Send + 'static, - S: Send, + S: Send + Sync, { - fn send(&mut self, arg: &T) -> Option> { + fn send(&self, arg: &T) -> Option> { self.send_owned(arg.clone()) } - fn send_owned(&mut self, arg: T) -> Option> { + fn send_owned(&self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); @@ -108,13 +108,13 @@ where impl Sender for MapInputSender where M: Model, - C: Fn(&T) -> U + Send, - F: for<'a> InputFn<'a, M, U, S> + Clone, + C: Fn(&T) -> U + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone + Sync, T: Send + 'static, U: Send + 'static, - S: Send, + S: Send + Sync, { - fn send(&mut self, arg: &T) -> Option> { + fn send(&self, arg: &T) -> Option> { let func = self.func.clone(); let arg = (self.map)(arg); let sender = self.sender.clone(); @@ -163,13 +163,13 @@ where impl Sender for FilterMapInputSender where M: Model, - C: Fn(&T) -> Option + Send, - F: for<'a> InputFn<'a, M, U, S> + Clone, + C: Fn(&T) -> Option + Send + Sync, + F: for<'a> InputFn<'a, M, U, S> + Clone + Sync, T: Send + 'static, U: Send + 'static, - S: Send, + S: Send + Sync, { - fn send(&mut self, arg: &T) -> Option> { + fn send(&self, arg: &T) -> Option> { (self.filter_map)(arg).map(|arg| { let func = self.func.clone(); let sender = self.sender.clone(); @@ -215,16 +215,16 @@ where impl Sender for ReplierSender where M: Model, - F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, + F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync, T: Clone + Send + 'static, R: Send + 'static, - S: Send, + S: Send + Sync, { - fn send(&mut self, arg: &T) -> Option> { + fn send(&self, arg: &T) -> Option> { self.send_owned(arg.clone()) } - fn send_owned(&mut self, arg: T) -> Option> { + fn send_owned(&self, arg: T) -> Option> { let func = self.func.clone(); let sender = self.sender.clone(); let (reply_sender, reply_receiver) = oneshot::channel(); @@ -283,16 +283,16 @@ where impl Sender for MapReplierSender where M: Model, - C: Fn(&T) -> U + Send, + C: Fn(&T) -> U + Send + Sync, D: Fn(Q) -> R + Send + Sync + 'static, - F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync, T: Send + 'static, R: Send + 'static, U: Send + 'static, Q: Send + 'static, - S: Send, + S: Send + Sync, { - fn send(&mut self, arg: &T) -> Option> { + fn send(&self, arg: &T) -> Option> { let func = self.func.clone(); let arg = (self.query_map)(arg); let sender = self.sender.clone(); @@ -358,16 +358,16 @@ where impl Sender for FilterMapReplierSender where M: Model, - C: Fn(&T) -> Option + Send, + C: Fn(&T) -> Option + Send + Sync, D: Fn(Q) -> R + Send + Sync + 'static, - F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync, T: Send + 'static, R: Send + 'static, U: Send + 'static, Q: Send + 'static, - S: Send, + S: Send + Sync, { - fn send(&mut self, arg: &T) -> Option> { + fn send(&self, arg: &T) -> Option> { (self.query_filter_map)(arg).map(|arg| { let func = self.func.clone(); let sender = self.sender.clone(); diff --git a/nexosim/src/registry/event_sink_registry.rs b/nexosim/src/registry/event_sink_registry.rs index a848f48..3722e6f 100644 --- a/nexosim/src/registry/event_sink_registry.rs +++ b/nexosim/src/registry/event_sink_registry.rs @@ -9,8 +9,8 @@ use crate::ports::EventSinkStream; type SerializationError = ciborium::ser::Error; -/// A registry that holds all sources and sinks meant to be accessed through -/// remote procedure calls. +/// A registry that holds all sinks meant to be accessed through remote +/// procedure calls. #[derive(Default)] pub(crate) struct EventSinkRegistry(HashMap>); diff --git a/nexosim/src/registry/event_source_registry.rs b/nexosim/src/registry/event_source_registry.rs index f22c044..86258ec 100644 --- a/nexosim/src/registry/event_source_registry.rs +++ b/nexosim/src/registry/event_source_registry.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; +use std::sync::Arc; use std::time::Duration; use ciborium; @@ -31,7 +32,7 @@ impl EventSourceRegistry { { match self.0.entry(name.into()) { Entry::Vacant(s) => { - s.insert(Box::new(source)); + s.insert(Box::new(Arc::new(source))); Ok(()) } @@ -58,14 +59,14 @@ pub(crate) trait EventSourceAny: Send + 'static { /// connected input ports. /// /// The argument is expected to conform to the serde CBOR encoding. - fn event(&mut self, serialized_arg: &[u8]) -> Result; + fn event(&self, serialized_arg: &[u8]) -> Result; /// Returns a cancellable action and a cancellation key; when processed, the /// action broadcasts an event to all connected input ports. /// /// The argument is expected to conform to the serde CBOR encoding. fn keyed_event( - &mut self, + &self, serialized_arg: &[u8], ) -> Result<(Action, ActionKey), DeserializationError>; @@ -74,7 +75,7 @@ pub(crate) trait EventSourceAny: Send + 'static { /// /// The argument is expected to conform to the serde CBOR encoding. fn periodic_event( - &mut self, + &self, period: Duration, serialized_arg: &[u8], ) -> Result; @@ -85,7 +86,7 @@ pub(crate) trait EventSourceAny: Send + 'static { /// /// The argument is expected to conform to the serde CBOR encoding. fn keyed_periodic_event( - &mut self, + &self, period: Duration, serialized_arg: &[u8], ) -> Result<(Action, ActionKey), DeserializationError>; @@ -95,28 +96,29 @@ pub(crate) trait EventSourceAny: Send + 'static { fn event_type_name(&self) -> &'static str; } -impl EventSourceAny for EventSource +impl EventSourceAny for Arc> where T: DeserializeOwned + Clone + Send + 'static, { - fn event(&mut self, serialized_arg: &[u8]) -> Result { - ciborium::from_reader(serialized_arg).map(|arg| self.event(arg)) + fn event(&self, serialized_arg: &[u8]) -> Result { + ciborium::from_reader(serialized_arg).map(|arg| EventSource::event(self, arg)) } fn keyed_event( - &mut self, + &self, serialized_arg: &[u8], ) -> Result<(Action, ActionKey), DeserializationError> { - ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg)) + ciborium::from_reader(serialized_arg).map(|arg| EventSource::keyed_event(self, arg)) } fn periodic_event( - &mut self, + &self, period: Duration, serialized_arg: &[u8], ) -> Result { - ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg)) + ciborium::from_reader(serialized_arg) + .map(|arg| EventSource::periodic_event(self, period, arg)) } fn keyed_periodic_event( - &mut self, + &self, period: Duration, serialized_arg: &[u8], ) -> Result<(Action, ActionKey), DeserializationError> { diff --git a/nexosim/src/registry/query_source_registry.rs b/nexosim/src/registry/query_source_registry.rs index b12cacb..97f9fbc 100644 --- a/nexosim/src/registry/query_source_registry.rs +++ b/nexosim/src/registry/query_source_registry.rs @@ -63,7 +63,7 @@ pub(crate) trait QuerySourceAny: Send + 'static { /// /// The argument is expected to conform to the serde CBOR encoding. fn query( - &mut self, + &self, arg: &[u8], ) -> Result<(Action, Box), DeserializationError>; @@ -82,7 +82,7 @@ where R: Serialize + Send + 'static, { fn query( - &mut self, + &self, arg: &[u8], ) -> Result<(Action, Box), DeserializationError> { ciborium::from_reader(arg).map(|arg| {