From e526071a293e6bec13e734d9c87274f13b5fd9f3 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Sun, 19 Jan 2025 14:42:50 +0100 Subject: [PATCH] Unix socket server support + rename grpc -> server --- nexosim/Cargo.toml | 12 ++- nexosim/build.rs | 6 +- nexosim/src/lib.rs | 12 +-- nexosim/src/{grpc.rs => server.rs} | 3 + .../src/{grpc => server}/api/simulation.proto | 0 nexosim/src/{grpc => server}/codegen.rs | 0 nexosim/src/{grpc => server}/codegen/.gitkeep | 0 .../{grpc => server}/codegen/simulation.v1.rs | 0 nexosim/src/{grpc => server}/key_registry.rs | 0 nexosim/src/{grpc => server}/run.rs | 87 +++++++++++++++++-- nexosim/src/{grpc => server}/services.rs | 0 .../services/controller_service.rs | 0 .../{grpc => server}/services/init_service.rs | 0 .../services/monitor_service.rs | 0 .../services/scheduler_service.rs | 2 +- nexosim/src/simulation.rs | 2 +- 16 files changed, 104 insertions(+), 20 deletions(-) rename nexosim/src/{grpc.rs => server.rs} (77%) rename nexosim/src/{grpc => server}/api/simulation.proto (100%) rename nexosim/src/{grpc => server}/codegen.rs (100%) rename nexosim/src/{grpc => server}/codegen/.gitkeep (100%) rename nexosim/src/{grpc => server}/codegen/simulation.v1.rs (100%) rename nexosim/src/{grpc => server}/key_registry.rs (100%) rename nexosim/src/{grpc => server}/run.rs (70%) rename nexosim/src/{grpc => server}/services.rs (100%) rename nexosim/src/{grpc => server}/services/controller_service.rs (100%) rename nexosim/src/{grpc => server}/services/init_service.rs (100%) rename nexosim/src/{grpc => server}/services/monitor_service.rs (100%) rename nexosim/src/{grpc => server}/services/scheduler_service.rs (99%) diff --git a/nexosim/Cargo.toml b/nexosim/Cargo.toml index 6813d52..daf1299 100644 --- a/nexosim/Cargo.toml +++ b/nexosim/Cargo.toml @@ -30,7 +30,7 @@ keywords = [ [features] # gRPC service. -grpc = [ +server = [ "dep:bytes", "dep:ciborium", "dep:prost", @@ -38,6 +38,7 @@ grpc = [ "dep:serde", "dep:tonic", "dep:tokio", + "dep:tokio-stream", "dep:tonic", "tai-time/serde", ] @@ -84,6 +85,9 @@ tracing = { version = "0.1.40", default-features = false, features = [ ], optional = true } tracing-subscriber = { version = "0.3.18", optional = true } +[target.'cfg(unix)'.dependencies] +tokio-stream = { version = "0.1.10", features = ["net"], optional = true } + [dev-dependencies] futures-util = "0.3" futures-executor = "0.3" @@ -93,15 +97,15 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } loom = "0.7" waker-fn = "1.1" -[target.'cfg(nexosim_grpc_codegen)'.build-dependencies] +[target.'cfg(nexosim_server_codegen)'.build-dependencies] tonic-build = { version = "0.12" } [lints.rust] # `nexosim_loom` flag: run loom-based tests. -# `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions. +# `nexosim_server_codegen` flag: regenerate gRPC code from .proto definitions. unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(nexosim_loom)', - 'cfg(nexosim_grpc_codegen)', + 'cfg(nexosim_server_codegen)', ] } [package.metadata.docs.rs] diff --git a/nexosim/build.rs b/nexosim/build.rs index 908f394..64e57f9 100644 --- a/nexosim/build.rs +++ b/nexosim/build.rs @@ -1,9 +1,9 @@ fn main() -> Result<(), Box> { - #[cfg(nexosim_grpc_codegen)] + #[cfg(nexosim_server_codegen)] tonic_build::configure() .build_client(false) - .out_dir("src/grpc/codegen/") - .compile_protos(&["simulation.proto"], &["src/grpc/api/"])?; + .out_dir("src/server/codegen/") + .compile_protos(&["simulation.proto"], &["src/server/api/"])?; Ok(()) } diff --git a/nexosim/src/lib.rs b/nexosim/src/lib.rs index 8d16d98..c627a48 100644 --- a/nexosim/src/lib.rs +++ b/nexosim/src/lib.rs @@ -399,14 +399,14 @@ //! //! See the [`tracing`] module for more information. //! -//! ## gRPC server +//! ## Server //! -//! The `grpc` feature provides a gRPC server for remote control and monitoring, +//! The `server` feature provides a gRPC server for remote control and monitoring, //! e.g. from a Python client. It can be activated with: //! //! ```toml //! [dependencies] -//! nexosim = { version = "0.3.0-beta.0", features = ["grpc"] } +//! nexosim = { version = "0.3.0-beta.0", features = ["server"] } //! ``` //! //! # Other resources @@ -449,10 +449,10 @@ pub mod simulation; pub mod time; pub(crate) mod util; -#[cfg(feature = "grpc")] -pub mod grpc; -#[cfg(feature = "grpc")] +#[cfg(feature = "server")] pub mod registry; +#[cfg(feature = "server")] +pub mod server; #[cfg(feature = "tracing")] pub mod tracing; diff --git a/nexosim/src/grpc.rs b/nexosim/src/server.rs similarity index 77% rename from nexosim/src/grpc.rs rename to nexosim/src/server.rs index 9c95935..30c2580 100644 --- a/nexosim/src/grpc.rs +++ b/nexosim/src/server.rs @@ -6,3 +6,6 @@ mod run; mod services; pub use run::run; + +#[cfg(unix)] +pub use run::run_local; diff --git a/nexosim/src/grpc/api/simulation.proto b/nexosim/src/server/api/simulation.proto similarity index 100% rename from nexosim/src/grpc/api/simulation.proto rename to nexosim/src/server/api/simulation.proto diff --git a/nexosim/src/grpc/codegen.rs b/nexosim/src/server/codegen.rs similarity index 100% rename from nexosim/src/grpc/codegen.rs rename to nexosim/src/server/codegen.rs diff --git a/nexosim/src/grpc/codegen/.gitkeep b/nexosim/src/server/codegen/.gitkeep similarity index 100% rename from nexosim/src/grpc/codegen/.gitkeep rename to nexosim/src/server/codegen/.gitkeep diff --git a/nexosim/src/grpc/codegen/simulation.v1.rs b/nexosim/src/server/codegen/simulation.v1.rs similarity index 100% rename from nexosim/src/grpc/codegen/simulation.v1.rs rename to nexosim/src/server/codegen/simulation.v1.rs diff --git a/nexosim/src/grpc/key_registry.rs b/nexosim/src/server/key_registry.rs similarity index 100% rename from nexosim/src/grpc/key_registry.rs rename to nexosim/src/server/key_registry.rs diff --git a/nexosim/src/grpc/run.rs b/nexosim/src/server/run.rs similarity index 70% rename from nexosim/src/grpc/run.rs rename to nexosim/src/server/run.rs index a5c2d02..8c669ea 100644 --- a/nexosim/src/grpc/run.rs +++ b/nexosim/src/server/run.rs @@ -1,6 +1,7 @@ -//! gRPC simulation service. +//! Simulation server. use std::net::SocketAddr; +use std::path::Path; use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; @@ -16,7 +17,7 @@ use super::key_registry::KeyRegistry; use super::services::InitService; use super::services::{ControllerService, MonitorService, SchedulerService}; -/// Runs a gRPC simulation server. +/// Runs a simulation from a network server. /// /// The first argument is a closure that takes an initialization configuration /// and is called every time the simulation is (re)started by the remote client. @@ -30,7 +31,7 @@ where run_service(GrpcSimulationService::new(sim_gen), addr) } -/// Monomorphization of the networking code. +/// Monomorphization of the network server. /// /// Keeping this as a separate monomorphized fragment can even triple /// compilation speed for incremental release builds. @@ -38,8 +39,8 @@ fn run_service( service: GrpcSimulationService, addr: SocketAddr, ) -> Result<(), Box> { - // Use 2 threads so that the even if the controller service is blocked due - // to ongoing simulation execution, other services can still be used + // Use 2 threads so that even if the controller service is blocked due to + // ongoing simulation execution, other services can still be used // concurrently. let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) @@ -56,6 +57,82 @@ fn run_service( }) } +/// Runs a simulation locally from a Unix Domain Sockets server. +/// +/// The first argument is a closure that takes an initialization configuration +/// and is called every time the simulation is (re)started by the remote client. +/// It must create a new simulation, complemented by a registry that exposes the +/// public event and query interface. +#[cfg(unix)] +pub fn run_local(sim_gen: F, path: P) -> Result<(), Box> +where + F: FnMut(I) -> Result<(Simulation, EndpointRegistry), SimulationError> + Send + 'static, + I: DeserializeOwned, + P: AsRef, +{ + let path = path.as_ref(); + run_local_service(GrpcSimulationService::new(sim_gen), path) +} + +/// Monomorphization of the Unix Domain Sockets server. +/// +/// Keeping this as a separate monomorphized fragment can even triple +/// compilation speed for incremental release builds. +#[cfg(unix)] +fn run_local_service( + service: GrpcSimulationService, + path: &Path, +) -> Result<(), Box> { + use std::fs; + use std::io; + use std::os::unix::fs::FileTypeExt; + + use tokio::net::UnixListener; + use tokio_stream::wrappers::UnixListenerStream; + + // Unlink the socket if it already exists to prevent an `AddrInUse` error. + match fs::metadata(path) { + // The path is valid: make sure it actually points to a socket. + Ok(socket_meta) => { + if !socket_meta.file_type().is_socket() { + return Err(Box::new(io::Error::new( + io::ErrorKind::AlreadyExists, + "the specified path points to an existing non-socket file", + ))); + } + + fs::remove_file(path)?; + } + // Nothing to do: the socket does not exist yet. + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + // We don't have permission to use the socket. + Err(e) => return Err(Box::new(e)), + } + + // (Re-)Create the socket. + fs::create_dir_all(path.parent().unwrap())?; + + // Use 2 threads so that even if the controller service is blocked due to + // ongoing simulation execution, other services can still be used + // concurrently. + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .build()?; + + rt.block_on(async move { + let uds = UnixListener::bind(path)?; + let uds_stream = UnixListenerStream::new(uds); + + Server::builder() + .add_service(simulation_server::SimulationServer::new(service)) + .serve_with_incoming(uds_stream) + .await?; + + Ok(()) + }) +} + struct GrpcSimulationService { init_service: Mutex, controller_service: Mutex, diff --git a/nexosim/src/grpc/services.rs b/nexosim/src/server/services.rs similarity index 100% rename from nexosim/src/grpc/services.rs rename to nexosim/src/server/services.rs diff --git a/nexosim/src/grpc/services/controller_service.rs b/nexosim/src/server/services/controller_service.rs similarity index 100% rename from nexosim/src/grpc/services/controller_service.rs rename to nexosim/src/server/services/controller_service.rs diff --git a/nexosim/src/grpc/services/init_service.rs b/nexosim/src/server/services/init_service.rs similarity index 100% rename from nexosim/src/grpc/services/init_service.rs rename to nexosim/src/server/services/init_service.rs diff --git a/nexosim/src/grpc/services/monitor_service.rs b/nexosim/src/server/services/monitor_service.rs similarity index 100% rename from nexosim/src/grpc/services/monitor_service.rs rename to nexosim/src/server/services/monitor_service.rs diff --git a/nexosim/src/grpc/services/scheduler_service.rs b/nexosim/src/server/services/scheduler_service.rs similarity index 99% rename from nexosim/src/grpc/services/scheduler_service.rs rename to nexosim/src/server/services/scheduler_service.rs index 7c818cf..5a5b9a7 100644 --- a/nexosim/src/grpc/services/scheduler_service.rs +++ b/nexosim/src/server/services/scheduler_service.rs @@ -1,8 +1,8 @@ use std::fmt; use std::sync::Arc; -use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId}; use crate::registry::EventSourceRegistry; +use crate::server::key_registry::{KeyRegistry, KeyRegistryId}; use crate::simulation::Scheduler; use super::super::codegen::simulation::*; diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index 70e159d..55ea54d 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -537,7 +537,7 @@ impl Simulation { } /// Returns a scheduler handle. - #[cfg(feature = "grpc")] + #[cfg(feature = "server")] pub(crate) fn scheduler(&self) -> Scheduler { Scheduler::new( self.scheduler_queue.clone(),