From 6b43fcf70439d54abf144071e4cb7b2524b4540f Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Sun, 8 Sep 2024 13:54:09 +0200 Subject: [PATCH] Simplify gRPC backend arch + remove wasm backend --- .github/workflows/ci.yml | 16 +- asynchronix/Cargo.toml | 36 ++--- asynchronix/build.rs | 13 +- .../src/executor/mt_executor/pool_manager.rs | 40 ----- asynchronix/src/grpc.rs | 8 + .../src/{rpc => grpc}/api/simulation.proto | 0 asynchronix/src/{rpc => grpc}/codegen.rs | 0 .../src/{rpc => grpc}/codegen/.gitkeep | 0 .../src/{rpc => grpc}/codegen/simulation.rs | 104 +++---------- asynchronix/src/{rpc => grpc}/key_registry.rs | 0 asynchronix/src/{rpc/grpc.rs => grpc/run.rs} | 0 asynchronix/src/{rpc => grpc}/services.rs | 0 .../services/controller_service.rs | 4 +- .../{rpc => grpc}/services/init_service.rs | 0 .../{rpc => grpc}/services/monitor_service.rs | 0 asynchronix/src/lib.rs | 6 +- asynchronix/src/rpc.rs | 11 -- asynchronix/src/rpc/protobuf.rs | 146 ------------------ asynchronix/src/rpc/wasm.rs | 86 ----------- 19 files changed, 60 insertions(+), 410 deletions(-) create mode 100644 asynchronix/src/grpc.rs rename asynchronix/src/{rpc => grpc}/api/simulation.proto (100%) rename asynchronix/src/{rpc => grpc}/codegen.rs (100%) rename asynchronix/src/{rpc => grpc}/codegen/.gitkeep (100%) rename asynchronix/src/{rpc => grpc}/codegen/simulation.rs (92%) rename asynchronix/src/{rpc => grpc}/key_registry.rs (100%) rename asynchronix/src/{rpc/grpc.rs => grpc/run.rs} (100%) rename asynchronix/src/{rpc => grpc}/services.rs (100%) rename asynchronix/src/{rpc => grpc}/services/controller_service.rs (99%) rename asynchronix/src/{rpc => grpc}/services/init_service.rs (100%) rename asynchronix/src/{rpc => grpc}/services/monitor_service.rs (100%) delete mode 100644 asynchronix/src/rpc.rs delete mode 100644 asynchronix/src/rpc/protobuf.rs delete mode 100644 asynchronix/src/rpc/wasm.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9e6b33c..b1c0f5d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: toolchain: ${{ matrix.rust }} - name: Run cargo check - run: cargo check --features="grpc-service" + run: cargo check --all-features build-wasm: name: Build wasm32 @@ -43,7 +43,7 @@ jobs: targets: wasm32-unknown-unknown - name: Run cargo build (wasm) - run: cargo build --target wasm32-unknown-unknown --features="wasm-service" + run: cargo build --target wasm32-unknown-unknown test: name: Test suite @@ -56,7 +56,7 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Run cargo test - run: cargo test --features="grpc-service" + run: cargo test --all-features loom-dry-run: name: Loom dry run @@ -69,7 +69,7 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Dry-run cargo test (Loom) - run: cargo test --no-run --tests --features="grpc-service" + run: cargo test --no-run --tests --all-features env: RUSTFLAGS: --cfg asynchronix_loom @@ -86,12 +86,12 @@ jobs: components: miri - name: Run cargo miri tests (single-threaded executor) - run: cargo miri test --tests --lib --features="grpc-service" + run: cargo miri test --tests --lib --all-features env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - name: Run cargo miri tests (multi-threaded executor) - run: cargo miri test --tests --lib --features="grpc-service" + run: cargo miri test --tests --lib --all-features env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 @@ -149,7 +149,7 @@ jobs: run: cargo fmt --all -- --check - name: Run cargo clippy - run: cargo clippy --features="grpc-service" + run: cargo clippy --all-features docs: name: Docs @@ -162,4 +162,4 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Run cargo doc - run: cargo doc --no-deps --features="grpc-service" --document-private-items + run: cargo doc --no-deps --all-features --document-private-items diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 43fa866..a9f0856 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -20,21 +20,12 @@ categories = ["simulation", "aerospace", "science"] keywords = ["simulation", "discrete-event", "systems", "cyberphysical", "real-time"] autotests = false - [features] -# Remote procedure call API. -rpc = ["dep:ciborium", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"] -# This feature forces protobuf/gRPC code (re-)generation. -rpc-codegen = ["dep:tonic-build"] # gRPC service. -grpc-service = ["rpc", "dep:tokio" , "tonic/transport"] -# wasm service. -wasm-service = ["rpc", "dep:wasm-bindgen"] -# API-unstable public exports meant for external test/benchmarking; development only. -dev-hooks = [] -# Logging of performance-related statistics; development only. -dev-logs = [] +grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"] +# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking. +dev-hooks = [] [dependencies] # Mandatory dependencies. @@ -53,19 +44,14 @@ spin_sleep = "1" st3 = "0.4" tai-time = "0.3" -# Common RPC dependencies. +# gRPC service dependencies. bytes = { version = "1", default-features = false, optional = true } -prost = { version = "0.12", optional = true } -prost-types = { version = "0.12", optional = true } +prost = { version = "0.13", optional = true } +prost-types = { version = "0.13", optional = true } ciborium = { version = "0.2.2", optional = true } serde = { version = "1", optional = true } - -# gRPC service dependencies. tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true } -tonic = { version = "0.11", default-features = false, features=["codegen", "prost"], optional = true } - -# WASM service dependencies. -wasm-bindgen = { version = "0.2", optional = true } +tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true } [dev-dependencies] atomic-wait = "1.1" @@ -77,11 +63,13 @@ mio = { version = "1.0", features = ["os-poll", "net"] } loom = "0.5" waker-fn = "1.1" -[build-dependencies] -tonic-build = { version = "0.11", optional = true } +[target.'cfg(asynchronix_grpc_codegen)'.build-dependencies] +tonic-build = { version = "0.12" } [lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(asynchronix_loom)'] } +# `asynchronix_loom` flag: run loom-based tests. +# `asynchronix_grpc_codegen` flag: regenerate gRPC code from .proto definitions. +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(asynchronix_loom)', 'cfg(asynchronix_grpc_codegen)'] } [[test]] name = "integration" diff --git a/asynchronix/build.rs b/asynchronix/build.rs index f6ce175..cff7996 100644 --- a/asynchronix/build.rs +++ b/asynchronix/build.rs @@ -1,14 +1,9 @@ fn main() -> Result<(), Box> { - #[cfg(feature = "rpc-codegen")] - let builder = tonic_build::configure() + #[cfg(asynchronix_grpc_codegen)] + tonic_build::configure() .build_client(false) - .out_dir("src/rpc/codegen/"); - - #[cfg(all(feature = "rpc-codegen", not(feature = "grpc-service")))] - let builder = builder.build_server(false); - - #[cfg(feature = "rpc-codegen")] - builder.compile(&["simulation.proto"], &["src/rpc/api/"])?; + .out_dir("src/grpc/codegen/") + .compile(&["simulation.proto"], &["src/grpc/api/"])?; Ok(()) } diff --git a/asynchronix/src/executor/mt_executor/pool_manager.rs b/asynchronix/src/executor/mt_executor/pool_manager.rs index ffd082f..119c335 100644 --- a/asynchronix/src/executor/mt_executor/pool_manager.rs +++ b/asynchronix/src/executor/mt_executor/pool_manager.rs @@ -26,9 +26,6 @@ pub(super) struct PoolManager { terminate_signal: AtomicBool, /// Panic caught in a worker thread. worker_panic: Mutex>>, - #[cfg(feature = "dev-logs")] - /// Thread wake-up statistics. - record: Record, } impl PoolManager { @@ -61,8 +58,6 @@ impl PoolManager { searching_workers: AtomicUsize::new(0), terminate_signal: AtomicBool::new(false), worker_panic: Mutex::new(None), - #[cfg(feature = "dev-logs")] - record: Record::new(pool_size), } } @@ -85,8 +80,6 @@ impl PoolManager { .active_workers .fetch_or(1 << first_idle_worker, Ordering::Relaxed); if active_workers & (1 << first_idle_worker) == 0 { - #[cfg(feature = "dev-logs")] - self.record.increment(first_idle_worker); self.begin_worker_search(); self.worker_unparkers[first_idle_worker].unpark(); return; @@ -117,8 +110,6 @@ impl PoolManager { .active_workers .fetch_or(1 << first_idle_worker, Ordering::Relaxed); if active_workers & (1 << first_idle_worker) == 0 { - #[cfg(feature = "dev-logs")] - self.record.increment(first_idle_worker); self.begin_worker_search(); self.worker_unparkers[first_idle_worker].unpark(); return; @@ -273,13 +264,6 @@ impl PoolManager { } } -#[cfg(feature = "dev-logs")] -impl Drop for PoolManager { - fn drop(&mut self) { - println!("Thread launch count: {:?}", self.record.get()); - } -} - /// An iterator over active workers that yields their associated stealer, /// starting from a randomly selected active worker. pub(super) struct ShuffledStealers<'a> { @@ -346,27 +330,3 @@ impl<'a> Iterator for ShuffledStealers<'a> { Some(&self.stealers[current_candidate]) } } - -#[cfg(feature = "dev-logs")] -#[derive(Debug)] -struct Record { - stats: Vec, -} - -#[cfg(feature = "dev-logs")] -impl Record { - fn new(worker_count: usize) -> Self { - let mut stats = Vec::new(); - stats.resize_with(worker_count, Default::default); - Self { stats } - } - fn increment(&self, worker_id: usize) { - self.stats[worker_id].fetch_add(1, Ordering::Relaxed); - } - fn get(&self) -> Vec { - self.stats - .iter() - .map(|s| s.load(Ordering::Relaxed)) - .collect() - } -} diff --git a/asynchronix/src/grpc.rs b/asynchronix/src/grpc.rs new file mode 100644 index 0000000..9c95935 --- /dev/null +++ b/asynchronix/src/grpc.rs @@ -0,0 +1,8 @@ +//! Simulation management through remote procedure calls. + +mod codegen; +mod key_registry; +mod run; +mod services; + +pub use run::run; diff --git a/asynchronix/src/rpc/api/simulation.proto b/asynchronix/src/grpc/api/simulation.proto similarity index 100% rename from asynchronix/src/rpc/api/simulation.proto rename to asynchronix/src/grpc/api/simulation.proto diff --git a/asynchronix/src/rpc/codegen.rs b/asynchronix/src/grpc/codegen.rs similarity index 100% rename from asynchronix/src/rpc/codegen.rs rename to asynchronix/src/grpc/codegen.rs diff --git a/asynchronix/src/rpc/codegen/.gitkeep b/asynchronix/src/grpc/codegen/.gitkeep similarity index 100% rename from asynchronix/src/rpc/codegen/.gitkeep rename to asynchronix/src/grpc/codegen/.gitkeep diff --git a/asynchronix/src/rpc/codegen/simulation.rs b/asynchronix/src/grpc/codegen/simulation.rs similarity index 92% rename from asynchronix/src/rpc/codegen/simulation.rs rename to asynchronix/src/grpc/codegen/simulation.rs index daef764..3430a88 100644 --- a/asynchronix/src/rpc/codegen/simulation.rs +++ b/asynchronix/src/grpc/codegen/simulation.rs @@ -1,5 +1,4 @@ // This file is @generated by prost-build. -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Error { #[prost(enumeration = "ErrorCode", tag = "1")] @@ -7,15 +6,13 @@ pub struct Error { #[prost(string, tag = "2")] pub message: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct EventKey { #[prost(uint64, tag = "1")] pub subkey1: u64, #[prost(uint64, tag = "2")] pub subkey2: u64, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct InitRequest { #[prost(message, optional, tag = "1")] @@ -23,7 +20,6 @@ pub struct InitRequest { #[prost(bytes = "vec", tag = "2")] pub cfg: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct InitReply { /// Always returns exactly 1 variant. @@ -33,7 +29,6 @@ pub struct InitReply { /// Nested message and enum types in `InitReply`. pub mod init_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -42,10 +37,8 @@ pub mod init_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct TimeRequest {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct TimeReply { /// Always returns exactly 1 variant. @@ -55,7 +48,6 @@ pub struct TimeReply { /// Nested message and enum types in `TimeReply`. pub mod time_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -64,10 +56,8 @@ pub mod time_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct StepRequest {} -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct StepReply { /// Always returns exactly 1 variant. @@ -77,7 +67,6 @@ pub struct StepReply { /// Nested message and enum types in `StepReply`. pub mod step_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -86,8 +75,7 @@ pub mod step_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct StepUntilRequest { /// Always returns exactly 1 variant. #[prost(oneof = "step_until_request::Deadline", tags = "1, 2")] @@ -96,8 +84,7 @@ pub struct StepUntilRequest { /// Nested message and enum types in `StepUntilRequest`. pub mod step_until_request { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Deadline { #[prost(message, tag = "1")] Time(::prost_types::Timestamp), @@ -105,7 +92,6 @@ pub mod step_until_request { Duration(::prost_types::Duration), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct StepUntilReply { /// Always returns exactly 1 variant. @@ -115,7 +101,6 @@ pub struct StepUntilReply { /// Nested message and enum types in `StepUntilReply`. pub mod step_until_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -124,7 +109,6 @@ pub mod step_until_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ScheduleEventRequest { #[prost(string, tag = "3")] @@ -142,8 +126,7 @@ pub struct ScheduleEventRequest { /// Nested message and enum types in `ScheduleEventRequest`. pub mod schedule_event_request { /// Expects exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Deadline { #[prost(message, tag = "1")] Time(::prost_types::Timestamp), @@ -151,7 +134,6 @@ pub mod schedule_event_request { Duration(::prost_types::Duration), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ScheduleEventReply { /// Always returns exactly 1 variant. @@ -161,7 +143,6 @@ pub struct ScheduleEventReply { /// Nested message and enum types in `ScheduleEventReply`. pub mod schedule_event_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -172,13 +153,11 @@ pub mod schedule_event_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CancelEventRequest { #[prost(message, optional, tag = "1")] pub key: ::core::option::Option, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CancelEventReply { /// Always returns exactly 1 variant. @@ -188,7 +167,6 @@ pub struct CancelEventReply { /// Nested message and enum types in `CancelEventReply`. pub mod cancel_event_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -197,7 +175,6 @@ pub mod cancel_event_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ProcessEventRequest { #[prost(string, tag = "1")] @@ -205,7 +182,6 @@ pub struct ProcessEventRequest { #[prost(bytes = "vec", tag = "2")] pub event: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ProcessEventReply { /// Always returns exactly 1 variant. @@ -215,7 +191,6 @@ pub struct ProcessEventReply { /// Nested message and enum types in `ProcessEventReply`. pub mod process_event_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "1")] @@ -224,7 +199,6 @@ pub mod process_event_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ProcessQueryRequest { #[prost(string, tag = "1")] @@ -232,7 +206,6 @@ pub struct ProcessQueryRequest { #[prost(bytes = "vec", tag = "2")] pub request: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ProcessQueryReply { /// This field is hoisted because protobuf3 does not support `repeated` within @@ -246,7 +219,6 @@ pub struct ProcessQueryReply { /// Nested message and enum types in `ProcessQueryReply`. pub mod process_query_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "10")] @@ -255,13 +227,11 @@ pub mod process_query_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReadEventsRequest { #[prost(string, tag = "1")] pub sink_name: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReadEventsReply { /// This field is hoisted because protobuf3 does not support `repeated` within @@ -275,7 +245,6 @@ pub struct ReadEventsReply { /// Nested message and enum types in `ReadEventsReply`. pub mod read_events_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "10")] @@ -284,13 +253,11 @@ pub mod read_events_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OpenSinkRequest { #[prost(string, tag = "1")] pub sink_name: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OpenSinkReply { /// Always returns exactly 1 variant. @@ -300,7 +267,6 @@ pub struct OpenSinkReply { /// Nested message and enum types in `OpenSinkReply`. pub mod open_sink_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "10")] @@ -309,13 +275,11 @@ pub mod open_sink_reply { Error(super::Error), } } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CloseSinkRequest { #[prost(string, tag = "1")] pub sink_name: ::prost::alloc::string::String, } -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CloseSinkReply { /// Always returns exactly 1 variant. @@ -325,7 +289,6 @@ pub struct CloseSinkReply { /// Nested message and enum types in `CloseSinkReply`. pub mod close_sink_reply { /// Always returns exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Result { #[prost(message, tag = "10")] @@ -335,7 +298,6 @@ pub mod close_sink_reply { } } /// A convenience message type for custom transport implementation. -#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnyRequest { /// Expects exactly 1 variant. @@ -345,7 +307,6 @@ pub struct AnyRequest { /// Nested message and enum types in `AnyRequest`. pub mod any_request { /// Expects exactly 1 variant. - #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Request { #[prost(message, tag = "1")] @@ -428,7 +389,7 @@ pub mod simulation_server { use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with SimulationServer. #[async_trait] - pub trait Simulation: Send + Sync + 'static { + pub trait Simulation: std::marker::Send + std::marker::Sync + 'static { async fn init( &self, request: tonic::Request, @@ -487,20 +448,18 @@ pub mod simulation_server { ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] - pub struct SimulationServer { - inner: _Inner, + pub struct SimulationServer { + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); - impl SimulationServer { + impl SimulationServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -550,8 +509,8 @@ pub mod simulation_server { impl tonic::codegen::Service> for SimulationServer where T: Simulation, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -563,7 +522,6 @@ pub mod simulation_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/simulation.Simulation/Init" => { #[allow(non_camel_case_types)] @@ -592,7 +550,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = InitSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -636,7 +593,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = TimeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -680,7 +636,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = StepSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -726,7 +681,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = StepUntilSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -772,7 +726,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ScheduleEventSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -818,7 +771,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CancelEventSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -864,7 +816,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ProcessEventSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -910,7 +861,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ProcessQuerySvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -956,7 +906,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ReadEventsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1002,7 +951,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = OpenSinkSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1048,7 +996,6 @@ pub mod simulation_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CloseSinkSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1070,8 +1017,11 @@ pub mod simulation_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -1080,7 +1030,7 @@ pub mod simulation_server { } } } - impl Clone for SimulationServer { + impl Clone for SimulationServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -1092,17 +1042,9 @@ pub mod simulation_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService for SimulationServer { - const NAME: &'static str = "simulation.Simulation"; + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "simulation.Simulation"; + impl tonic::server::NamedService for SimulationServer { + const NAME: &'static str = SERVICE_NAME; } } diff --git a/asynchronix/src/rpc/key_registry.rs b/asynchronix/src/grpc/key_registry.rs similarity index 100% rename from asynchronix/src/rpc/key_registry.rs rename to asynchronix/src/grpc/key_registry.rs diff --git a/asynchronix/src/rpc/grpc.rs b/asynchronix/src/grpc/run.rs similarity index 100% rename from asynchronix/src/rpc/grpc.rs rename to asynchronix/src/grpc/run.rs diff --git a/asynchronix/src/rpc/services.rs b/asynchronix/src/grpc/services.rs similarity index 100% rename from asynchronix/src/rpc/services.rs rename to asynchronix/src/grpc/services.rs diff --git a/asynchronix/src/rpc/services/controller_service.rs b/asynchronix/src/grpc/services/controller_service.rs similarity index 99% rename from asynchronix/src/rpc/services/controller_service.rs rename to asynchronix/src/grpc/services/controller_service.rs index 2c1bdf9..9345949 100644 --- a/asynchronix/src/rpc/services/controller_service.rs +++ b/asynchronix/src/grpc/services/controller_service.rs @@ -2,8 +2,8 @@ use std::fmt; use prost_types::Timestamp; +use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId}; use crate::registry::{EventSourceRegistry, QuerySourceRegistry}; -use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; use crate::simulation::Simulation; use super::super::codegen::simulation::*; @@ -53,7 +53,7 @@ impl ControllerService { } /// Advances simulation time to that of the next scheduled event, processing - /// that event as well as all other event scheduled for the same time. + /// that event as well as all other events scheduled for the same time. /// /// Processing is gated by a (possibly blocking) call to /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the diff --git a/asynchronix/src/rpc/services/init_service.rs b/asynchronix/src/grpc/services/init_service.rs similarity index 100% rename from asynchronix/src/rpc/services/init_service.rs rename to asynchronix/src/grpc/services/init_service.rs diff --git a/asynchronix/src/rpc/services/monitor_service.rs b/asynchronix/src/grpc/services/monitor_service.rs similarity index 100% rename from asynchronix/src/rpc/services/monitor_service.rs rename to asynchronix/src/grpc/services/monitor_service.rs diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index 5ba0707..ffa8139 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -409,14 +409,14 @@ pub(crate) mod channel; pub(crate) mod executor; +#[cfg(feature = "grpc")] +pub mod grpc; mod loom_exports; pub(crate) mod macros; pub mod model; pub mod ports; -#[cfg(feature = "rpc")] +#[cfg(feature = "grpc")] pub mod registry; -#[cfg(feature = "rpc")] -pub mod rpc; pub mod simulation; pub mod time; pub(crate) mod util; diff --git a/asynchronix/src/rpc.rs b/asynchronix/src/rpc.rs deleted file mode 100644 index 9f0a640..0000000 --- a/asynchronix/src/rpc.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Simulation management through remote procedure calls. - -mod codegen; -#[cfg(feature = "grpc-service")] -pub mod grpc; -mod key_registry; -#[cfg(feature = "wasm-service")] -mod protobuf; -mod services; -#[cfg(feature = "wasm-service")] -pub mod wasm; diff --git a/asynchronix/src/rpc/protobuf.rs b/asynchronix/src/rpc/protobuf.rs deleted file mode 100644 index fb417bb..0000000 --- a/asynchronix/src/rpc/protobuf.rs +++ /dev/null @@ -1,146 +0,0 @@ -use std::error; -use std::fmt; - -use bytes::Buf; -use prost::Message; -use serde::de::DeserializeOwned; - -use crate::registry::EndpointRegistry; -use crate::rpc::key_registry::KeyRegistry; -use crate::simulation::SimInit; - -use super::codegen::simulation::*; -use super::services::{ControllerService, InitService, MonitorService}; - -/// Protobuf-based simulation manager. -/// -/// A `ProtobufService` enables the management of the lifecycle of a -/// simulation, including creating a -/// [`Simulation`](crate::simulation::Simulation), invoking its methods and -/// instantiating a new simulation. -/// -/// Its methods map the various RPC service methods defined in -/// `simulation.proto`. -pub(crate) struct ProtobufService { - init_service: InitService, - controller_service: ControllerService, - monitor_service: MonitorService, -} - -impl ProtobufService { - /// Creates a new `ProtobufService` without any active simulation. - /// - /// The argument is a closure that takes an initialization configuration and - /// is called every time the simulation is (re)started by the remote client. - /// It must create a new `SimInit` object complemented by a registry that - /// exposes the public event and query interface. - pub(crate) fn new(sim_gen: F) -> Self - where - F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, - I: DeserializeOwned, - { - Self { - init_service: InitService::new(sim_gen), - controller_service: ControllerService::NotStarted, - monitor_service: MonitorService::NotStarted, - } - } - - /// Processes an encoded `AnyRequest` message and returns an encoded reply. - pub(crate) fn process_request(&mut self, request_buf: B) -> Result, InvalidRequest> - where - B: Buf, - { - match AnyRequest::decode(request_buf) { - Ok(AnyRequest { request: Some(req) }) => match req { - any_request::Request::InitRequest(request) => { - Ok(self.init(request).encode_to_vec()) - } - any_request::Request::TimeRequest(request) => { - Ok(self.controller_service.time(request).encode_to_vec()) - } - any_request::Request::StepRequest(request) => { - Ok(self.controller_service.step(request).encode_to_vec()) - } - any_request::Request::StepUntilRequest(request) => { - Ok(self.controller_service.step_until(request).encode_to_vec()) - } - any_request::Request::ScheduleEventRequest(request) => Ok(self - .controller_service - .schedule_event(request) - .encode_to_vec()), - any_request::Request::CancelEventRequest(request) => Ok(self - .controller_service - .cancel_event(request) - .encode_to_vec()), - any_request::Request::ProcessEventRequest(request) => Ok(self - .controller_service - .process_event(request) - .encode_to_vec()), - any_request::Request::ProcessQueryRequest(request) => Ok(self - .controller_service - .process_query(request) - .encode_to_vec()), - any_request::Request::ReadEventsRequest(request) => { - Ok(self.monitor_service.read_events(request).encode_to_vec()) - } - any_request::Request::OpenSinkRequest(request) => { - Ok(self.monitor_service.open_sink(request).encode_to_vec()) - } - any_request::Request::CloseSinkRequest(request) => { - Ok(self.monitor_service.close_sink(request).encode_to_vec()) - } - }, - Ok(AnyRequest { request: None }) => Err(InvalidRequest { - description: "the message did not contain any request".to_string(), - }), - Err(err) => Err(InvalidRequest { - description: format!("bad request: {}", err), - }), - } - } - - /// Initialize a simulation with the provided time. - /// - /// If a simulation is already active, it is destructed and replaced with a - /// new simulation. - /// - /// If the initialization time is not provided, it is initialized with the - /// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI). - fn init(&mut self, request: InitRequest) -> InitReply { - let (reply, bench) = self.init_service.init(request); - - if let Some((simulation, endpoint_registry)) = bench { - self.controller_service = ControllerService::Started { - simulation, - event_source_registry: endpoint_registry.event_source_registry, - query_source_registry: endpoint_registry.query_source_registry, - key_registry: KeyRegistry::default(), - }; - self.monitor_service = MonitorService::Started { - event_sink_registry: endpoint_registry.event_sink_registry, - }; - } - - reply - } -} - -impl fmt::Debug for ProtobufService { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ProtobufService").finish_non_exhaustive() - } -} - -#[derive(Clone, Debug)] -pub(crate) struct InvalidRequest { - description: String, -} - -impl fmt::Display for InvalidRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&self.description) - } -} - -impl error::Error for InvalidRequest {} diff --git a/asynchronix/src/rpc/wasm.rs b/asynchronix/src/rpc/wasm.rs deleted file mode 100644 index e0dd32a..0000000 --- a/asynchronix/src/rpc/wasm.rs +++ /dev/null @@ -1,86 +0,0 @@ -//! WASM simulation service. -//! -//! This module provides [`WasmSimulationService`], a thin wrapper over a -//! [`SimulationService`] that can be use from JavaScript. -//! -//! Although it is readily possible to use a -//! [`Simulation`](crate::simulation::Simulation) object from WASM, -//! [`WasmSimulationService`] goes further by exposing the complete simulation -//! API to JavaScript through protobuf. -//! -//! Keep in mind that WASM only supports single-threaded execution and therefore -//! any simulation bench compiled to WASM should instantiate simulations with -//! either [`SimInit::new()`](crate::simulation::SimInit::new) or -//! [`SimInit::with_num_threads(1)`](crate::simulation::SimInit::with_num_threads), -//! failing which the simulation will panic upon initialization. -//! -//! [`WasmSimulationService`] is exported to the JavaScript namespace as -//! `SimulationService`, and [`WasmSimulationService::process_request`] as -//! `SimulationService.processRequest`. - -use serde::de::DeserializeOwned; -use wasm_bindgen::prelude::*; - -use crate::registry::EndpointRegistry; -use crate::simulation::SimInit; - -use super::protobuf::ProtobufService; - -/// A simulation service that can be used from JavaScript. -/// -/// This would typically be used by implementing a `run` function in Rust and -/// export it to WASM: -/// -/// ```no_run -/// #[wasm_bindgen] -/// pub fn run() -> WasmSimulationService { -/// WasmSimulationService::new(my_custom_bench_generator) -/// } -/// ``` -/// -/// which can then be used on the JS side to create a `SimulationService` as a -/// JS object, e.g. with: -/// -/// ```js -/// const simu = run(); -/// -/// // ...build a protobuf request and encode it as a `Uint8Array`... -/// -/// const reply = simu.processRequest(myRequest); -/// -/// // ...decode the protobuf reply... -/// ``` -#[wasm_bindgen(js_name = SimulationService)] -#[derive(Debug)] -pub struct WasmSimulationService(ProtobufService); - -#[wasm_bindgen(js_class = SimulationService)] -impl WasmSimulationService { - /// Processes a protobuf-encoded `AnyRequest` message and returns a - /// protobuf-encoded reply. - /// - /// For the Protocol Buffer definitions, see the `simulation.proto` file. - #[wasm_bindgen(js_name = processRequest)] - pub fn process_request(&mut self, request: &[u8]) -> Result, JsError> { - self.0 - .process_request(request) - .map(|reply| reply.into_boxed_slice()) - .map_err(|e| JsError::new(&e.to_string())) - } -} - -impl WasmSimulationService { - /// Creates a new `SimulationService` without any active simulation. - /// - /// The argument is a closure that is called every time the simulation is - /// (re)started by the remote client. It must create a new `SimInit` object - /// complemented by a registry that exposes the public event and query - /// interface. - pub fn new(sim_gen: F) -> Self - where - F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, - I: DeserializeOwned, - { - Self(ProtobufService::new(sim_gen)) - } -}