1
0
forked from ROMEO/nexosim

Unix socket server support + rename grpc -> server

This commit is contained in:
Serge Barral 2025-01-19 14:42:50 +01:00
parent 81c1d61290
commit e526071a29
16 changed files with 104 additions and 20 deletions

View File

@ -30,7 +30,7 @@ keywords = [
[features]
# gRPC service.
grpc = [
server = [
"dep:bytes",
"dep:ciborium",
"dep:prost",
@ -38,6 +38,7 @@ grpc = [
"dep:serde",
"dep:tonic",
"dep:tokio",
"dep:tokio-stream",
"dep:tonic",
"tai-time/serde",
]
@ -84,6 +85,9 @@ tracing = { version = "0.1.40", default-features = false, features = [
], optional = true }
tracing-subscriber = { version = "0.3.18", optional = true }
[target.'cfg(unix)'.dependencies]
tokio-stream = { version = "0.1.10", features = ["net"], optional = true }
[dev-dependencies]
futures-util = "0.3"
futures-executor = "0.3"
@ -93,15 +97,15 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
loom = "0.7"
waker-fn = "1.1"
[target.'cfg(nexosim_grpc_codegen)'.build-dependencies]
[target.'cfg(nexosim_server_codegen)'.build-dependencies]
tonic-build = { version = "0.12" }
[lints.rust]
# `nexosim_loom` flag: run loom-based tests.
# `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions.
# `nexosim_server_codegen` flag: regenerate gRPC code from .proto definitions.
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(nexosim_loom)',
'cfg(nexosim_grpc_codegen)',
'cfg(nexosim_server_codegen)',
] }
[package.metadata.docs.rs]

View File

@ -1,9 +1,9 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(nexosim_grpc_codegen)]
#[cfg(nexosim_server_codegen)]
tonic_build::configure()
.build_client(false)
.out_dir("src/grpc/codegen/")
.compile_protos(&["simulation.proto"], &["src/grpc/api/"])?;
.out_dir("src/server/codegen/")
.compile_protos(&["simulation.proto"], &["src/server/api/"])?;
Ok(())
}

View File

@ -399,14 +399,14 @@
//!
//! See the [`tracing`] module for more information.
//!
//! ## gRPC server
//! ## Server
//!
//! The `grpc` feature provides a gRPC server for remote control and monitoring,
//! The `server` feature provides a gRPC server for remote control and monitoring,
//! e.g. from a Python client. It can be activated with:
//!
//! ```toml
//! [dependencies]
//! nexosim = { version = "0.3.0-beta.0", features = ["grpc"] }
//! nexosim = { version = "0.3.0-beta.0", features = ["server"] }
//! ```
//!
//! # Other resources
@ -449,10 +449,10 @@ pub mod simulation;
pub mod time;
pub(crate) mod util;
#[cfg(feature = "grpc")]
pub mod grpc;
#[cfg(feature = "grpc")]
#[cfg(feature = "server")]
pub mod registry;
#[cfg(feature = "server")]
pub mod server;
#[cfg(feature = "tracing")]
pub mod tracing;

View File

@ -6,3 +6,6 @@ mod run;
mod services;
pub use run::run;
#[cfg(unix)]
pub use run::run_local;

View File

@ -1,6 +1,7 @@
//! gRPC simulation service.
//! Simulation server.
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
@ -16,7 +17,7 @@ use super::key_registry::KeyRegistry;
use super::services::InitService;
use super::services::{ControllerService, MonitorService, SchedulerService};
/// Runs a gRPC simulation server.
/// Runs a simulation from a network server.
///
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
@ -30,7 +31,7 @@ where
run_service(GrpcSimulationService::new(sim_gen), addr)
}
/// Monomorphization of the networking code.
/// Monomorphization of the network server.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
@ -38,8 +39,8 @@ fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the even if the controller service is blocked due
// to ongoing simulation execution, other services can still be used
// Use 2 threads so that even if the controller service is blocked due to
// ongoing simulation execution, other services can still be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
@ -56,6 +57,82 @@ fn run_service(
})
}
/// Runs a simulation locally from a Unix Domain Sockets server.
///
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
/// It must create a new simulation, complemented by a registry that exposes the
/// public event and query interface.
#[cfg(unix)]
pub fn run_local<F, I, P>(sim_gen: F, path: P) -> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(I) -> Result<(Simulation, EndpointRegistry), SimulationError> + Send + 'static,
I: DeserializeOwned,
P: AsRef<Path>,
{
let path = path.as_ref();
run_local_service(GrpcSimulationService::new(sim_gen), path)
}
/// Monomorphization of the Unix Domain Sockets server.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
#[cfg(unix)]
fn run_local_service(
service: GrpcSimulationService,
path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
use std::fs;
use std::io;
use std::os::unix::fs::FileTypeExt;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
// Unlink the socket if it already exists to prevent an `AddrInUse` error.
match fs::metadata(path) {
// The path is valid: make sure it actually points to a socket.
Ok(socket_meta) => {
if !socket_meta.file_type().is_socket() {
return Err(Box::new(io::Error::new(
io::ErrorKind::AlreadyExists,
"the specified path points to an existing non-socket file",
)));
}
fs::remove_file(path)?;
}
// Nothing to do: the socket does not exist yet.
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
// We don't have permission to use the socket.
Err(e) => return Err(Box::new(e)),
}
// (Re-)Create the socket.
fs::create_dir_all(path.parent().unwrap())?;
// Use 2 threads so that even if the controller service is blocked due to
// ongoing simulation execution, other services can still be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_io()
.build()?;
rt.block_on(async move {
let uds = UnixListener::bind(path)?;
let uds_stream = UnixListenerStream::new(uds);
Server::builder()
.add_service(simulation_server::SimulationServer::new(service))
.serve_with_incoming(uds_stream)
.await?;
Ok(())
})
}
struct GrpcSimulationService {
init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>,

View File

@ -1,8 +1,8 @@
use std::fmt;
use std::sync::Arc;
use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::EventSourceRegistry;
use crate::server::key_registry::{KeyRegistry, KeyRegistryId};
use crate::simulation::Scheduler;
use super::super::codegen::simulation::*;

View File

@ -537,7 +537,7 @@ impl Simulation {
}
/// Returns a scheduler handle.
#[cfg(feature = "grpc")]
#[cfg(feature = "server")]
pub(crate) fn scheduler(&self) -> Scheduler {
Scheduler::new(
self.scheduler_queue.clone(),