diff --git a/asynchronix/src/rpc.rs b/asynchronix/src/rpc.rs index 9506bfa..59928eb 100644 --- a/asynchronix/src/rpc.rs +++ b/asynchronix/src/rpc.rs @@ -1,13 +1,17 @@ //! Simulation management through remote procedure calls. mod codegen; -mod endpoint_registry; #[cfg(feature = "grpc-service")] pub mod grpc; mod key_registry; +mod monitoring_service; mod simulation_service; +mod sink_registry; +mod source_registry; #[cfg(feature = "wasm-service")] pub mod wasm; -pub use endpoint_registry::EndpointRegistry; +pub use monitoring_service::MonitoringService; pub use simulation_service::SimulationService; +pub use sink_registry::SinkRegistry; +pub use source_registry::SourceRegistry; diff --git a/asynchronix/src/rpc/simulation_service.rs b/asynchronix/src/rpc/simulation_service.rs index 9c53001..d554490 100644 --- a/asynchronix/src/rpc/simulation_service.rs +++ b/asynchronix/src/rpc/simulation_service.rs @@ -1,3 +1,6 @@ +mod management_service; +mod monitoring_service; + use std::error; use std::fmt; use std::time::Duration; @@ -8,10 +11,14 @@ use prost_types::Timestamp; use tai_time::MonotonicTime; use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; -use crate::rpc::EndpointRegistry; +use crate::rpc::{SinkRegistry, SourceRegistry}; use crate::simulation::{SimInit, Simulation}; use super::codegen::simulation::*; +use super::sink_registry; + +use management_service::ManagementService; +use monitoring_service::MonitoringService; /// Protobuf-based simulation manager. /// @@ -23,8 +30,8 @@ use super::codegen::simulation::*; /// Its methods map the various RPC service methods defined in /// `simulation.proto`. pub struct SimulationService { - sim_gen: Box (SimInit, EndpointRegistry) + Send + 'static>, - sim_context: Option<(Simulation, EndpointRegistry, KeyRegistry)>, + sim_gen: Box (SimInit, SourceRegistry, SinkRegistry) + Send + 'static>, + services: Option<(ManagementService, MonitoringService)>, } impl SimulationService { @@ -36,16 +43,16 @@ impl SimulationService { /// interface. pub fn new(sim_gen: F) -> Self where - F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + F: FnMut() -> (SimInit, SourceRegistry, SinkRegistry) + Send + 'static, { Self { sim_gen: Box::new(sim_gen), - sim_context: None, + services: None, } } /// Processes an encoded `AnyRequest` message and returns an encoded reply. - pub fn process_request(&mut self, request_buf: B) -> Result, InvalidRequest> + /*pub fn process_request(&mut self, request_buf: B) -> Result, InvalidRequest> where B: Buf, { @@ -75,15 +82,6 @@ impl SimulationService { any_request::Request::ProcessQueryRequest(request) => { Ok(self.process_query(request).encode_to_vec()) } - any_request::Request::ReadEventsRequest(request) => { - Ok(self.read_events(request).encode_to_vec()) - } - any_request::Request::OpenSinkRequest(request) => { - Ok(self.open_sink(request).encode_to_vec()) - } - any_request::Request::CloseSinkRequest(request) => { - Ok(self.close_sink(request).encode_to_vec()) - } }, Ok(AnyRequest { request: None }) => Err(InvalidRequest { description: "the message did not contain any request".to_string(), @@ -92,7 +90,7 @@ impl SimulationService { description: format!("bad request: {}", err), }), } - } + }*/ /// Initialize a simulation with the provided time. /// @@ -104,9 +102,12 @@ impl SimulationService { pub(crate) fn init(&mut self, request: InitRequest) -> InitReply { let start_time = request.time.unwrap_or_default(); let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { - let (sim_init, endpoint_registry) = (self.sim_gen)(); + let (sim_init, source_registry, sink_registry) = (self.sim_gen)(); let simulation = sim_init.init(start_time); - self.sim_context = Some((simulation, endpoint_registry, KeyRegistry::default())); + self.services = Some(( + ManagementService::new(simulation, source_registry, KeyRegistry::default()), + MonitoringService::new(sink_registry), + )); init_reply::Result::Empty(()) } else { @@ -122,31 +123,18 @@ impl SimulationService { } /// Returns the current simulation time. - pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { - let reply = match &self.sim_context { - Some((simulation, ..)) => { - if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { - time_reply::Result::Time(timestamp) - } else { - time_reply::Result::Error(Error { - code: ErrorCode::SimulationTimeOutOfRange as i32, - message: "the final simulation time is out of range".to_string(), - }) - } + pub(crate) fn time(&mut self, request: TimeRequest) -> TimeReply { + if let Some((management_service, ..)) = &mut self.services { + management_service.time(request) + } else { + TimeReply { + result: Some(time_reply::Result::Error(simulation_not_started_error())), } - None => time_reply::Result::Error(Error { - code: ErrorCode::SimulationNotStarted as i32, - message: "the simulation was not started".to_string(), - }), - }; - - TimeReply { - result: Some(reply), } } /// Advances simulation time to that of the next scheduled event, processing - /// that event as well as all other event scheduled for the same time. + /// that event as well as all other events scheduled for the same time. /// /// Processing is gated by a (possibly blocking) call to /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the @@ -500,113 +488,6 @@ impl SimulationService { }, } } - - /// Read all events from an event sink. - pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply { - let reply = move || -> Result>, (ErrorCode, String)> { - let sink_name = &request.sink_name; - - let (_, registry, _) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let sink = registry.get_event_sink_mut(sink_name).ok_or(( - ErrorCode::SinkNotFound, - "no sink is registered with the name '{}'".to_string(), - ))?; - - sink.collect().map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the event could not be serialized from type '{}'", - sink.event_type_name() - ), - ) - }) - }(); - - match reply { - Ok(events) => ReadEventsReply { - events, - result: Some(read_events_reply::Result::Empty(())), - }, - Err((code, message)) => ReadEventsReply { - events: Vec::new(), - result: Some(read_events_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } - - /// Opens an event sink. - pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let sink_name = &request.sink_name; - - let (_, registry, _) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let sink = registry.get_event_sink_mut(sink_name).ok_or(( - ErrorCode::SinkNotFound, - "no sink is registered with the name '{}'".to_string(), - ))?; - - sink.open(); - - Ok(()) - }(); - - match reply { - Ok(()) => OpenSinkReply { - result: Some(open_sink_reply::Result::Empty(())), - }, - Err((code, message)) => OpenSinkReply { - result: Some(open_sink_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } - - /// Closes an event sink. - pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let sink_name = &request.sink_name; - - let (_, registry, _) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let sink = registry.get_event_sink_mut(sink_name).ok_or(( - ErrorCode::SinkNotFound, - "no sink is registered with the name '{}'".to_string(), - ))?; - - sink.close(); - - Ok(()) - }(); - - match reply { - Ok(()) => CloseSinkReply { - result: Some(close_sink_reply::Result::Empty(())), - }, - Err((code, message)) => CloseSinkReply { - result: Some(close_sink_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } } impl fmt::Debug for SimulationService { @@ -628,6 +509,14 @@ impl fmt::Display for InvalidRequest { impl error::Error for InvalidRequest {} +/// An error returned when a simulation was not started. +fn simulation_not_started_error() -> Error { + Error { + code: ErrorCode::SimulationNotStarted as i32, + message: "the simulation was not started".to_string(), + } +} + /// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. /// /// This will fail if the time is outside the protobuf-specified range for diff --git a/asynchronix/src/rpc/simulation_service/management_service.rs b/asynchronix/src/rpc/simulation_service/management_service.rs new file mode 100644 index 0000000..8a55b0d --- /dev/null +++ b/asynchronix/src/rpc/simulation_service/management_service.rs @@ -0,0 +1,451 @@ +use std::fmt; +use std::time::Duration; + +use prost_types::Timestamp; +use tai_time::MonotonicTime; + +use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; +use crate::rpc::SourceRegistry; +use crate::simulation::Simulation; + +use super::super::codegen::simulation::*; + +/// Protobuf-based simulation manager. +/// +/// A `ManagementService` enables the management of the lifecycle of a +/// simulation. +/// +/// Its methods map the various RPC management service methods defined in +/// `simulation.proto`. +pub(crate) struct ManagementService { + simulation: Simulation, + source_registry: SourceRegistry, + key_registry: KeyRegistry, +} + +impl ManagementService { + /// Creates a new `ManagementService`. + pub(crate) fn new( + simulation: Simulation, + source_registry: SourceRegistry, + key_registry: KeyRegistry, + ) -> Self { + Self { + simulation, + source_registry, + key_registry, + } + } + + /// Returns the current simulation time. + pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { + let reply = if let Some(timestamp) = monotonic_to_timestamp(self.simulation.time()) { + time_reply::Result::Time(timestamp) + } else { + time_reply::Result::Error(Error { + code: ErrorCode::SimulationTimeOutOfRange as i32, + message: "the final simulation time is out of range".to_string(), + }) + }; + + TimeReply { + result: Some(reply), + } + } + + /// Advances simulation time to that of the next scheduled event, processing + /// that event as well as all other event scheduled for the same time. + /// + /// Processing is gated by a (possibly blocking) call to + /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the + /// configured simulation clock. This method blocks until all newly + /// processed events have completed. + pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply { + self.simulation.step(); + + let reply = if let Some(timestamp) = monotonic_to_timestamp(self.simulation.time()) { + step_reply::Result::Time(timestamp) + } else { + step_reply::Result::Error(Error { + code: ErrorCode::SimulationTimeOutOfRange as i32, + message: "the final simulation time is out of range".to_string(), + }) + }; + + StepReply { + result: Some(reply), + } + } + + /// Iteratively advances the simulation time until the specified deadline, + /// as if by calling + /// [`Simulation::step()`](crate::simulation::Simulation::step) repeatedly. + /// + /// This method blocks until all events scheduled up to the specified target + /// time have completed. The simulation time upon completion is equal to the + /// specified target time, whether or not an event was scheduled for that + /// time. + pub(crate) fn step_until(&mut self, request: StepUntilRequest) -> StepUntilReply { + let reply = move || -> Result { + let deadline = request + .deadline + .ok_or((ErrorCode::MissingArgument, "missing deadline argument"))?; + + match deadline { + step_until_request::Deadline::Time(time) => { + let time = timestamp_to_monotonic(time) + .ok_or((ErrorCode::InvalidTime, "out-of-range nanosecond field"))?; + + self.simulation.step_until(time).map_err(|_| { + ( + ErrorCode::InvalidTime, + "the specified deadline lies in the past", + ) + })?; + } + step_until_request::Deadline::Duration(duration) => { + let duration = to_positive_duration(duration).ok_or(( + ErrorCode::InvalidDuration, + "the specified deadline lies in the past", + ))?; + + self.simulation.step_by(duration); + } + }; + + let timestamp = monotonic_to_timestamp(self.simulation.time()).ok_or(( + ErrorCode::SimulationTimeOutOfRange, + "the final simulation time is out of range", + ))?; + + Ok(timestamp) + }(); + + StepUntilReply { + result: Some(match reply { + Ok(timestamp) => step_until_reply::Result::Time(timestamp), + Err((code, message)) => step_until_reply::Result::Error(Error { + code: code as i32, + message: message.to_string(), + }), + }), + } + } + + /// Schedules an event at a future time. + pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply { + let reply = move || -> Result, (ErrorCode, String)> { + let source_name = &request.source_name; + let msgpack_event = &request.event; + let with_key = request.with_key; + let period = request + .period + .map(|period| { + to_strictly_positive_duration(period).ok_or(( + ErrorCode::InvalidDuration, + "the specified event period is not strictly positive".to_string(), + )) + }) + .transpose()?; + + let deadline = request.deadline.ok_or(( + ErrorCode::MissingArgument, + "missing deadline argument".to_string(), + ))?; + + let deadline = match deadline { + schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time) + .ok_or(( + ErrorCode::InvalidTime, + "out-of-range nanosecond field".to_string(), + ))?, + schedule_event_request::Deadline::Duration(duration) => { + let duration = to_strictly_positive_duration(duration).ok_or(( + ErrorCode::InvalidDuration, + "the specified scheduling deadline is not in the future".to_string(), + ))?; + + self.simulation.time() + duration + } + }; + + let source = self + .source_registry + .get_event_source_mut(source_name) + .ok_or(( + ErrorCode::SourceNotFound, + "no event source is registered with the name '{}'".to_string(), + ))?; + + let (action, action_key) = match (with_key, period) { + (false, None) => source.event(msgpack_event).map(|action| (action, None)), + (false, Some(period)) => source + .periodic_event(period, msgpack_event) + .map(|action| (action, None)), + (true, None) => source + .keyed_event(msgpack_event) + .map(|(action, key)| (action, Some(key))), + (true, Some(period)) => source + .keyed_periodic_event(period, msgpack_event) + .map(|(action, key)| (action, Some(key))), + } + .map_err(|_| { + ( + ErrorCode::InvalidMessage, + format!( + "the event could not be deserialized as type '{}'", + source.event_type_name() + ), + ) + })?; + + let key_id = action_key.map(|action_key| { + // Free stale keys from the registry. + self.key_registry + .remove_expired_keys(self.simulation.time()); + + if period.is_some() { + self.key_registry.insert_eternal_key(action_key) + } else { + self.key_registry.insert_key(action_key, deadline) + } + }); + + self.simulation.process(action); + + Ok(key_id) + }(); + + ScheduleEventReply { + result: Some(match reply { + Ok(Some(key_id)) => { + let (subkey1, subkey2) = key_id.into_raw_parts(); + schedule_event_reply::Result::Key(EventKey { + subkey1: subkey1 + .try_into() + .expect("action key index is too large to be serialized"), + subkey2, + }) + } + Ok(None) => schedule_event_reply::Result::Empty(()), + Err((code, message)) => schedule_event_reply::Result::Error(Error { + code: code as i32, + message, + }), + }), + } + } + + /// Cancels a keyed event. + pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply { + let reply = move || -> Result<(), (ErrorCode, String)> { + let key = request.key.ok_or(( + ErrorCode::MissingArgument, + "missing key argument".to_string(), + ))?; + let subkey1: usize = key + .subkey1 + .try_into() + .map_err(|_| (ErrorCode::InvalidKey, "invalid event key".to_string()))?; + let subkey2 = key.subkey2; + + let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2); + + self.key_registry + .remove_expired_keys(self.simulation.time()); + let key = self.key_registry.extract_key(key_id).ok_or(( + ErrorCode::InvalidKey, + "invalid or expired event key".to_string(), + ))?; + + key.cancel(); + + Ok(()) + }(); + + CancelEventReply { + result: Some(match reply { + Ok(()) => cancel_event_reply::Result::Empty(()), + Err((code, message)) => cancel_event_reply::Result::Error(Error { + code: code as i32, + message, + }), + }), + } + } + + /// Broadcasts an event from an event source immediately, blocking until + /// completion. + /// + /// Simulation time remains unchanged. + pub(crate) fn process_event(&mut self, request: ProcessEventRequest) -> ProcessEventReply { + let reply = move || -> Result<(), (ErrorCode, String)> { + let source_name = &request.source_name; + let msgpack_event = &request.event; + + let source = self + .source_registry + .get_event_source_mut(source_name) + .ok_or(( + ErrorCode::SourceNotFound, + "no source is registered with the name '{}'".to_string(), + ))?; + + let event = source.event(msgpack_event).map_err(|_| { + ( + ErrorCode::InvalidMessage, + format!( + "the event could not be deserialized as type '{}'", + source.event_type_name() + ), + ) + })?; + + self.simulation.process(event); + + Ok(()) + }(); + + ProcessEventReply { + result: Some(match reply { + Ok(()) => process_event_reply::Result::Empty(()), + Err((code, message)) => process_event_reply::Result::Error(Error { + code: code as i32, + message, + }), + }), + } + } + + /// Broadcasts an event from an event source immediately, blocking until + /// completion. + /// + /// Simulation time remains unchanged. + pub(crate) fn process_query(&mut self, request: ProcessQueryRequest) -> ProcessQueryReply { + let reply = move || -> Result>, (ErrorCode, String)> { + let source_name = &request.source_name; + let msgpack_request = &request.request; + + let source = self + .source_registry + .get_query_source_mut(source_name) + .ok_or(( + ErrorCode::SourceNotFound, + "no source is registered with the name '{}'".to_string(), + ))?; + + let (query, mut promise) = source.query(msgpack_request).map_err(|_| { + ( + ErrorCode::InvalidMessage, + format!( + "the request could not be deserialized as type '{}'", + source.request_type_name() + ), + ) + })?; + + self.simulation.process(query); + + let replies = promise.take_collect().ok_or(( + ErrorCode::InternalError, + "a reply to the query was expected but none was available".to_string(), + ))?; + + replies.map_err(|_| { + ( + ErrorCode::InvalidMessage, + format!( + "the reply could not be serialized as type '{}'", + source.reply_type_name() + ), + ) + }) + }(); + + match reply { + Ok(replies) => ProcessQueryReply { + replies, + result: Some(process_query_reply::Result::Empty(())), + }, + Err((code, message)) => ProcessQueryReply { + replies: Vec::new(), + result: Some(process_query_reply::Result::Error(Error { + code: code as i32, + message, + })), + }, + } + } +} + +impl fmt::Debug for ManagementService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ManagementService").finish_non_exhaustive() + } +} + +/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. +/// +/// This will fail if the time is outside the protobuf-specified range for +/// timestamps (0001-01-01 00:00:00 to 9999-12-31 23:59:59). +fn monotonic_to_timestamp(monotonic_time: MonotonicTime) -> Option { + // Unix timestamp for 0001-01-01 00:00:00, the minimum accepted by + // protobuf's specification for the `Timestamp` type. + const MIN_SECS: i64 = -62135596800; + // Unix timestamp for 9999-12-31 23:59:59, the maximum accepted by + // protobuf's specification for the `Timestamp` type. + const MAX_SECS: i64 = 253402300799; + + let secs = monotonic_time.as_secs(); + if !(MIN_SECS..=MAX_SECS).contains(&secs) { + return None; + } + + Some(Timestamp { + seconds: secs, + nanos: monotonic_time.subsec_nanos() as i32, + }) +} + +/// Attempts a cast from a protobuf `Timestamp` to a `MonotonicTime`. +/// +/// This should never fail provided that the `Timestamp` complies with the +/// protobuf specification. It can only fail if the nanosecond part is negative +/// or greater than 999'999'999. +fn timestamp_to_monotonic(timestamp: Timestamp) -> Option { + let nanos: u32 = timestamp.nanos.try_into().ok()?; + + MonotonicTime::new(timestamp.seconds, nanos) +} + +/// Attempts a cast from a protobuf `Duration` to a `std::time::Duration`. +/// +/// If the `Duration` complies with the protobuf specification, this can only +/// fail if the duration is negative. +fn to_positive_duration(duration: prost_types::Duration) -> Option { + if duration.seconds < 0 || duration.nanos < 0 { + return None; + } + + Some(Duration::new( + duration.seconds as u64, + duration.nanos as u32, + )) +} + +/// Attempts a cast from a protobuf `Duration` to a strictly positive +/// `std::time::Duration`. +/// +/// If the `Duration` complies with the protobuf specification, this can only +/// fail if the duration is negative or null. +fn to_strictly_positive_duration(duration: prost_types::Duration) -> Option { + if duration.seconds < 0 || duration.nanos < 0 || (duration.seconds == 0 && duration.nanos == 0) + { + return None; + } + + Some(Duration::new( + duration.seconds as u64, + duration.nanos as u32, + )) +} diff --git a/asynchronix/src/rpc/simulation_service/monitoring_service.rs b/asynchronix/src/rpc/simulation_service/monitoring_service.rs new file mode 100644 index 0000000..ddae842 --- /dev/null +++ b/asynchronix/src/rpc/simulation_service/monitoring_service.rs @@ -0,0 +1,122 @@ +use std::error; +use std::fmt; + +use crate::rpc::SinkRegistry; + +use super::super::codegen::simulation::*; + +/// Protobuf-based simulation monitor. +/// +/// A `MonitoringService` enables the monitoring of the event sinks of a +/// [`Simulation`](crate::simulation::Simulation). +/// +/// Its methods map the various RPC monitoring service methods defined in +/// `simulation.proto`. +pub(crate) struct MonitoringService { + sink_registry: SinkRegistry, +} + +impl MonitoringService { + /// Creates a new `MonitoringService`. + pub(crate) fn new(sink_registry: SinkRegistry) -> Self { + Self { sink_registry } + } + + /// Read all events from an event sink. + pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply { + let reply = move || -> Result>, (ErrorCode, String)> { + let sink_name = &request.sink_name; + + let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or(( + ErrorCode::SinkNotFound, + "no sink is registered with the name '{}'".to_string(), + ))?; + + sink.collect().map_err(|_| { + ( + ErrorCode::InvalidMessage, + format!( + "the event could not be serialized from type '{}'", + sink.event_type_name() + ), + ) + }) + }(); + + match reply { + Ok(events) => ReadEventsReply { + events, + result: Some(read_events_reply::Result::Empty(())), + }, + Err((code, message)) => ReadEventsReply { + events: Vec::new(), + result: Some(read_events_reply::Result::Error(Error { + code: code as i32, + message, + })), + }, + } + } + + /// Opens an event sink. + pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply { + let reply = move || -> Result<(), (ErrorCode, String)> { + let sink_name = &request.sink_name; + + let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or(( + ErrorCode::SinkNotFound, + "no sink is registered with the name '{}'".to_string(), + ))?; + + sink.open(); + + Ok(()) + }(); + + match reply { + Ok(()) => OpenSinkReply { + result: Some(open_sink_reply::Result::Empty(())), + }, + Err((code, message)) => OpenSinkReply { + result: Some(open_sink_reply::Result::Error(Error { + code: code as i32, + message, + })), + }, + } + } + + /// Closes an event sink. + pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply { + let reply = move || -> Result<(), (ErrorCode, String)> { + let sink_name = &request.sink_name; + + let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or(( + ErrorCode::SinkNotFound, + "no sink is registered with the name '{}'".to_string(), + ))?; + + sink.close(); + + Ok(()) + }(); + + match reply { + Ok(()) => CloseSinkReply { + result: Some(close_sink_reply::Result::Empty(())), + }, + Err((code, message)) => CloseSinkReply { + result: Some(close_sink_reply::Result::Error(Error { + code: code as i32, + message, + })), + }, + } + } +} + +impl fmt::Debug for MonitoringService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SimulationService").finish_non_exhaustive() + } +} diff --git a/asynchronix/src/rpc/endpoint_registry.rs b/asynchronix/src/rpc/sink_registry.rs similarity index 77% rename from asynchronix/src/rpc/endpoint_registry.rs rename to asynchronix/src/rpc/sink_registry.rs index 67b5821..dc7fb0e 100644 --- a/asynchronix/src/rpc/endpoint_registry.rs +++ b/asynchronix/src/rpc/sink_registry.rs @@ -14,75 +14,16 @@ use crate::simulation::{Action, ActionKey}; /// A registry that holds all sources and sinks meant to be accessed through /// remote procedure calls. #[derive(Default)] -pub struct EndpointRegistry { - event_sources: HashMap>, - query_sources: HashMap>, +pub struct SinkRegistry { sinks: HashMap>, } -impl EndpointRegistry { +impl SinkRegistry { /// Creates an empty `EndpointRegistry`. pub fn new() -> Self { Self::default() } - /// Adds an event source to the registry. - /// - /// If the specified name is already in use for another event source, the source - /// provided as argument is returned in the error. - pub fn add_event_source( - &mut self, - source: EventSource, - name: impl Into, - ) -> Result<(), EventSource> - where - T: DeserializeOwned + Clone + Send + 'static, - { - match self.event_sources.entry(name.into()) { - Entry::Vacant(s) => { - s.insert(Box::new(source)); - - Ok(()) - } - Entry::Occupied(_) => Err(source), - } - } - - /// Returns a mutable reference to the specified event source if it is in - /// the registry. - pub(crate) fn get_event_source_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> { - self.event_sources.get_mut(name).map(|s| s.as_mut()) - } - - /// Adds a query source to the registry. - /// - /// If the specified name is already in use for another query source, the - /// source provided as argument is returned in the error. - pub fn add_query_source( - &mut self, - source: QuerySource, - name: impl Into, - ) -> Result<(), QuerySource> - where - T: DeserializeOwned + Clone + Send + 'static, - R: Serialize + Send + 'static, - { - match self.query_sources.entry(name.into()) { - Entry::Vacant(s) => { - s.insert(Box::new(source)); - - Ok(()) - } - Entry::Occupied(_) => Err(source), - } - } - - /// Returns a mutable reference to the specified query source if it is in - /// the registry. - pub(crate) fn get_query_source_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> { - self.query_sources.get_mut(name).map(|s| s.as_mut()) - } - /// Adds a sink to the registry. /// /// If the specified name is already in use for another sink, the sink @@ -109,14 +50,9 @@ impl EndpointRegistry { } } -impl fmt::Debug for EndpointRegistry { +impl fmt::Debug for SinkRegistry { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "EndpointRegistry ({} sources, {} sinks)", - self.event_sources.len(), - self.sinks.len() - ) + write!(f, "SinkRegistry ({} sinks)", self.sinks.len()) } } diff --git a/asynchronix/src/rpc/source_registry.rs b/asynchronix/src/rpc/source_registry.rs new file mode 100644 index 0000000..3f47733 --- /dev/null +++ b/asynchronix/src/rpc/source_registry.rs @@ -0,0 +1,237 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; +use std::time::Duration; + +use rmp_serde::decode::Error as RmpDecodeError; +use rmp_serde::encode::Error as RmpEncodeError; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::ports::{EventSource, QuerySource, ReplyReceiver}; +use crate::simulation::{Action, ActionKey}; + +/// A registry that holds all sources and sinks meant to be accessed through +/// remote procedure calls. +#[derive(Default)] +pub struct SourceRegistry { + event_sources: HashMap>, + query_sources: HashMap>, +} + +impl SourceRegistry { + /// Creates an empty `EndpointRegistry`. + pub fn new() -> Self { + Self::default() + } + + /// Adds an event source to the registry. + /// + /// If the specified name is already in use for another event source, the source + /// provided as argument is returned in the error. + pub fn add_event_source( + &mut self, + source: EventSource, + name: impl Into, + ) -> Result<(), EventSource> + where + T: DeserializeOwned + Clone + Send + 'static, + { + match self.event_sources.entry(name.into()) { + Entry::Vacant(s) => { + s.insert(Box::new(source)); + + Ok(()) + } + Entry::Occupied(_) => Err(source), + } + } + + /// Returns a mutable reference to the specified event source if it is in + /// the registry. + pub(crate) fn get_event_source_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> { + self.event_sources.get_mut(name).map(|s| s.as_mut()) + } + + /// Adds a query source to the registry. + /// + /// If the specified name is already in use for another query source, the + /// source provided as argument is returned in the error. + pub fn add_query_source( + &mut self, + source: QuerySource, + name: impl Into, + ) -> Result<(), QuerySource> + where + T: DeserializeOwned + Clone + Send + 'static, + R: Serialize + Send + 'static, + { + match self.query_sources.entry(name.into()) { + Entry::Vacant(s) => { + s.insert(Box::new(source)); + + Ok(()) + } + Entry::Occupied(_) => Err(source), + } + } + + /// Returns a mutable reference to the specified query source if it is in + /// the registry. + pub(crate) fn get_query_source_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> { + self.query_sources.get_mut(name).map(|s| s.as_mut()) + } +} + +impl fmt::Debug for SourceRegistry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "SourceRegistry ({} event sources, {} query sources)", + self.event_sources.len(), + self.query_sources.len() + ) + } +} + +/// A type-erased `EventSource` that operates on MessagePack-encoded serialized +/// events. +pub(crate) trait EventSourceAny: Send + 'static { + /// Returns an action which, when processed, broadcasts an event to all + /// connected input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn event(&mut self, msgpack_arg: &[u8]) -> Result; + + /// Returns a cancellable action and a cancellation key; when processed, the + /// action broadcasts an event to all connected input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>; + + /// Returns a periodically recurring action which, when processed, + /// broadcasts an event to all connected input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result; + + /// Returns a cancellable, periodically recurring action and a cancellation + /// key; when processed, the action broadcasts an event to all connected + /// input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn keyed_periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result<(Action, ActionKey), RmpDecodeError>; + + /// Human-readable name of the event type, as returned by + /// `any::type_name()`. + fn event_type_name(&self) -> &'static str; +} + +impl EventSourceAny for EventSource +where + T: DeserializeOwned + Clone + Send + 'static, +{ + fn event(&mut self, msgpack_arg: &[u8]) -> Result { + rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg)) + } + fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> { + rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg)) + } + fn periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result { + rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg)) + } + fn keyed_periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result<(Action, ActionKey), RmpDecodeError> { + rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg)) + } + fn event_type_name(&self) -> &'static str { + std::any::type_name::() + } +} + +/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized +/// queries and returns MessagePack-encoded replies. +pub(crate) trait QuerySourceAny: Send + 'static { + /// Returns an action which, when processed, broadcasts a query to all + /// connected replier ports. + /// + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn query( + &mut self, + msgpack_arg: &[u8], + ) -> Result<(Action, Box), RmpDecodeError>; + + /// Human-readable name of the request type, as returned by + /// `any::type_name()`. + fn request_type_name(&self) -> &'static str; + + /// Human-readable name of the reply type, as returned by + /// `any::type_name()`. + fn reply_type_name(&self) -> &'static str; +} + +impl QuerySourceAny for QuerySource +where + T: DeserializeOwned + Clone + Send + 'static, + R: Serialize + Send + 'static, +{ + fn query( + &mut self, + msgpack_arg: &[u8], + ) -> Result<(Action, Box), RmpDecodeError> { + rmp_serde::from_read(msgpack_arg).map(|arg| { + let (action, reply_recv) = self.query(arg); + let reply_recv: Box = Box::new(reply_recv); + + (action, reply_recv) + }) + } + + fn request_type_name(&self) -> &'static str { + std::any::type_name::() + } + + fn reply_type_name(&self) -> &'static str { + std::any::type_name::() + } +} + +/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies.. +pub(crate) trait ReplyReceiverAny { + /// Take the replies, if any, encode them and collect them in a vector. + fn take_collect(&mut self) -> Option>, RmpEncodeError>>; +} + +impl ReplyReceiverAny for ReplyReceiver { + fn take_collect(&mut self) -> Option>, RmpEncodeError>> { + let replies = self.take()?; + + let encoded_replies = (move || { + let mut encoded_replies = Vec::new(); + for reply in replies { + let encoded_reply = rmp_serde::to_vec_named(&reply)?; + encoded_replies.push(encoded_reply); + } + + Ok(encoded_replies) + })(); + + Some(encoded_replies) + } +} diff --git a/asynchronix/src/rpc/wasm.rs b/asynchronix/src/rpc/wasm.rs index 5526e6e..6c43f95 100644 --- a/asynchronix/src/rpc/wasm.rs +++ b/asynchronix/src/rpc/wasm.rs @@ -20,7 +20,7 @@ use wasm_bindgen::prelude::*; -use super::{EndpointRegistry, SimulationService}; +use super::{SimulationService, SinkRegistry}; use crate::simulation::SimInit; /// A simulation service that can be used from JavaScript. @@ -75,7 +75,7 @@ impl WasmSimulationService { /// interface. pub fn new(sim_gen: F) -> Self where - F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + F: FnMut() -> (SimInit, SinkRegistry) + Send + 'static, { Self(SimulationService::new(sim_gen)) } diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 6be841e..1bcd281 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -218,7 +218,7 @@ impl Simulation { } /// Advances simulation time to that of the next scheduled event, processing - /// that event as well as all other event scheduled for the same time. + /// that event as well as all other events scheduled for the same time. /// /// Processing is gated by a (possibly blocking) call to /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured diff --git a/asynchronix/src/util/cached_rw_lock.rs b/asynchronix/src/util/cached_rw_lock.rs index d2a9125..8997627 100644 --- a/asynchronix/src/util/cached_rw_lock.rs +++ b/asynchronix/src/util/cached_rw_lock.rs @@ -1,3 +1,5 @@ +//! Cached read-write lock. + use std::ops::{Deref, DerefMut}; use crate::loom_exports::sync::atomic::{AtomicUsize, Ordering};