diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b8660b9..9e6b33c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: toolchain: ${{ matrix.rust }} - name: Run cargo check - run: cargo check --features="rpc grpc-service" + run: cargo check --features="grpc-service" build-wasm: name: Build wasm32 @@ -43,7 +43,7 @@ jobs: targets: wasm32-unknown-unknown - name: Run cargo build (wasm) - run: cargo build --target wasm32-unknown-unknown --features="rpc" + run: cargo build --target wasm32-unknown-unknown --features="wasm-service" test: name: Test suite @@ -56,7 +56,7 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Run cargo test - run: cargo test --features="rpc grpc-service" + run: cargo test --features="grpc-service" loom-dry-run: name: Loom dry run @@ -69,7 +69,7 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Dry-run cargo test (Loom) - run: cargo test --no-run --tests --features="rpc grpc-service" + run: cargo test --no-run --tests --features="grpc-service" env: RUSTFLAGS: --cfg asynchronix_loom @@ -86,12 +86,12 @@ jobs: components: miri - name: Run cargo miri tests (single-threaded executor) - run: cargo miri test --tests --lib --features="rpc grpc-service" + run: cargo miri test --tests --lib --features="grpc-service" env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - name: Run cargo miri tests (multi-threaded executor) - run: cargo miri test --tests --lib --features="rpc grpc-service" + run: cargo miri test --tests --lib --features="grpc-service" env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 @@ -149,7 +149,7 @@ jobs: run: cargo fmt --all -- --check - name: Run cargo clippy - run: cargo clippy --features="rpc grpc-service" + run: cargo clippy --features="grpc-service" docs: name: Docs @@ -162,4 +162,4 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Run cargo doc - run: cargo doc --no-deps --features="rpc grpc-service" --document-private-items + run: cargo doc --no-deps --features="grpc-service" --document-private-items diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 2b978dd..851ca50 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -61,7 +61,7 @@ rmp-serde = { version = "1.1", optional = true } serde = { version = "1", optional = true } # gRPC service dependencies. -tokio = { version = "1.0", features=["net"], optional = true } +tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true } tonic = { version = "0.11", default-features = false, features=["codegen", "prost"], optional = true } # WASM service dependencies. diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index b8a2954..f8cb141 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -414,6 +414,8 @@ pub(crate) mod macros; pub mod model; pub mod ports; #[cfg(feature = "rpc")] +pub mod registry; +#[cfg(feature = "rpc")] pub mod rpc; pub mod simulation; pub mod time; diff --git a/asynchronix/src/registry.rs b/asynchronix/src/registry.rs new file mode 100644 index 0000000..8880c80 --- /dev/null +++ b/asynchronix/src/registry.rs @@ -0,0 +1,75 @@ +//! Registry for sinks and sources. +//! +//! This module provides the `EndpointRegistry` object which associates each +//! event sink, event source and query source to a unique name. + +mod event_sink_registry; +mod event_source_registry; +mod query_source_registry; + +use serde::{de::DeserializeOwned, ser::Serialize}; + +use crate::ports::{EventSinkStream, EventSource, QuerySource}; + +pub(crate) use event_sink_registry::EventSinkRegistry; +pub(crate) use event_source_registry::EventSourceRegistry; +pub(crate) use query_source_registry::QuerySourceRegistry; + +/// A registry that holds all sources and sinks meant to be accessed through +/// bindings or remote procedure calls. +#[derive(Default, Debug)] +pub struct EndpointRegistry { + pub(crate) event_sink_registry: EventSinkRegistry, + pub(crate) event_source_registry: EventSourceRegistry, + pub(crate) query_source_registry: QuerySourceRegistry, +} + +impl EndpointRegistry { + /// Creates a new, empty registry. + pub fn new() -> Self { + Self::default() + } + + /// Adds an event source to the registry. + /// + /// If the specified name is already in use for another event source, the source + /// provided as argument is returned in the error. + pub fn add_event_source( + &mut self, + source: EventSource, + name: impl Into, + ) -> Result<(), EventSource> + where + T: DeserializeOwned + Clone + Send + 'static, + { + self.event_source_registry.add(source, name) + } + + /// Adds a query source to the registry. + /// + /// If the specified name is already in use for another query source, the + /// source provided as argument is returned in the error. + pub fn add_query_source( + &mut self, + source: QuerySource, + name: impl Into, + ) -> Result<(), QuerySource> + where + T: DeserializeOwned + Clone + Send + 'static, + R: Serialize + Send + 'static, + { + self.query_source_registry.add(source, name) + } + + /// Adds an event sink to the registry. + /// + /// If the specified name is already in use for another event sink, the + /// event sink provided as argument is returned in the error. + pub fn add_event_sink(&mut self, sink: S, name: impl Into) -> Result<(), S> + where + S: EventSinkStream + Send + 'static, + S::Item: Serialize, + { + self.event_sink_registry.add(sink, name) + } +} diff --git a/asynchronix/src/registry/event_sink_registry.rs b/asynchronix/src/registry/event_sink_registry.rs new file mode 100644 index 0000000..bac0ea8 --- /dev/null +++ b/asynchronix/src/registry/event_sink_registry.rs @@ -0,0 +1,90 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; + +use rmp_serde::encode::Error as RmpEncodeError; +use serde::Serialize; + +use crate::ports::EventSinkStream; + +/// A registry that holds all sources and sinks meant to be accessed through +/// remote procedure calls. +#[derive(Default)] +pub(crate) struct EventSinkRegistry(HashMap>); + +impl EventSinkRegistry { + /// Adds a sink to the registry. + /// + /// If the specified name is already in use for another sink, the sink + /// provided as argument is returned in the error. + pub(crate) fn add(&mut self, sink: S, name: impl Into) -> Result<(), S> + where + S: EventSinkStream + Send + 'static, + S::Item: Serialize, + { + match self.0.entry(name.into()) { + Entry::Vacant(s) => { + s.insert(Box::new(sink)); + + Ok(()) + } + Entry::Occupied(_) => Err(sink), + } + } + + /// Returns a mutable reference to the specified sink if it is in the + /// registry. + pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn EventSinkStreamAny> { + self.0.get_mut(name).map(|s| s.as_mut()) + } +} + +impl fmt::Debug for EventSinkRegistry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "EventSinkRegistry ({} sinks)", self.0.len()) + } +} + +/// A type-erased `EventSinkStream`. +pub(crate) trait EventSinkStreamAny: Send + 'static { + /// Human-readable name of the event type, as returned by + /// `any::type_name()`. + fn event_type_name(&self) -> &'static str; + + /// Starts or resumes the collection of new events. + fn open(&mut self); + + /// Pauses the collection of new events. + fn close(&mut self); + + /// Encode and collect all events in a vector. + fn collect(&mut self) -> Result>, RmpEncodeError>; +} + +impl EventSinkStreamAny for E +where + E: EventSinkStream + Send + 'static, + E::Item: Serialize, +{ + fn event_type_name(&self) -> &'static str { + std::any::type_name::() + } + + fn open(&mut self) { + self.open(); + } + + fn close(&mut self) { + self.close(); + } + + fn collect(&mut self) -> Result>, RmpEncodeError> { + self.__try_fold(Vec::new(), |mut encoded_events, event| { + rmp_serde::to_vec_named(&event).map(|encoded_event| { + encoded_events.push(encoded_event); + + encoded_events + }) + }) + } +} diff --git a/asynchronix/src/registry/event_source_registry.rs b/asynchronix/src/registry/event_source_registry.rs new file mode 100644 index 0000000..fa63e27 --- /dev/null +++ b/asynchronix/src/registry/event_source_registry.rs @@ -0,0 +1,121 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; +use std::time::Duration; + +use rmp_serde::decode::Error as RmpDecodeError; +use serde::de::DeserializeOwned; + +use crate::ports::EventSource; +use crate::simulation::{Action, ActionKey}; + +/// A registry that holds all sources and sinks meant to be accessed through +/// remote procedure calls. +#[derive(Default)] +pub(crate) struct EventSourceRegistry(HashMap>); + +impl EventSourceRegistry { + /// Adds an event source to the registry. + /// + /// If the specified name is already in use for another event source, the source + /// provided as argument is returned in the error. + pub(crate) fn add( + &mut self, + source: EventSource, + name: impl Into, + ) -> Result<(), EventSource> + where + T: DeserializeOwned + Clone + Send + 'static, + { + match self.0.entry(name.into()) { + Entry::Vacant(s) => { + s.insert(Box::new(source)); + + Ok(()) + } + Entry::Occupied(_) => Err(source), + } + } + + /// Returns a mutable reference to the specified event source if it is in + /// the registry. + pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> { + self.0.get_mut(name).map(|s| s.as_mut()) + } +} + +impl fmt::Debug for EventSourceRegistry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "EventSourceRegistry ({} sources)", self.0.len()) + } +} + +/// A type-erased `EventSource` that operates on MessagePack-encoded serialized +/// events. +pub(crate) trait EventSourceAny: Send + 'static { + /// Returns an action which, when processed, broadcasts an event to all + /// connected input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn event(&mut self, msgpack_arg: &[u8]) -> Result; + + /// Returns a cancellable action and a cancellation key; when processed, the + /// action broadcasts an event to all connected input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>; + + /// Returns a periodically recurring action which, when processed, + /// broadcasts an event to all connected input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result; + + /// Returns a cancellable, periodically recurring action and a cancellation + /// key; when processed, the action broadcasts an event to all connected + /// input ports. + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn keyed_periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result<(Action, ActionKey), RmpDecodeError>; + + /// Human-readable name of the event type, as returned by + /// `any::type_name()`. + fn event_type_name(&self) -> &'static str; +} + +impl EventSourceAny for EventSource +where + T: DeserializeOwned + Clone + Send + 'static, +{ + fn event(&mut self, msgpack_arg: &[u8]) -> Result { + rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg)) + } + fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> { + rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg)) + } + fn periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result { + rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg)) + } + fn keyed_periodic_event( + &mut self, + period: Duration, + msgpack_arg: &[u8], + ) -> Result<(Action, ActionKey), RmpDecodeError> { + rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg)) + } + fn event_type_name(&self) -> &'static str { + std::any::type_name::() + } +} diff --git a/asynchronix/src/registry/query_source_registry.rs b/asynchronix/src/registry/query_source_registry.rs new file mode 100644 index 0000000..fb18233 --- /dev/null +++ b/asynchronix/src/registry/query_source_registry.rs @@ -0,0 +1,125 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; + +use rmp_serde::decode::Error as RmpDecodeError; +use rmp_serde::encode::Error as RmpEncodeError; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::ports::{QuerySource, ReplyReceiver}; +use crate::simulation::Action; + +/// A registry that holds all sources and sinks meant to be accessed through +/// remote procedure calls. +#[derive(Default)] +pub(crate) struct QuerySourceRegistry(HashMap>); + +impl QuerySourceRegistry { + /// Adds a query source to the registry. + /// + /// If the specified name is already in use for another query source, the + /// source provided as argument is returned in the error. + pub(crate) fn add( + &mut self, + source: QuerySource, + name: impl Into, + ) -> Result<(), QuerySource> + where + T: DeserializeOwned + Clone + Send + 'static, + R: Serialize + Send + 'static, + { + match self.0.entry(name.into()) { + Entry::Vacant(s) => { + s.insert(Box::new(source)); + + Ok(()) + } + Entry::Occupied(_) => Err(source), + } + } + + /// Returns a mutable reference to the specified query source if it is in + /// the registry. + pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> { + self.0.get_mut(name).map(|s| s.as_mut()) + } +} + +impl fmt::Debug for QuerySourceRegistry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "QuerySourceRegistry ({} query sources)", self.0.len(),) + } +} + +/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized +/// queries and returns MessagePack-encoded replies. +pub(crate) trait QuerySourceAny: Send + 'static { + /// Returns an action which, when processed, broadcasts a query to all + /// connected replier ports. + /// + /// + /// The argument is expected to conform to the serde MessagePack encoding. + fn query( + &mut self, + msgpack_arg: &[u8], + ) -> Result<(Action, Box), RmpDecodeError>; + + /// Human-readable name of the request type, as returned by + /// `any::type_name()`. + fn request_type_name(&self) -> &'static str; + + /// Human-readable name of the reply type, as returned by + /// `any::type_name()`. + fn reply_type_name(&self) -> &'static str; +} + +impl QuerySourceAny for QuerySource +where + T: DeserializeOwned + Clone + Send + 'static, + R: Serialize + Send + 'static, +{ + fn query( + &mut self, + msgpack_arg: &[u8], + ) -> Result<(Action, Box), RmpDecodeError> { + rmp_serde::from_read(msgpack_arg).map(|arg| { + let (action, reply_recv) = self.query(arg); + let reply_recv: Box = Box::new(reply_recv); + + (action, reply_recv) + }) + } + + fn request_type_name(&self) -> &'static str { + std::any::type_name::() + } + + fn reply_type_name(&self) -> &'static str { + std::any::type_name::() + } +} + +/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies.. +pub(crate) trait ReplyReceiverAny { + /// Take the replies, if any, encode them and collect them in a vector. + fn take_collect(&mut self) -> Option>, RmpEncodeError>>; +} + +impl ReplyReceiverAny for ReplyReceiver { + fn take_collect(&mut self) -> Option>, RmpEncodeError>> { + let replies = self.take()?; + + let encoded_replies = (move || { + let mut encoded_replies = Vec::new(); + for reply in replies { + let encoded_reply = rmp_serde::to_vec_named(&reply)?; + encoded_replies.push(encoded_reply); + } + + Ok(encoded_replies) + })(); + + Some(encoded_replies) + } +} diff --git a/asynchronix/src/rpc.rs b/asynchronix/src/rpc.rs index 59928eb..9f0a640 100644 --- a/asynchronix/src/rpc.rs +++ b/asynchronix/src/rpc.rs @@ -4,14 +4,8 @@ mod codegen; #[cfg(feature = "grpc-service")] pub mod grpc; mod key_registry; -mod monitoring_service; -mod simulation_service; -mod sink_registry; -mod source_registry; +#[cfg(feature = "wasm-service")] +mod protobuf; +mod services; #[cfg(feature = "wasm-service")] pub mod wasm; - -pub use monitoring_service::MonitoringService; -pub use simulation_service::SimulationService; -pub use sink_registry::SinkRegistry; -pub use source_registry::SourceRegistry; diff --git a/asynchronix/src/rpc/grpc.rs b/asynchronix/src/rpc/grpc.rs index 02b4bf5..d94160c 100644 --- a/asynchronix/src/rpc/grpc.rs +++ b/asynchronix/src/rpc/grpc.rs @@ -6,11 +6,12 @@ use std::sync::MutexGuard; use tonic::{transport::Server, Request, Response, Status}; -use crate::rpc::EndpointRegistry; +use crate::registry::EndpointRegistry; use crate::simulation::SimInit; use super::codegen::simulation::*; -use super::simulation_service::SimulationService; +use super::key_registry::KeyRegistry; +use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}; /// Runs a gRPC simulation server. /// @@ -22,8 +23,10 @@ pub fn run(sim_gen: F, addr: SocketAddr) -> Result<(), Box (SimInit, EndpointRegistry) + Send + 'static, { - // Use a single-threaded server. - let rt = tokio::runtime::Builder::new_current_thread() + // Use 2 threads so that the controller and monitor services can be used + // concurrently. + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) .enable_io() .build()?; @@ -40,21 +43,37 @@ where } struct GrpcSimulationService { - inner: Mutex, + sim_gen: Mutex (SimInit, EndpointRegistry) + Send + 'static>>, + controller_service: Mutex, + monitor_service: Mutex, } impl GrpcSimulationService { - fn new(sim_gen: F) -> Self + /// Creates a new `GrpcSimulationService` without any active simulation. + /// + /// The argument is a closure that is called every time the simulation is + /// (re)started by the remote client. It must create a new `SimInit` object + /// complemented by a registry that exposes the public event and query + /// interface. + pub(crate) fn new(sim_gen: F) -> Self where F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, { Self { - inner: Mutex::new(SimulationService::new(sim_gen)), + sim_gen: Mutex::new(Box::new(sim_gen)), + controller_service: Mutex::new(ControllerService::NotStarted), + monitor_service: Mutex::new(MonitorService::NotStarted), } } - fn inner(&self) -> MutexGuard<'_, SimulationService> { - self.inner.lock().unwrap() + /// Locks the controller and returns the mutex guard. + fn controller(&self) -> MutexGuard<'_, ControllerService> { + self.controller_service.lock().unwrap() + } + + /// Locks the monitor and returns the mutex guard. + fn monitor(&self) -> MutexGuard<'_, MonitorService> { + self.monitor_service.lock().unwrap() } } @@ -63,17 +82,41 @@ impl simulation_server::Simulation for GrpcSimulationService { async fn init(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().init(request))) + let start_time = request.time.unwrap_or_default(); + let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { + let (sim_init, endpoint_registry) = (self.sim_gen.lock().unwrap())(); + let simulation = sim_init.init(start_time); + *self.controller() = ControllerService::Started { + simulation, + event_source_registry: endpoint_registry.event_source_registry, + query_source_registry: endpoint_registry.query_source_registry, + key_registry: KeyRegistry::default(), + }; + *self.monitor() = MonitorService::Started { + event_sink_registry: endpoint_registry.event_sink_registry, + }; + + init_reply::Result::Empty(()) + } else { + init_reply::Result::Error(Error { + code: ErrorCode::InvalidTime as i32, + message: "out-of-range nanosecond field".to_string(), + }) + }; + + Ok(Response::new(InitReply { + result: Some(reply), + })) } async fn time(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().time(request))) + Ok(Response::new(self.controller().time(request))) } async fn step(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().step(request))) + Ok(Response::new(self.controller().step(request))) } async fn step_until( &self, @@ -81,7 +124,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().step_until(request))) + Ok(Response::new(self.controller().step_until(request))) } async fn schedule_event( &self, @@ -89,7 +132,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().schedule_event(request))) + Ok(Response::new(self.controller().schedule_event(request))) } async fn cancel_event( &self, @@ -97,7 +140,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().cancel_event(request))) + Ok(Response::new(self.controller().cancel_event(request))) } async fn process_event( &self, @@ -105,7 +148,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().process_event(request))) + Ok(Response::new(self.controller().process_event(request))) } async fn process_query( &self, @@ -113,7 +156,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().process_query(request))) + Ok(Response::new(self.controller().process_query(request))) } async fn read_events( &self, @@ -121,7 +164,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().read_events(request))) + Ok(Response::new(self.monitor().read_events(request))) } async fn open_sink( &self, @@ -129,7 +172,7 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().open_sink(request))) + Ok(Response::new(self.monitor().open_sink(request))) } async fn close_sink( &self, @@ -137,6 +180,6 @@ impl simulation_server::Simulation for GrpcSimulationService { ) -> Result, Status> { let request = request.into_inner(); - Ok(Response::new(self.inner().close_sink(request))) + Ok(Response::new(self.monitor().close_sink(request))) } } diff --git a/asynchronix/src/rpc/protobuf.rs b/asynchronix/src/rpc/protobuf.rs new file mode 100644 index 0000000..ca1a242 --- /dev/null +++ b/asynchronix/src/rpc/protobuf.rs @@ -0,0 +1,154 @@ +use std::error; +use std::fmt; + +use bytes::Buf; +use prost::Message; + +use crate::registry::EndpointRegistry; +use crate::rpc::key_registry::KeyRegistry; +use crate::simulation::SimInit; + +use super::codegen::simulation::*; +use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}; + +/// Protobuf-based simulation manager. +/// +/// A `ProtobufService` enables the management of the lifecycle of a +/// simulation, including creating a +/// [`Simulation`](crate::simulation::Simulation), invoking its methods and +/// instantiating a new simulation. +/// +/// Its methods map the various RPC service methods defined in +/// `simulation.proto`. +pub(crate) struct ProtobufService { + sim_gen: Box (SimInit, EndpointRegistry) + Send + 'static>, + controller_service: ControllerService, + monitor_service: MonitorService, +} + +impl ProtobufService { + /// Creates a new `ProtobufService` without any active simulation. + /// + /// The argument is a closure that is called every time the simulation is + /// (re)started by the remote client. It must create a new `SimInit` object + /// complemented by a registry that exposes the public event and query + /// interface. + pub(crate) fn new(sim_gen: F) -> Self + where + F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + { + Self { + sim_gen: Box::new(sim_gen), + controller_service: ControllerService::NotStarted, + monitor_service: MonitorService::NotStarted, + } + } + + /// Processes an encoded `AnyRequest` message and returns an encoded reply. + pub(crate) fn process_request(&mut self, request_buf: B) -> Result, InvalidRequest> + where + B: Buf, + { + match AnyRequest::decode(request_buf) { + Ok(AnyRequest { request: Some(req) }) => match req { + any_request::Request::InitRequest(request) => { + Ok(self.init(request).encode_to_vec()) + } + any_request::Request::TimeRequest(request) => { + Ok(self.controller_service.time(request).encode_to_vec()) + } + any_request::Request::StepRequest(request) => { + Ok(self.controller_service.step(request).encode_to_vec()) + } + any_request::Request::StepUntilRequest(request) => { + Ok(self.controller_service.step_until(request).encode_to_vec()) + } + any_request::Request::ScheduleEventRequest(request) => Ok(self + .controller_service + .schedule_event(request) + .encode_to_vec()), + any_request::Request::CancelEventRequest(request) => Ok(self + .controller_service + .cancel_event(request) + .encode_to_vec()), + any_request::Request::ProcessEventRequest(request) => Ok(self + .controller_service + .process_event(request) + .encode_to_vec()), + any_request::Request::ProcessQueryRequest(request) => Ok(self + .controller_service + .process_query(request) + .encode_to_vec()), + any_request::Request::ReadEventsRequest(request) => { + Ok(self.monitor_service.read_events(request).encode_to_vec()) + } + any_request::Request::OpenSinkRequest(request) => { + Ok(self.monitor_service.open_sink(request).encode_to_vec()) + } + any_request::Request::CloseSinkRequest(request) => { + Ok(self.monitor_service.close_sink(request).encode_to_vec()) + } + }, + Ok(AnyRequest { request: None }) => Err(InvalidRequest { + description: "the message did not contain any request".to_string(), + }), + Err(err) => Err(InvalidRequest { + description: format!("bad request: {}", err), + }), + } + } + + /// Initialize a simulation with the provided time. + /// + /// If a simulation is already active, it is destructed and replaced with a + /// new simulation. + /// + /// If the initialization time is not provided, it is initialized with the + /// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI). + fn init(&mut self, request: InitRequest) -> InitReply { + let start_time = request.time.unwrap_or_default(); + let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { + let (sim_init, endpoint_registry) = (self.sim_gen)(); + let simulation = sim_init.init(start_time); + self.controller_service = ControllerService::Started { + simulation, + event_source_registry: endpoint_registry.event_source_registry, + query_source_registry: endpoint_registry.query_source_registry, + key_registry: KeyRegistry::default(), + }; + self.monitor_service = MonitorService::Started { + event_sink_registry: endpoint_registry.event_sink_registry, + }; + + init_reply::Result::Empty(()) + } else { + init_reply::Result::Error(Error { + code: ErrorCode::InvalidTime as i32, + message: "out-of-range nanosecond field".to_string(), + }) + }; + + InitReply { + result: Some(reply), + } + } +} + +impl fmt::Debug for ProtobufService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ProtobufService").finish_non_exhaustive() + } +} + +#[derive(Clone, Debug)] +pub(crate) struct InvalidRequest { + description: String, +} + +impl fmt::Display for InvalidRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.description) + } +} + +impl error::Error for InvalidRequest {} diff --git a/asynchronix/src/rpc/services.rs b/asynchronix/src/rpc/services.rs new file mode 100644 index 0000000..ed18e58 --- /dev/null +++ b/asynchronix/src/rpc/services.rs @@ -0,0 +1,94 @@ +mod controller_service; +mod monitor_service; + +use std::time::Duration; + +use prost_types::Timestamp; +use tai_time::MonotonicTime; + +use super::codegen::simulation::{Error, ErrorCode}; + +pub(crate) use controller_service::ControllerService; +pub(crate) use monitor_service::MonitorService; + +/// Transforms an error code and a message into a Protobuf error. +fn to_error(code: ErrorCode, message: impl Into) -> Error { + Error { + code: code as i32, + message: message.into(), + } +} + +/// An error returned when a simulation was not started. +fn simulation_not_started_error() -> Error { + to_error( + ErrorCode::SimulationNotStarted, + "the simulation was not started", + ) +} + +/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. +/// +/// This will fail if the time is outside the protobuf-specified range for +/// timestamps (0001-01-01 00:00:00 to 9999-12-31 23:59:59). +pub(crate) fn monotonic_to_timestamp(monotonic_time: MonotonicTime) -> Option { + // Unix timestamp for 0001-01-01 00:00:00, the minimum accepted by + // protobuf's specification for the `Timestamp` type. + const MIN_SECS: i64 = -62135596800; + // Unix timestamp for 9999-12-31 23:59:59, the maximum accepted by + // protobuf's specification for the `Timestamp` type. + const MAX_SECS: i64 = 253402300799; + + let secs = monotonic_time.as_secs(); + if !(MIN_SECS..=MAX_SECS).contains(&secs) { + return None; + } + + Some(Timestamp { + seconds: secs, + nanos: monotonic_time.subsec_nanos() as i32, + }) +} + +/// Attempts a cast from a protobuf `Timestamp` to a `MonotonicTime`. +/// +/// This should never fail provided that the `Timestamp` complies with the +/// protobuf specification. It can only fail if the nanosecond part is negative +/// or greater than 999'999'999. +pub(crate) fn timestamp_to_monotonic(timestamp: Timestamp) -> Option { + let nanos: u32 = timestamp.nanos.try_into().ok()?; + + MonotonicTime::new(timestamp.seconds, nanos) +} + +/// Attempts a cast from a protobuf `Duration` to a `std::time::Duration`. +/// +/// If the `Duration` complies with the protobuf specification, this can only +/// fail if the duration is negative. +pub(crate) fn to_positive_duration(duration: prost_types::Duration) -> Option { + if duration.seconds < 0 || duration.nanos < 0 { + return None; + } + + Some(Duration::new( + duration.seconds as u64, + duration.nanos as u32, + )) +} + +/// Attempts a cast from a protobuf `Duration` to a strictly positive +/// `std::time::Duration`. +/// +/// If the `Duration` complies with the protobuf specification, this can only +/// fail if the duration is negative or null. +pub(crate) fn to_strictly_positive_duration(duration: prost_types::Duration) -> Option { + if duration.seconds < 0 || duration.nanos < 0 || (duration.seconds == 0 && duration.nanos == 0) + { + return None; + } + + Some(Duration::new( + duration.seconds as u64, + duration.nanos as u32, + )) +} diff --git a/asynchronix/src/rpc/services/controller_service.rs b/asynchronix/src/rpc/services/controller_service.rs new file mode 100644 index 0000000..76d6a79 --- /dev/null +++ b/asynchronix/src/rpc/services/controller_service.rs @@ -0,0 +1,397 @@ +use std::fmt; + +use prost_types::Timestamp; + +use crate::registry::{EventSourceRegistry, QuerySourceRegistry}; +use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; +use crate::simulation::Simulation; + +use super::super::codegen::simulation::*; +use super::{ + monotonic_to_timestamp, simulation_not_started_error, timestamp_to_monotonic, to_error, + to_positive_duration, to_strictly_positive_duration, +}; + +/// Protobuf-based simulation manager. +/// +/// A `ControllerService` enables the management of the lifecycle of a +/// simulation. +/// +/// Its methods map the various RPC simulation control service methods defined +/// in `simulation.proto`. +#[allow(clippy::large_enum_variant)] +pub(crate) enum ControllerService { + NotStarted, + Started { + simulation: Simulation, + event_source_registry: EventSourceRegistry, + query_source_registry: QuerySourceRegistry, + key_registry: KeyRegistry, + }, +} + +impl ControllerService { + /// Returns the current simulation time. + pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { + let reply = match self { + Self::Started { simulation, .. } => { + if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { + time_reply::Result::Time(timestamp) + } else { + time_reply::Result::Error(to_error( + ErrorCode::SimulationTimeOutOfRange, + "the final simulation time is out of range", + )) + } + } + Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()), + }; + + TimeReply { + result: Some(reply), + } + } + + /// Advances simulation time to that of the next scheduled event, processing + /// that event as well as all other event scheduled for the same time. + /// + /// Processing is gated by a (possibly blocking) call to + /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the + /// configured simulation clock. This method blocks until all newly + /// processed events have completed. + pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply { + let reply = match self { + Self::Started { simulation, .. } => { + simulation.step(); + + if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { + step_reply::Result::Time(timestamp) + } else { + step_reply::Result::Error(to_error( + ErrorCode::SimulationTimeOutOfRange, + "the final simulation time is out of range", + )) + } + } + Self::NotStarted => step_reply::Result::Error(simulation_not_started_error()), + }; + + StepReply { + result: Some(reply), + } + } + + /// Iteratively advances the simulation time until the specified deadline, + /// as if by calling + /// [`Simulation::step()`](crate::simulation::Simulation::step) repeatedly. + /// + /// This method blocks until all events scheduled up to the specified target + /// time have completed. The simulation time upon completion is equal to the + /// specified target time, whether or not an event was scheduled for that + /// time. + pub(crate) fn step_until(&mut self, request: StepUntilRequest) -> StepUntilReply { + let reply = match self { + Self::Started { simulation, .. } => move || -> Result { + let deadline = request.deadline.ok_or(to_error( + ErrorCode::MissingArgument, + "missing deadline argument", + ))?; + + match deadline { + step_until_request::Deadline::Time(time) => { + let time = timestamp_to_monotonic(time).ok_or(to_error( + ErrorCode::InvalidTime, + "out-of-range nanosecond field", + ))?; + + simulation.step_until(time).map_err(|_| { + to_error( + ErrorCode::InvalidTime, + "the specified deadline lies in the past", + ) + })?; + } + step_until_request::Deadline::Duration(duration) => { + let duration = to_positive_duration(duration).ok_or(to_error( + ErrorCode::InvalidDuration, + "the specified deadline lies in the past", + ))?; + + simulation.step_by(duration); + } + }; + + let timestamp = monotonic_to_timestamp(simulation.time()).ok_or(to_error( + ErrorCode::SimulationTimeOutOfRange, + "the final simulation time is out of range", + ))?; + + Ok(timestamp) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + StepUntilReply { + result: Some(match reply { + Ok(timestamp) => step_until_reply::Result::Time(timestamp), + Err(error) => step_until_reply::Result::Error(error), + }), + } + } + + /// Schedules an event at a future time. + pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply { + let reply = match self { + Self::Started { + simulation, + event_source_registry, + key_registry, + .. + } => move || -> Result, Error> { + let source_name = &request.source_name; + let msgpack_event = &request.event; + let with_key = request.with_key; + let period = request + .period + .map(|period| { + to_strictly_positive_duration(period).ok_or(to_error( + ErrorCode::InvalidDuration, + "the specified event period is not strictly positive", + )) + }) + .transpose()?; + + let deadline = request.deadline.ok_or(to_error( + ErrorCode::MissingArgument, + "missing deadline argument", + ))?; + + let deadline = match deadline { + schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time) + .ok_or(to_error( + ErrorCode::InvalidTime, + "out-of-range nanosecond field", + ))?, + schedule_event_request::Deadline::Duration(duration) => { + let duration = to_strictly_positive_duration(duration).ok_or(to_error( + ErrorCode::InvalidDuration, + "the specified scheduling deadline is not in the future", + ))?; + + simulation.time() + duration + } + }; + + let source = event_source_registry.get_mut(source_name).ok_or(to_error( + ErrorCode::SourceNotFound, + "no event source is registered with the name '{}'".to_string(), + ))?; + + let (action, action_key) = match (with_key, period) { + (false, None) => source.event(msgpack_event).map(|action| (action, None)), + (false, Some(period)) => source + .periodic_event(period, msgpack_event) + .map(|action| (action, None)), + (true, None) => source + .keyed_event(msgpack_event) + .map(|(action, key)| (action, Some(key))), + (true, Some(period)) => source + .keyed_periodic_event(period, msgpack_event) + .map(|(action, key)| (action, Some(key))), + } + .map_err(|_| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the event could not be deserialized as type '{}'", + source.event_type_name() + ), + ) + })?; + + let key_id = action_key.map(|action_key| { + // Free stale keys from the registry. + key_registry.remove_expired_keys(simulation.time()); + + if period.is_some() { + key_registry.insert_eternal_key(action_key) + } else { + key_registry.insert_key(action_key, deadline) + } + }); + + simulation.process(action); + + Ok(key_id) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + ScheduleEventReply { + result: Some(match reply { + Ok(Some(key_id)) => { + let (subkey1, subkey2) = key_id.into_raw_parts(); + schedule_event_reply::Result::Key(EventKey { + subkey1: subkey1 + .try_into() + .expect("action key index is too large to be serialized"), + subkey2, + }) + } + Ok(None) => schedule_event_reply::Result::Empty(()), + Err(error) => schedule_event_reply::Result::Error(error), + }), + } + } + + /// Cancels a keyed event. + pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply { + let reply = match self { + Self::Started { + simulation, + key_registry, + .. + } => move || -> Result<(), Error> { + let key = request + .key + .ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?; + let subkey1: usize = key + .subkey1 + .try_into() + .map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?; + let subkey2 = key.subkey2; + + let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2); + + key_registry.remove_expired_keys(simulation.time()); + let key = key_registry.extract_key(key_id).ok_or(to_error( + ErrorCode::InvalidKey, + "invalid or expired event key", + ))?; + + key.cancel(); + + Ok(()) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + CancelEventReply { + result: Some(match reply { + Ok(()) => cancel_event_reply::Result::Empty(()), + Err(error) => cancel_event_reply::Result::Error(error), + }), + } + } + + /// Broadcasts an event from an event source immediately, blocking until + /// completion. + /// + /// Simulation time remains unchanged. + pub(crate) fn process_event(&mut self, request: ProcessEventRequest) -> ProcessEventReply { + let reply = match self { + Self::Started { + simulation, + event_source_registry, + .. + } => move || -> Result<(), Error> { + let source_name = &request.source_name; + let msgpack_event = &request.event; + + let source = event_source_registry.get_mut(source_name).ok_or(to_error( + ErrorCode::SourceNotFound, + "no source is registered with the name '{}'".to_string(), + ))?; + + let event = source.event(msgpack_event).map_err(|_| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the event could not be deserialized as type '{}'", + source.event_type_name() + ), + ) + })?; + + simulation.process(event); + + Ok(()) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + ProcessEventReply { + result: Some(match reply { + Ok(()) => process_event_reply::Result::Empty(()), + Err(error) => process_event_reply::Result::Error(error), + }), + } + } + + /// Broadcasts an event from an event source immediately, blocking until + /// completion. + /// + /// Simulation time remains unchanged. + pub(crate) fn process_query(&mut self, request: ProcessQueryRequest) -> ProcessQueryReply { + let reply = match self { + Self::Started { + simulation, + query_source_registry, + .. + } => move || -> Result>, Error> { + let source_name = &request.source_name; + let msgpack_request = &request.request; + + let source = query_source_registry.get_mut(source_name).ok_or(to_error( + ErrorCode::SourceNotFound, + "no source is registered with the name '{}'".to_string(), + ))?; + + let (query, mut promise) = source.query(msgpack_request).map_err(|_| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the request could not be deserialized as type '{}'", + source.request_type_name() + ), + ) + })?; + + simulation.process(query); + + let replies = promise.take_collect().ok_or(to_error( + ErrorCode::InternalError, + "a reply to the query was expected but none was available".to_string(), + ))?; + + replies.map_err(|_| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the reply could not be serialized as type '{}'", + source.reply_type_name() + ), + ) + }) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + match reply { + Ok(replies) => ProcessQueryReply { + replies, + result: Some(process_query_reply::Result::Empty(())), + }, + Err(error) => ProcessQueryReply { + replies: Vec::new(), + result: Some(process_query_reply::Result::Error(error)), + }, + } + } +} + +impl fmt::Debug for ControllerService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ControllerService").finish_non_exhaustive() + } +} diff --git a/asynchronix/src/rpc/services/monitor_service.rs b/asynchronix/src/rpc/services/monitor_service.rs new file mode 100644 index 0000000..b1e18f5 --- /dev/null +++ b/asynchronix/src/rpc/services/monitor_service.rs @@ -0,0 +1,120 @@ +use std::fmt; + +use crate::registry::EventSinkRegistry; + +use super::super::codegen::simulation::*; +use super::{simulation_not_started_error, to_error}; + +/// Protobuf-based simulation monitor. +/// +/// A `MonitorService` enables the monitoring of the event sinks of a +/// [`Simulation`](crate::simulation::Simulation). +/// +/// Its methods map the various RPC monitoring service methods defined in +/// `simulation.proto`. +pub(crate) enum MonitorService { + Started { + event_sink_registry: EventSinkRegistry, + }, + NotStarted, +} + +impl MonitorService { + /// Read all events from an event sink. + pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply { + let reply = match self { + Self::Started { + event_sink_registry, + } => move || -> Result>, Error> { + let sink_name = &request.sink_name; + + let sink = event_sink_registry.get_mut(sink_name).ok_or(to_error( + ErrorCode::SinkNotFound, + format!("no sink is registered with the name '{}'", sink_name), + ))?; + + sink.collect().map_err(|_| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the event could not be serialized from type '{}'", + sink.event_type_name() + ), + ) + }) + }(), + Self::NotStarted => Err(simulation_not_started_error()), + }; + + match reply { + Ok(events) => ReadEventsReply { + events, + result: Some(read_events_reply::Result::Empty(())), + }, + Err(error) => ReadEventsReply { + events: Vec::new(), + result: Some(read_events_reply::Result::Error(error)), + }, + } + } + + /// Opens an event sink. + pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply { + let reply = match self { + Self::Started { + event_sink_registry, + } => { + let sink_name = &request.sink_name; + + if let Some(sink) = event_sink_registry.get_mut(sink_name) { + sink.open(); + + open_sink_reply::Result::Empty(()) + } else { + open_sink_reply::Result::Error(to_error( + ErrorCode::SinkNotFound, + format!("no sink is registered with the name '{}'", sink_name), + )) + } + } + Self::NotStarted => open_sink_reply::Result::Error(simulation_not_started_error()), + }; + + OpenSinkReply { + result: Some(reply), + } + } + + /// Closes an event sink. + pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply { + let reply = match self { + Self::Started { + event_sink_registry, + } => { + let sink_name = &request.sink_name; + + if let Some(sink) = event_sink_registry.get_mut(sink_name) { + sink.close(); + + close_sink_reply::Result::Empty(()) + } else { + close_sink_reply::Result::Error(to_error( + ErrorCode::SinkNotFound, + format!("no sink is registered with the name '{}'", sink_name), + )) + } + } + Self::NotStarted => close_sink_reply::Result::Error(simulation_not_started_error()), + }; + + CloseSinkReply { + result: Some(reply), + } + } +} + +impl fmt::Debug for MonitorService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SimulationService").finish_non_exhaustive() + } +} diff --git a/asynchronix/src/rpc/simulation_service.rs b/asynchronix/src/rpc/simulation_service.rs deleted file mode 100644 index d554490..0000000 --- a/asynchronix/src/rpc/simulation_service.rs +++ /dev/null @@ -1,584 +0,0 @@ -mod management_service; -mod monitoring_service; - -use std::error; -use std::fmt; -use std::time::Duration; - -use bytes::Buf; -use prost::Message; -use prost_types::Timestamp; -use tai_time::MonotonicTime; - -use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; -use crate::rpc::{SinkRegistry, SourceRegistry}; -use crate::simulation::{SimInit, Simulation}; - -use super::codegen::simulation::*; -use super::sink_registry; - -use management_service::ManagementService; -use monitoring_service::MonitoringService; - -/// Protobuf-based simulation manager. -/// -/// A `SimulationService` enables the management of the lifecycle of a -/// simulation, including creating a -/// [`Simulation`](crate::simulation::Simulation), invoking its methods and -/// instantiating a new simulation. -/// -/// Its methods map the various RPC service methods defined in -/// `simulation.proto`. -pub struct SimulationService { - sim_gen: Box (SimInit, SourceRegistry, SinkRegistry) + Send + 'static>, - services: Option<(ManagementService, MonitoringService)>, -} - -impl SimulationService { - /// Creates a new `SimulationService` without any active simulation. - /// - /// The argument is a closure that is called every time the simulation is - /// (re)started by the remote client. It must create a new `SimInit` object - /// complemented by a registry that exposes the public event and query - /// interface. - pub fn new(sim_gen: F) -> Self - where - F: FnMut() -> (SimInit, SourceRegistry, SinkRegistry) + Send + 'static, - { - Self { - sim_gen: Box::new(sim_gen), - services: None, - } - } - - /// Processes an encoded `AnyRequest` message and returns an encoded reply. - /*pub fn process_request(&mut self, request_buf: B) -> Result, InvalidRequest> - where - B: Buf, - { - match AnyRequest::decode(request_buf) { - Ok(AnyRequest { request: Some(req) }) => match req { - any_request::Request::InitRequest(request) => { - Ok(self.init(request).encode_to_vec()) - } - any_request::Request::TimeRequest(request) => { - Ok(self.time(request).encode_to_vec()) - } - any_request::Request::StepRequest(request) => { - Ok(self.step(request).encode_to_vec()) - } - any_request::Request::StepUntilRequest(request) => { - Ok(self.step_until(request).encode_to_vec()) - } - any_request::Request::ScheduleEventRequest(request) => { - Ok(self.schedule_event(request).encode_to_vec()) - } - any_request::Request::CancelEventRequest(request) => { - Ok(self.cancel_event(request).encode_to_vec()) - } - any_request::Request::ProcessEventRequest(request) => { - Ok(self.process_event(request).encode_to_vec()) - } - any_request::Request::ProcessQueryRequest(request) => { - Ok(self.process_query(request).encode_to_vec()) - } - }, - Ok(AnyRequest { request: None }) => Err(InvalidRequest { - description: "the message did not contain any request".to_string(), - }), - Err(err) => Err(InvalidRequest { - description: format!("bad request: {}", err), - }), - } - }*/ - - /// Initialize a simulation with the provided time. - /// - /// If a simulation is already active, it is destructed and replaced with a - /// new simulation. - /// - /// If the initialization time is not provided, it is initialized with the - /// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI). - pub(crate) fn init(&mut self, request: InitRequest) -> InitReply { - let start_time = request.time.unwrap_or_default(); - let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { - let (sim_init, source_registry, sink_registry) = (self.sim_gen)(); - let simulation = sim_init.init(start_time); - self.services = Some(( - ManagementService::new(simulation, source_registry, KeyRegistry::default()), - MonitoringService::new(sink_registry), - )); - - init_reply::Result::Empty(()) - } else { - init_reply::Result::Error(Error { - code: ErrorCode::InvalidTime as i32, - message: "out-of-range nanosecond field".to_string(), - }) - }; - - InitReply { - result: Some(reply), - } - } - - /// Returns the current simulation time. - pub(crate) fn time(&mut self, request: TimeRequest) -> TimeReply { - if let Some((management_service, ..)) = &mut self.services { - management_service.time(request) - } else { - TimeReply { - result: Some(time_reply::Result::Error(simulation_not_started_error())), - } - } - } - - /// Advances simulation time to that of the next scheduled event, processing - /// that event as well as all other events scheduled for the same time. - /// - /// Processing is gated by a (possibly blocking) call to - /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the - /// configured simulation clock. This method blocks until all newly - /// processed events have completed. - pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply { - let reply = match &mut self.sim_context { - Some((simulation, ..)) => { - simulation.step(); - if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) { - step_reply::Result::Time(timestamp) - } else { - step_reply::Result::Error(Error { - code: ErrorCode::SimulationTimeOutOfRange as i32, - message: "the final simulation time is out of range".to_string(), - }) - } - } - None => step_reply::Result::Error(Error { - code: ErrorCode::SimulationNotStarted as i32, - message: "the simulation was not started".to_string(), - }), - }; - - StepReply { - result: Some(reply), - } - } - - /// Iteratively advances the simulation time until the specified deadline, - /// as if by calling - /// [`Simulation::step()`](crate::simulation::Simulation::step) repeatedly. - /// - /// This method blocks until all events scheduled up to the specified target - /// time have completed. The simulation time upon completion is equal to the - /// specified target time, whether or not an event was scheduled for that - /// time. - pub(crate) fn step_until(&mut self, request: StepUntilRequest) -> StepUntilReply { - let reply = move || -> Result { - let deadline = request - .deadline - .ok_or((ErrorCode::MissingArgument, "missing deadline argument"))?; - - let simulation = match deadline { - step_until_request::Deadline::Time(time) => { - let time = timestamp_to_monotonic(time) - .ok_or((ErrorCode::InvalidTime, "out-of-range nanosecond field"))?; - - let (simulation, ..) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started", - ))?; - - simulation.step_until(time).map_err(|_| { - ( - ErrorCode::InvalidTime, - "the specified deadline lies in the past", - ) - })?; - - simulation - } - - step_until_request::Deadline::Duration(duration) => { - let duration = to_positive_duration(duration).ok_or(( - ErrorCode::InvalidDuration, - "the specified deadline lies in the past", - ))?; - - let (simulation, ..) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started", - ))?; - - simulation.step_by(duration); - - simulation - } - }; - - let timestamp = monotonic_to_timestamp(simulation.time()).ok_or(( - ErrorCode::SimulationTimeOutOfRange, - "the final simulation time is out of range", - ))?; - - Ok(timestamp) - }(); - - StepUntilReply { - result: Some(match reply { - Ok(timestamp) => step_until_reply::Result::Time(timestamp), - Err((code, message)) => step_until_reply::Result::Error(Error { - code: code as i32, - message: message.to_string(), - }), - }), - } - } - - /// Schedules an event at a future time. - pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply { - let reply = move || -> Result, (ErrorCode, String)> { - let source_name = &request.source_name; - let msgpack_event = &request.event; - let with_key = request.with_key; - let period = request - .period - .map(|period| { - to_strictly_positive_duration(period).ok_or(( - ErrorCode::InvalidDuration, - "the specified event period is not strictly positive".to_string(), - )) - }) - .transpose()?; - - let (simulation, endpoint_registry, key_registry) = - self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let deadline = request.deadline.ok_or(( - ErrorCode::MissingArgument, - "missing deadline argument".to_string(), - ))?; - - let deadline = match deadline { - schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time) - .ok_or(( - ErrorCode::InvalidTime, - "out-of-range nanosecond field".to_string(), - ))?, - schedule_event_request::Deadline::Duration(duration) => { - let duration = to_strictly_positive_duration(duration).ok_or(( - ErrorCode::InvalidDuration, - "the specified scheduling deadline is not in the future".to_string(), - ))?; - - simulation.time() + duration - } - }; - - let source = endpoint_registry.get_event_source_mut(source_name).ok_or(( - ErrorCode::SourceNotFound, - "no event source is registered with the name '{}'".to_string(), - ))?; - - let (action, action_key) = match (with_key, period) { - (false, None) => source.event(msgpack_event).map(|action| (action, None)), - (false, Some(period)) => source - .periodic_event(period, msgpack_event) - .map(|action| (action, None)), - (true, None) => source - .keyed_event(msgpack_event) - .map(|(action, key)| (action, Some(key))), - (true, Some(period)) => source - .keyed_periodic_event(period, msgpack_event) - .map(|(action, key)| (action, Some(key))), - } - .map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the event could not be deserialized as type '{}'", - source.event_type_name() - ), - ) - })?; - - let key_id = action_key.map(|action_key| { - // Free stale keys from the registry. - key_registry.remove_expired_keys(simulation.time()); - - if period.is_some() { - key_registry.insert_eternal_key(action_key) - } else { - key_registry.insert_key(action_key, deadline) - } - }); - - simulation.process(action); - - Ok(key_id) - }(); - - ScheduleEventReply { - result: Some(match reply { - Ok(Some(key_id)) => { - let (subkey1, subkey2) = key_id.into_raw_parts(); - schedule_event_reply::Result::Key(EventKey { - subkey1: subkey1 - .try_into() - .expect("action key index is too large to be serialized"), - subkey2, - }) - } - Ok(None) => schedule_event_reply::Result::Empty(()), - Err((code, message)) => schedule_event_reply::Result::Error(Error { - code: code as i32, - message, - }), - }), - } - } - - /// Cancels a keyed event. - pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let key = request.key.ok_or(( - ErrorCode::MissingArgument, - "missing key argument".to_string(), - ))?; - let subkey1: usize = key - .subkey1 - .try_into() - .map_err(|_| (ErrorCode::InvalidKey, "invalid event key".to_string()))?; - let subkey2 = key.subkey2; - - let (simulation, _, key_registry) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2); - - key_registry.remove_expired_keys(simulation.time()); - let key = key_registry.extract_key(key_id).ok_or(( - ErrorCode::InvalidKey, - "invalid or expired event key".to_string(), - ))?; - - key.cancel(); - - Ok(()) - }(); - - CancelEventReply { - result: Some(match reply { - Ok(()) => cancel_event_reply::Result::Empty(()), - Err((code, message)) => cancel_event_reply::Result::Error(Error { - code: code as i32, - message, - }), - }), - } - } - - /// Broadcasts an event from an event source immediately, blocking until - /// completion. - /// - /// Simulation time remains unchanged. - pub(crate) fn process_event(&mut self, request: ProcessEventRequest) -> ProcessEventReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let source_name = &request.source_name; - let msgpack_event = &request.event; - - let (simulation, registry, _) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let source = registry.get_event_source_mut(source_name).ok_or(( - ErrorCode::SourceNotFound, - "no source is registered with the name '{}'".to_string(), - ))?; - - let event = source.event(msgpack_event).map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the event could not be deserialized as type '{}'", - source.event_type_name() - ), - ) - })?; - - simulation.process(event); - - Ok(()) - }(); - - ProcessEventReply { - result: Some(match reply { - Ok(()) => process_event_reply::Result::Empty(()), - Err((code, message)) => process_event_reply::Result::Error(Error { - code: code as i32, - message, - }), - }), - } - } - - /// Broadcasts an event from an event source immediately, blocking until - /// completion. - /// - /// Simulation time remains unchanged. - pub(crate) fn process_query(&mut self, request: ProcessQueryRequest) -> ProcessQueryReply { - let reply = move || -> Result>, (ErrorCode, String)> { - let source_name = &request.source_name; - let msgpack_request = &request.request; - - let (simulation, registry, _) = self.sim_context.as_mut().ok_or(( - ErrorCode::SimulationNotStarted, - "the simulation was not started".to_string(), - ))?; - - let source = registry.get_query_source_mut(source_name).ok_or(( - ErrorCode::SourceNotFound, - "no source is registered with the name '{}'".to_string(), - ))?; - - let (query, mut promise) = source.query(msgpack_request).map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the request could not be deserialized as type '{}'", - source.request_type_name() - ), - ) - })?; - - simulation.process(query); - - let replies = promise.take_collect().ok_or(( - ErrorCode::InternalError, - "a reply to the query was expected but none was available".to_string(), - ))?; - - replies.map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the reply could not be serialized as type '{}'", - source.reply_type_name() - ), - ) - }) - }(); - - match reply { - Ok(replies) => ProcessQueryReply { - replies, - result: Some(process_query_reply::Result::Empty(())), - }, - Err((code, message)) => ProcessQueryReply { - replies: Vec::new(), - result: Some(process_query_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } -} - -impl fmt::Debug for SimulationService { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SimulationService").finish_non_exhaustive() - } -} - -#[derive(Clone, Debug)] -pub struct InvalidRequest { - description: String, -} - -impl fmt::Display for InvalidRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&self.description) - } -} - -impl error::Error for InvalidRequest {} - -/// An error returned when a simulation was not started. -fn simulation_not_started_error() -> Error { - Error { - code: ErrorCode::SimulationNotStarted as i32, - message: "the simulation was not started".to_string(), - } -} - -/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. -/// -/// This will fail if the time is outside the protobuf-specified range for -/// timestamps (0001-01-01 00:00:00 to 9999-12-31 23:59:59). -fn monotonic_to_timestamp(monotonic_time: MonotonicTime) -> Option { - // Unix timestamp for 0001-01-01 00:00:00, the minimum accepted by - // protobuf's specification for the `Timestamp` type. - const MIN_SECS: i64 = -62135596800; - // Unix timestamp for 9999-12-31 23:59:59, the maximum accepted by - // protobuf's specification for the `Timestamp` type. - const MAX_SECS: i64 = 253402300799; - - let secs = monotonic_time.as_secs(); - if !(MIN_SECS..=MAX_SECS).contains(&secs) { - return None; - } - - Some(Timestamp { - seconds: secs, - nanos: monotonic_time.subsec_nanos() as i32, - }) -} - -/// Attempts a cast from a protobuf `Timestamp` to a `MonotonicTime`. -/// -/// This should never fail provided that the `Timestamp` complies with the -/// protobuf specification. It can only fail if the nanosecond part is negative -/// or greater than 999'999'999. -fn timestamp_to_monotonic(timestamp: Timestamp) -> Option { - let nanos: u32 = timestamp.nanos.try_into().ok()?; - - MonotonicTime::new(timestamp.seconds, nanos) -} - -/// Attempts a cast from a protobuf `Duration` to a `std::time::Duration`. -/// -/// If the `Duration` complies with the protobuf specification, this can only -/// fail if the duration is negative. -fn to_positive_duration(duration: prost_types::Duration) -> Option { - if duration.seconds < 0 || duration.nanos < 0 { - return None; - } - - Some(Duration::new( - duration.seconds as u64, - duration.nanos as u32, - )) -} - -/// Attempts a cast from a protobuf `Duration` to a strictly positive -/// `std::time::Duration`. -/// -/// If the `Duration` complies with the protobuf specification, this can only -/// fail if the duration is negative or null. -fn to_strictly_positive_duration(duration: prost_types::Duration) -> Option { - if duration.seconds < 0 || duration.nanos < 0 || (duration.seconds == 0 && duration.nanos == 0) - { - return None; - } - - Some(Duration::new( - duration.seconds as u64, - duration.nanos as u32, - )) -} diff --git a/asynchronix/src/rpc/simulation_service/management_service.rs b/asynchronix/src/rpc/simulation_service/management_service.rs deleted file mode 100644 index 8a55b0d..0000000 --- a/asynchronix/src/rpc/simulation_service/management_service.rs +++ /dev/null @@ -1,451 +0,0 @@ -use std::fmt; -use std::time::Duration; - -use prost_types::Timestamp; -use tai_time::MonotonicTime; - -use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; -use crate::rpc::SourceRegistry; -use crate::simulation::Simulation; - -use super::super::codegen::simulation::*; - -/// Protobuf-based simulation manager. -/// -/// A `ManagementService` enables the management of the lifecycle of a -/// simulation. -/// -/// Its methods map the various RPC management service methods defined in -/// `simulation.proto`. -pub(crate) struct ManagementService { - simulation: Simulation, - source_registry: SourceRegistry, - key_registry: KeyRegistry, -} - -impl ManagementService { - /// Creates a new `ManagementService`. - pub(crate) fn new( - simulation: Simulation, - source_registry: SourceRegistry, - key_registry: KeyRegistry, - ) -> Self { - Self { - simulation, - source_registry, - key_registry, - } - } - - /// Returns the current simulation time. - pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { - let reply = if let Some(timestamp) = monotonic_to_timestamp(self.simulation.time()) { - time_reply::Result::Time(timestamp) - } else { - time_reply::Result::Error(Error { - code: ErrorCode::SimulationTimeOutOfRange as i32, - message: "the final simulation time is out of range".to_string(), - }) - }; - - TimeReply { - result: Some(reply), - } - } - - /// Advances simulation time to that of the next scheduled event, processing - /// that event as well as all other event scheduled for the same time. - /// - /// Processing is gated by a (possibly blocking) call to - /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the - /// configured simulation clock. This method blocks until all newly - /// processed events have completed. - pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply { - self.simulation.step(); - - let reply = if let Some(timestamp) = monotonic_to_timestamp(self.simulation.time()) { - step_reply::Result::Time(timestamp) - } else { - step_reply::Result::Error(Error { - code: ErrorCode::SimulationTimeOutOfRange as i32, - message: "the final simulation time is out of range".to_string(), - }) - }; - - StepReply { - result: Some(reply), - } - } - - /// Iteratively advances the simulation time until the specified deadline, - /// as if by calling - /// [`Simulation::step()`](crate::simulation::Simulation::step) repeatedly. - /// - /// This method blocks until all events scheduled up to the specified target - /// time have completed. The simulation time upon completion is equal to the - /// specified target time, whether or not an event was scheduled for that - /// time. - pub(crate) fn step_until(&mut self, request: StepUntilRequest) -> StepUntilReply { - let reply = move || -> Result { - let deadline = request - .deadline - .ok_or((ErrorCode::MissingArgument, "missing deadline argument"))?; - - match deadline { - step_until_request::Deadline::Time(time) => { - let time = timestamp_to_monotonic(time) - .ok_or((ErrorCode::InvalidTime, "out-of-range nanosecond field"))?; - - self.simulation.step_until(time).map_err(|_| { - ( - ErrorCode::InvalidTime, - "the specified deadline lies in the past", - ) - })?; - } - step_until_request::Deadline::Duration(duration) => { - let duration = to_positive_duration(duration).ok_or(( - ErrorCode::InvalidDuration, - "the specified deadline lies in the past", - ))?; - - self.simulation.step_by(duration); - } - }; - - let timestamp = monotonic_to_timestamp(self.simulation.time()).ok_or(( - ErrorCode::SimulationTimeOutOfRange, - "the final simulation time is out of range", - ))?; - - Ok(timestamp) - }(); - - StepUntilReply { - result: Some(match reply { - Ok(timestamp) => step_until_reply::Result::Time(timestamp), - Err((code, message)) => step_until_reply::Result::Error(Error { - code: code as i32, - message: message.to_string(), - }), - }), - } - } - - /// Schedules an event at a future time. - pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply { - let reply = move || -> Result, (ErrorCode, String)> { - let source_name = &request.source_name; - let msgpack_event = &request.event; - let with_key = request.with_key; - let period = request - .period - .map(|period| { - to_strictly_positive_duration(period).ok_or(( - ErrorCode::InvalidDuration, - "the specified event period is not strictly positive".to_string(), - )) - }) - .transpose()?; - - let deadline = request.deadline.ok_or(( - ErrorCode::MissingArgument, - "missing deadline argument".to_string(), - ))?; - - let deadline = match deadline { - schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time) - .ok_or(( - ErrorCode::InvalidTime, - "out-of-range nanosecond field".to_string(), - ))?, - schedule_event_request::Deadline::Duration(duration) => { - let duration = to_strictly_positive_duration(duration).ok_or(( - ErrorCode::InvalidDuration, - "the specified scheduling deadline is not in the future".to_string(), - ))?; - - self.simulation.time() + duration - } - }; - - let source = self - .source_registry - .get_event_source_mut(source_name) - .ok_or(( - ErrorCode::SourceNotFound, - "no event source is registered with the name '{}'".to_string(), - ))?; - - let (action, action_key) = match (with_key, period) { - (false, None) => source.event(msgpack_event).map(|action| (action, None)), - (false, Some(period)) => source - .periodic_event(period, msgpack_event) - .map(|action| (action, None)), - (true, None) => source - .keyed_event(msgpack_event) - .map(|(action, key)| (action, Some(key))), - (true, Some(period)) => source - .keyed_periodic_event(period, msgpack_event) - .map(|(action, key)| (action, Some(key))), - } - .map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the event could not be deserialized as type '{}'", - source.event_type_name() - ), - ) - })?; - - let key_id = action_key.map(|action_key| { - // Free stale keys from the registry. - self.key_registry - .remove_expired_keys(self.simulation.time()); - - if period.is_some() { - self.key_registry.insert_eternal_key(action_key) - } else { - self.key_registry.insert_key(action_key, deadline) - } - }); - - self.simulation.process(action); - - Ok(key_id) - }(); - - ScheduleEventReply { - result: Some(match reply { - Ok(Some(key_id)) => { - let (subkey1, subkey2) = key_id.into_raw_parts(); - schedule_event_reply::Result::Key(EventKey { - subkey1: subkey1 - .try_into() - .expect("action key index is too large to be serialized"), - subkey2, - }) - } - Ok(None) => schedule_event_reply::Result::Empty(()), - Err((code, message)) => schedule_event_reply::Result::Error(Error { - code: code as i32, - message, - }), - }), - } - } - - /// Cancels a keyed event. - pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let key = request.key.ok_or(( - ErrorCode::MissingArgument, - "missing key argument".to_string(), - ))?; - let subkey1: usize = key - .subkey1 - .try_into() - .map_err(|_| (ErrorCode::InvalidKey, "invalid event key".to_string()))?; - let subkey2 = key.subkey2; - - let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2); - - self.key_registry - .remove_expired_keys(self.simulation.time()); - let key = self.key_registry.extract_key(key_id).ok_or(( - ErrorCode::InvalidKey, - "invalid or expired event key".to_string(), - ))?; - - key.cancel(); - - Ok(()) - }(); - - CancelEventReply { - result: Some(match reply { - Ok(()) => cancel_event_reply::Result::Empty(()), - Err((code, message)) => cancel_event_reply::Result::Error(Error { - code: code as i32, - message, - }), - }), - } - } - - /// Broadcasts an event from an event source immediately, blocking until - /// completion. - /// - /// Simulation time remains unchanged. - pub(crate) fn process_event(&mut self, request: ProcessEventRequest) -> ProcessEventReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let source_name = &request.source_name; - let msgpack_event = &request.event; - - let source = self - .source_registry - .get_event_source_mut(source_name) - .ok_or(( - ErrorCode::SourceNotFound, - "no source is registered with the name '{}'".to_string(), - ))?; - - let event = source.event(msgpack_event).map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the event could not be deserialized as type '{}'", - source.event_type_name() - ), - ) - })?; - - self.simulation.process(event); - - Ok(()) - }(); - - ProcessEventReply { - result: Some(match reply { - Ok(()) => process_event_reply::Result::Empty(()), - Err((code, message)) => process_event_reply::Result::Error(Error { - code: code as i32, - message, - }), - }), - } - } - - /// Broadcasts an event from an event source immediately, blocking until - /// completion. - /// - /// Simulation time remains unchanged. - pub(crate) fn process_query(&mut self, request: ProcessQueryRequest) -> ProcessQueryReply { - let reply = move || -> Result>, (ErrorCode, String)> { - let source_name = &request.source_name; - let msgpack_request = &request.request; - - let source = self - .source_registry - .get_query_source_mut(source_name) - .ok_or(( - ErrorCode::SourceNotFound, - "no source is registered with the name '{}'".to_string(), - ))?; - - let (query, mut promise) = source.query(msgpack_request).map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the request could not be deserialized as type '{}'", - source.request_type_name() - ), - ) - })?; - - self.simulation.process(query); - - let replies = promise.take_collect().ok_or(( - ErrorCode::InternalError, - "a reply to the query was expected but none was available".to_string(), - ))?; - - replies.map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the reply could not be serialized as type '{}'", - source.reply_type_name() - ), - ) - }) - }(); - - match reply { - Ok(replies) => ProcessQueryReply { - replies, - result: Some(process_query_reply::Result::Empty(())), - }, - Err((code, message)) => ProcessQueryReply { - replies: Vec::new(), - result: Some(process_query_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } -} - -impl fmt::Debug for ManagementService { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ManagementService").finish_non_exhaustive() - } -} - -/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. -/// -/// This will fail if the time is outside the protobuf-specified range for -/// timestamps (0001-01-01 00:00:00 to 9999-12-31 23:59:59). -fn monotonic_to_timestamp(monotonic_time: MonotonicTime) -> Option { - // Unix timestamp for 0001-01-01 00:00:00, the minimum accepted by - // protobuf's specification for the `Timestamp` type. - const MIN_SECS: i64 = -62135596800; - // Unix timestamp for 9999-12-31 23:59:59, the maximum accepted by - // protobuf's specification for the `Timestamp` type. - const MAX_SECS: i64 = 253402300799; - - let secs = monotonic_time.as_secs(); - if !(MIN_SECS..=MAX_SECS).contains(&secs) { - return None; - } - - Some(Timestamp { - seconds: secs, - nanos: monotonic_time.subsec_nanos() as i32, - }) -} - -/// Attempts a cast from a protobuf `Timestamp` to a `MonotonicTime`. -/// -/// This should never fail provided that the `Timestamp` complies with the -/// protobuf specification. It can only fail if the nanosecond part is negative -/// or greater than 999'999'999. -fn timestamp_to_monotonic(timestamp: Timestamp) -> Option { - let nanos: u32 = timestamp.nanos.try_into().ok()?; - - MonotonicTime::new(timestamp.seconds, nanos) -} - -/// Attempts a cast from a protobuf `Duration` to a `std::time::Duration`. -/// -/// If the `Duration` complies with the protobuf specification, this can only -/// fail if the duration is negative. -fn to_positive_duration(duration: prost_types::Duration) -> Option { - if duration.seconds < 0 || duration.nanos < 0 { - return None; - } - - Some(Duration::new( - duration.seconds as u64, - duration.nanos as u32, - )) -} - -/// Attempts a cast from a protobuf `Duration` to a strictly positive -/// `std::time::Duration`. -/// -/// If the `Duration` complies with the protobuf specification, this can only -/// fail if the duration is negative or null. -fn to_strictly_positive_duration(duration: prost_types::Duration) -> Option { - if duration.seconds < 0 || duration.nanos < 0 || (duration.seconds == 0 && duration.nanos == 0) - { - return None; - } - - Some(Duration::new( - duration.seconds as u64, - duration.nanos as u32, - )) -} diff --git a/asynchronix/src/rpc/simulation_service/monitoring_service.rs b/asynchronix/src/rpc/simulation_service/monitoring_service.rs deleted file mode 100644 index ddae842..0000000 --- a/asynchronix/src/rpc/simulation_service/monitoring_service.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::error; -use std::fmt; - -use crate::rpc::SinkRegistry; - -use super::super::codegen::simulation::*; - -/// Protobuf-based simulation monitor. -/// -/// A `MonitoringService` enables the monitoring of the event sinks of a -/// [`Simulation`](crate::simulation::Simulation). -/// -/// Its methods map the various RPC monitoring service methods defined in -/// `simulation.proto`. -pub(crate) struct MonitoringService { - sink_registry: SinkRegistry, -} - -impl MonitoringService { - /// Creates a new `MonitoringService`. - pub(crate) fn new(sink_registry: SinkRegistry) -> Self { - Self { sink_registry } - } - - /// Read all events from an event sink. - pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply { - let reply = move || -> Result>, (ErrorCode, String)> { - let sink_name = &request.sink_name; - - let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or(( - ErrorCode::SinkNotFound, - "no sink is registered with the name '{}'".to_string(), - ))?; - - sink.collect().map_err(|_| { - ( - ErrorCode::InvalidMessage, - format!( - "the event could not be serialized from type '{}'", - sink.event_type_name() - ), - ) - }) - }(); - - match reply { - Ok(events) => ReadEventsReply { - events, - result: Some(read_events_reply::Result::Empty(())), - }, - Err((code, message)) => ReadEventsReply { - events: Vec::new(), - result: Some(read_events_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } - - /// Opens an event sink. - pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let sink_name = &request.sink_name; - - let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or(( - ErrorCode::SinkNotFound, - "no sink is registered with the name '{}'".to_string(), - ))?; - - sink.open(); - - Ok(()) - }(); - - match reply { - Ok(()) => OpenSinkReply { - result: Some(open_sink_reply::Result::Empty(())), - }, - Err((code, message)) => OpenSinkReply { - result: Some(open_sink_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } - - /// Closes an event sink. - pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply { - let reply = move || -> Result<(), (ErrorCode, String)> { - let sink_name = &request.sink_name; - - let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or(( - ErrorCode::SinkNotFound, - "no sink is registered with the name '{}'".to_string(), - ))?; - - sink.close(); - - Ok(()) - }(); - - match reply { - Ok(()) => CloseSinkReply { - result: Some(close_sink_reply::Result::Empty(())), - }, - Err((code, message)) => CloseSinkReply { - result: Some(close_sink_reply::Result::Error(Error { - code: code as i32, - message, - })), - }, - } - } -} - -impl fmt::Debug for MonitoringService { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SimulationService").finish_non_exhaustive() - } -} diff --git a/asynchronix/src/rpc/sink_registry.rs b/asynchronix/src/rpc/sink_registry.rs deleted file mode 100644 index dc7fb0e..0000000 --- a/asynchronix/src/rpc/sink_registry.rs +++ /dev/null @@ -1,243 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::fmt; -use std::time::Duration; - -use rmp_serde::decode::Error as RmpDecodeError; -use rmp_serde::encode::Error as RmpEncodeError; -use serde::de::DeserializeOwned; -use serde::Serialize; - -use crate::ports::{EventSinkStream, EventSource, QuerySource, ReplyReceiver}; -use crate::simulation::{Action, ActionKey}; - -/// A registry that holds all sources and sinks meant to be accessed through -/// remote procedure calls. -#[derive(Default)] -pub struct SinkRegistry { - sinks: HashMap>, -} - -impl SinkRegistry { - /// Creates an empty `EndpointRegistry`. - pub fn new() -> Self { - Self::default() - } - - /// Adds a sink to the registry. - /// - /// If the specified name is already in use for another sink, the sink - /// provided as argument is returned in the error. - pub fn add_event_sink(&mut self, sink: S, name: impl Into) -> Result<(), S> - where - S: EventSinkStream + Send + 'static, - S::Item: Serialize, - { - match self.sinks.entry(name.into()) { - Entry::Vacant(s) => { - s.insert(Box::new(sink)); - - Ok(()) - } - Entry::Occupied(_) => Err(sink), - } - } - - /// Returns a mutable reference to the specified sink if it is in the - /// registry. - pub(crate) fn get_event_sink_mut(&mut self, name: &str) -> Option<&mut dyn EventSinkStreamAny> { - self.sinks.get_mut(name).map(|s| s.as_mut()) - } -} - -impl fmt::Debug for SinkRegistry { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "SinkRegistry ({} sinks)", self.sinks.len()) - } -} - -/// A type-erased `EventSource` that operates on MessagePack-encoded serialized -/// events. -pub(crate) trait EventSourceAny: Send + 'static { - /// Returns an action which, when processed, broadcasts an event to all - /// connected input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn event(&mut self, msgpack_arg: &[u8]) -> Result; - - /// Returns a cancellable action and a cancellation key; when processed, the - /// action broadcasts an event to all connected input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>; - - /// Returns a periodically recurring action which, when processed, - /// broadcasts an event to all connected input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result; - - /// Returns a cancellable, periodically recurring action and a cancellation - /// key; when processed, the action broadcasts an event to all connected - /// input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn keyed_periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result<(Action, ActionKey), RmpDecodeError>; - - /// Human-readable name of the event type, as returned by - /// `any::type_name()`. - fn event_type_name(&self) -> &'static str; -} - -impl EventSourceAny for EventSource -where - T: DeserializeOwned + Clone + Send + 'static, -{ - fn event(&mut self, msgpack_arg: &[u8]) -> Result { - rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg)) - } - fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg)) - } - fn periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result { - rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg)) - } - fn keyed_periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result<(Action, ActionKey), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg)) - } - fn event_type_name(&self) -> &'static str { - std::any::type_name::() - } -} - -/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized -/// queries and returns MessagePack-encoded replies. -pub(crate) trait QuerySourceAny: Send + 'static { - /// Returns an action which, when processed, broadcasts a query to all - /// connected replier ports. - /// - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn query( - &mut self, - msgpack_arg: &[u8], - ) -> Result<(Action, Box), RmpDecodeError>; - - /// Human-readable name of the request type, as returned by - /// `any::type_name()`. - fn request_type_name(&self) -> &'static str; - - /// Human-readable name of the reply type, as returned by - /// `any::type_name()`. - fn reply_type_name(&self) -> &'static str; -} - -impl QuerySourceAny for QuerySource -where - T: DeserializeOwned + Clone + Send + 'static, - R: Serialize + Send + 'static, -{ - fn query( - &mut self, - msgpack_arg: &[u8], - ) -> Result<(Action, Box), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| { - let (action, reply_recv) = self.query(arg); - let reply_recv: Box = Box::new(reply_recv); - - (action, reply_recv) - }) - } - - fn request_type_name(&self) -> &'static str { - std::any::type_name::() - } - - fn reply_type_name(&self) -> &'static str { - std::any::type_name::() - } -} - -/// A type-erased `EventSinkStream`. -pub(crate) trait EventSinkStreamAny: Send + 'static { - /// Human-readable name of the event type, as returned by - /// `any::type_name()`. - fn event_type_name(&self) -> &'static str; - - /// Starts or resumes the collection of new events. - fn open(&mut self); - - /// Pauses the collection of new events. - fn close(&mut self); - - /// Encode and collect all events in a vector. - fn collect(&mut self) -> Result>, RmpEncodeError>; -} - -impl EventSinkStreamAny for E -where - E: EventSinkStream + Send + 'static, - E::Item: Serialize, -{ - fn event_type_name(&self) -> &'static str { - std::any::type_name::() - } - - fn open(&mut self) { - self.open(); - } - - fn close(&mut self) { - self.close(); - } - - fn collect(&mut self) -> Result>, RmpEncodeError> { - self.__try_fold(Vec::new(), |mut encoded_events, event| { - rmp_serde::to_vec_named(&event).map(|encoded_event| { - encoded_events.push(encoded_event); - - encoded_events - }) - }) - } -} - -/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies.. -pub(crate) trait ReplyReceiverAny { - /// Take the replies, if any, encode them and collect them in a vector. - fn take_collect(&mut self) -> Option>, RmpEncodeError>>; -} - -impl ReplyReceiverAny for ReplyReceiver { - fn take_collect(&mut self) -> Option>, RmpEncodeError>> { - let replies = self.take()?; - - let encoded_replies = (move || { - let mut encoded_replies = Vec::new(); - for reply in replies { - let encoded_reply = rmp_serde::to_vec_named(&reply)?; - encoded_replies.push(encoded_reply); - } - - Ok(encoded_replies) - })(); - - Some(encoded_replies) - } -} diff --git a/asynchronix/src/rpc/source_registry.rs b/asynchronix/src/rpc/source_registry.rs deleted file mode 100644 index 3f47733..0000000 --- a/asynchronix/src/rpc/source_registry.rs +++ /dev/null @@ -1,237 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::fmt; -use std::time::Duration; - -use rmp_serde::decode::Error as RmpDecodeError; -use rmp_serde::encode::Error as RmpEncodeError; -use serde::de::DeserializeOwned; -use serde::Serialize; - -use crate::ports::{EventSource, QuerySource, ReplyReceiver}; -use crate::simulation::{Action, ActionKey}; - -/// A registry that holds all sources and sinks meant to be accessed through -/// remote procedure calls. -#[derive(Default)] -pub struct SourceRegistry { - event_sources: HashMap>, - query_sources: HashMap>, -} - -impl SourceRegistry { - /// Creates an empty `EndpointRegistry`. - pub fn new() -> Self { - Self::default() - } - - /// Adds an event source to the registry. - /// - /// If the specified name is already in use for another event source, the source - /// provided as argument is returned in the error. - pub fn add_event_source( - &mut self, - source: EventSource, - name: impl Into, - ) -> Result<(), EventSource> - where - T: DeserializeOwned + Clone + Send + 'static, - { - match self.event_sources.entry(name.into()) { - Entry::Vacant(s) => { - s.insert(Box::new(source)); - - Ok(()) - } - Entry::Occupied(_) => Err(source), - } - } - - /// Returns a mutable reference to the specified event source if it is in - /// the registry. - pub(crate) fn get_event_source_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> { - self.event_sources.get_mut(name).map(|s| s.as_mut()) - } - - /// Adds a query source to the registry. - /// - /// If the specified name is already in use for another query source, the - /// source provided as argument is returned in the error. - pub fn add_query_source( - &mut self, - source: QuerySource, - name: impl Into, - ) -> Result<(), QuerySource> - where - T: DeserializeOwned + Clone + Send + 'static, - R: Serialize + Send + 'static, - { - match self.query_sources.entry(name.into()) { - Entry::Vacant(s) => { - s.insert(Box::new(source)); - - Ok(()) - } - Entry::Occupied(_) => Err(source), - } - } - - /// Returns a mutable reference to the specified query source if it is in - /// the registry. - pub(crate) fn get_query_source_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> { - self.query_sources.get_mut(name).map(|s| s.as_mut()) - } -} - -impl fmt::Debug for SourceRegistry { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "SourceRegistry ({} event sources, {} query sources)", - self.event_sources.len(), - self.query_sources.len() - ) - } -} - -/// A type-erased `EventSource` that operates on MessagePack-encoded serialized -/// events. -pub(crate) trait EventSourceAny: Send + 'static { - /// Returns an action which, when processed, broadcasts an event to all - /// connected input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn event(&mut self, msgpack_arg: &[u8]) -> Result; - - /// Returns a cancellable action and a cancellation key; when processed, the - /// action broadcasts an event to all connected input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>; - - /// Returns a periodically recurring action which, when processed, - /// broadcasts an event to all connected input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result; - - /// Returns a cancellable, periodically recurring action and a cancellation - /// key; when processed, the action broadcasts an event to all connected - /// input ports. - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn keyed_periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result<(Action, ActionKey), RmpDecodeError>; - - /// Human-readable name of the event type, as returned by - /// `any::type_name()`. - fn event_type_name(&self) -> &'static str; -} - -impl EventSourceAny for EventSource -where - T: DeserializeOwned + Clone + Send + 'static, -{ - fn event(&mut self, msgpack_arg: &[u8]) -> Result { - rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg)) - } - fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg)) - } - fn periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result { - rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg)) - } - fn keyed_periodic_event( - &mut self, - period: Duration, - msgpack_arg: &[u8], - ) -> Result<(Action, ActionKey), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg)) - } - fn event_type_name(&self) -> &'static str { - std::any::type_name::() - } -} - -/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized -/// queries and returns MessagePack-encoded replies. -pub(crate) trait QuerySourceAny: Send + 'static { - /// Returns an action which, when processed, broadcasts a query to all - /// connected replier ports. - /// - /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn query( - &mut self, - msgpack_arg: &[u8], - ) -> Result<(Action, Box), RmpDecodeError>; - - /// Human-readable name of the request type, as returned by - /// `any::type_name()`. - fn request_type_name(&self) -> &'static str; - - /// Human-readable name of the reply type, as returned by - /// `any::type_name()`. - fn reply_type_name(&self) -> &'static str; -} - -impl QuerySourceAny for QuerySource -where - T: DeserializeOwned + Clone + Send + 'static, - R: Serialize + Send + 'static, -{ - fn query( - &mut self, - msgpack_arg: &[u8], - ) -> Result<(Action, Box), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| { - let (action, reply_recv) = self.query(arg); - let reply_recv: Box = Box::new(reply_recv); - - (action, reply_recv) - }) - } - - fn request_type_name(&self) -> &'static str { - std::any::type_name::() - } - - fn reply_type_name(&self) -> &'static str { - std::any::type_name::() - } -} - -/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies.. -pub(crate) trait ReplyReceiverAny { - /// Take the replies, if any, encode them and collect them in a vector. - fn take_collect(&mut self) -> Option>, RmpEncodeError>>; -} - -impl ReplyReceiverAny for ReplyReceiver { - fn take_collect(&mut self) -> Option>, RmpEncodeError>> { - let replies = self.take()?; - - let encoded_replies = (move || { - let mut encoded_replies = Vec::new(); - for reply in replies { - let encoded_reply = rmp_serde::to_vec_named(&reply)?; - encoded_replies.push(encoded_reply); - } - - Ok(encoded_replies) - })(); - - Some(encoded_replies) - } -} diff --git a/asynchronix/src/rpc/wasm.rs b/asynchronix/src/rpc/wasm.rs index 6c43f95..dc443a6 100644 --- a/asynchronix/src/rpc/wasm.rs +++ b/asynchronix/src/rpc/wasm.rs @@ -20,9 +20,11 @@ use wasm_bindgen::prelude::*; -use super::{SimulationService, SinkRegistry}; +use crate::registry::EndpointRegistry; use crate::simulation::SimInit; +use super::protobuf::ProtobufService; + /// A simulation service that can be used from JavaScript. /// /// This would typically be used by implementing a `run` function in Rust and @@ -49,7 +51,7 @@ use crate::simulation::SimInit; /// ``` #[wasm_bindgen(js_name = SimulationService)] #[derive(Debug)] -pub struct WasmSimulationService(SimulationService); +pub struct WasmSimulationService(ProtobufService); #[wasm_bindgen(js_class = SimulationService)] impl WasmSimulationService { @@ -75,8 +77,8 @@ impl WasmSimulationService { /// interface. pub fn new(sim_gen: F) -> Self where - F: FnMut() -> (SimInit, SinkRegistry) + Send + 'static, + F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, { - Self(SimulationService::new(sim_gen)) + Self(ProtobufService::new(sim_gen)) } }