diff --git a/asynchronix/src/rpc/api/simulation.proto b/asynchronix/src/rpc/api/simulation.proto index 8aa9f68..b340415 100644 --- a/asynchronix/src/rpc/api/simulation.proto +++ b/asynchronix/src/rpc/api/simulation.proto @@ -17,8 +17,7 @@ enum ErrorCode { INVALID_KEY = 6; SOURCE_NOT_FOUND = 10; SINK_NOT_FOUND = 11; - KEY_NOT_FOUND = 12; - SIMULATION_TIME_OUT_OF_RANGE = 13; + SIMULATION_TIME_OUT_OF_RANGE = 12; } message Error { @@ -31,7 +30,10 @@ message EventKey { uint64 subkey2 = 2; } -message InitRequest { google.protobuf.Timestamp time = 1; } +message InitRequest { + google.protobuf.Timestamp time = 1; + bytes cfg = 2; +} message InitReply { oneof result { // Always returns exactly 1 variant. google.protobuf.Empty empty = 1; diff --git a/asynchronix/src/rpc/codegen/simulation.rs b/asynchronix/src/rpc/codegen/simulation.rs index 672aed1..daef764 100644 --- a/asynchronix/src/rpc/codegen/simulation.rs +++ b/asynchronix/src/rpc/codegen/simulation.rs @@ -20,6 +20,8 @@ pub struct EventKey { pub struct InitRequest { #[prost(message, optional, tag = "1")] pub time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(bytes = "vec", tag = "2")] + pub cfg: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -382,8 +384,7 @@ pub enum ErrorCode { InvalidKey = 6, SourceNotFound = 10, SinkNotFound = 11, - KeyNotFound = 12, - SimulationTimeOutOfRange = 13, + SimulationTimeOutOfRange = 12, } impl ErrorCode { /// String value of the enum field names used in the ProtoBuf definition. @@ -401,7 +402,6 @@ impl ErrorCode { ErrorCode::InvalidKey => "INVALID_KEY", ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", ErrorCode::SinkNotFound => "SINK_NOT_FOUND", - ErrorCode::KeyNotFound => "KEY_NOT_FOUND", ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", } } @@ -417,7 +417,6 @@ impl ErrorCode { "INVALID_KEY" => Some(Self::InvalidKey), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "SINK_NOT_FOUND" => Some(Self::SinkNotFound), - "KEY_NOT_FOUND" => Some(Self::KeyNotFound), "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), _ => None, } diff --git a/asynchronix/src/rpc/grpc.rs b/asynchronix/src/rpc/grpc.rs index d94160c..964e351 100644 --- a/asynchronix/src/rpc/grpc.rs +++ b/asynchronix/src/rpc/grpc.rs @@ -4,6 +4,7 @@ use std::net::SocketAddr; use std::sync::Mutex; use std::sync::MutexGuard; +use serde::de::DeserializeOwned; use tonic::{transport::Server, Request, Response, Status}; use crate::registry::EndpointRegistry; @@ -11,18 +12,31 @@ use crate::simulation::SimInit; use super::codegen::simulation::*; use super::key_registry::KeyRegistry; -use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}; +use super::services::InitService; +use super::services::{ControllerService, MonitorService}; /// Runs a gRPC simulation server. /// -/// The first argument is a closure that is called every time the simulation is -/// (re)started by the remote client. It must create a new `SimInit` object -/// complemented by a registry that exposes the public event and query -/// interface. -pub fn run(sim_gen: F, addr: SocketAddr) -> Result<(), Box> +/// The first argument is a closure that takes an initialization configuration +/// and is called every time the simulation is (re)started by the remote client. +/// It must create a new `SimInit` object complemented by a registry that +/// exposes the public event and query interface. +pub fn run(sim_gen: F, addr: SocketAddr) -> Result<(), Box> where - F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, + I: DeserializeOwned, { + run_service(GrpcSimulationService::new(sim_gen), addr) +} + +/// Monomorphization of the networking code. +/// +/// Keeping this as a separate monomorphized fragment can even triple +/// compilation speed for incremental release builds. +fn run_service( + service: GrpcSimulationService, + addr: SocketAddr, +) -> Result<(), Box> { // Use 2 threads so that the controller and monitor services can be used // concurrently. let rt = tokio::runtime::Builder::new_multi_thread() @@ -30,11 +44,9 @@ where .enable_io() .build()?; - let sim_manager = GrpcSimulationService::new(sim_gen); - rt.block_on(async move { Server::builder() - .add_service(simulation_server::SimulationServer::new(sim_manager)) + .add_service(simulation_server::SimulationServer::new(service)) .serve(addr) .await?; @@ -43,7 +55,7 @@ where } struct GrpcSimulationService { - sim_gen: Mutex (SimInit, EndpointRegistry) + Send + 'static>>, + init_service: Mutex, controller_service: Mutex, monitor_service: Mutex, } @@ -51,21 +63,27 @@ struct GrpcSimulationService { impl GrpcSimulationService { /// Creates a new `GrpcSimulationService` without any active simulation. /// - /// The argument is a closure that is called every time the simulation is - /// (re)started by the remote client. It must create a new `SimInit` object - /// complemented by a registry that exposes the public event and query - /// interface. - pub(crate) fn new(sim_gen: F) -> Self + /// The argument is a closure that takes an initialization configuration and + /// is called every time the simulation is (re)started by the remote client. + /// It must create a new `SimInit` object complemented by a registry that + /// exposes the public event and query interface. + pub(crate) fn new(sim_gen: F) -> Self where - F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, + I: DeserializeOwned, { Self { - sim_gen: Mutex::new(Box::new(sim_gen)), + init_service: Mutex::new(InitService::new(sim_gen)), controller_service: Mutex::new(ControllerService::NotStarted), monitor_service: Mutex::new(MonitorService::NotStarted), } } + /// Locks the initializer and returns the mutex guard. + fn initializer(&self) -> MutexGuard<'_, InitService> { + self.init_service.lock().unwrap() + } + /// Locks the controller and returns the mutex guard. fn controller(&self) -> MutexGuard<'_, ControllerService> { self.controller_service.lock().unwrap() @@ -82,10 +100,9 @@ impl simulation_server::Simulation for GrpcSimulationService { async fn init(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - let start_time = request.time.unwrap_or_default(); - let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { - let (sim_init, endpoint_registry) = (self.sim_gen.lock().unwrap())(); - let simulation = sim_init.init(start_time); + let (reply, bench) = self.initializer().init(request); + + if let Some((simulation, endpoint_registry)) = bench { *self.controller() = ControllerService::Started { simulation, event_source_registry: endpoint_registry.event_source_registry, @@ -95,18 +112,9 @@ impl simulation_server::Simulation for GrpcSimulationService { *self.monitor() = MonitorService::Started { event_sink_registry: endpoint_registry.event_sink_registry, }; + } - init_reply::Result::Empty(()) - } else { - init_reply::Result::Error(Error { - code: ErrorCode::InvalidTime as i32, - message: "out-of-range nanosecond field".to_string(), - }) - }; - - Ok(Response::new(InitReply { - result: Some(reply), - })) + Ok(Response::new(reply)) } async fn time(&self, request: Request) -> Result, Status> { let request = request.into_inner(); diff --git a/asynchronix/src/rpc/protobuf.rs b/asynchronix/src/rpc/protobuf.rs index ca1a242..fb417bb 100644 --- a/asynchronix/src/rpc/protobuf.rs +++ b/asynchronix/src/rpc/protobuf.rs @@ -3,13 +3,14 @@ use std::fmt; use bytes::Buf; use prost::Message; +use serde::de::DeserializeOwned; use crate::registry::EndpointRegistry; use crate::rpc::key_registry::KeyRegistry; use crate::simulation::SimInit; use super::codegen::simulation::*; -use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}; +use super::services::{ControllerService, InitService, MonitorService}; /// Protobuf-based simulation manager. /// @@ -21,7 +22,7 @@ use super::services::{timestamp_to_monotonic, ControllerService, MonitorService} /// Its methods map the various RPC service methods defined in /// `simulation.proto`. pub(crate) struct ProtobufService { - sim_gen: Box (SimInit, EndpointRegistry) + Send + 'static>, + init_service: InitService, controller_service: ControllerService, monitor_service: MonitorService, } @@ -29,16 +30,17 @@ pub(crate) struct ProtobufService { impl ProtobufService { /// Creates a new `ProtobufService` without any active simulation. /// - /// The argument is a closure that is called every time the simulation is - /// (re)started by the remote client. It must create a new `SimInit` object - /// complemented by a registry that exposes the public event and query - /// interface. - pub(crate) fn new(sim_gen: F) -> Self + /// The argument is a closure that takes an initialization configuration and + /// is called every time the simulation is (re)started by the remote client. + /// It must create a new `SimInit` object complemented by a registry that + /// exposes the public event and query interface. + pub(crate) fn new(sim_gen: F) -> Self where - F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, + I: DeserializeOwned, { Self { - sim_gen: Box::new(sim_gen), + init_service: InitService::new(sim_gen), controller_service: ControllerService::NotStarted, monitor_service: MonitorService::NotStarted, } @@ -106,10 +108,9 @@ impl ProtobufService { /// If the initialization time is not provided, it is initialized with the /// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI). fn init(&mut self, request: InitRequest) -> InitReply { - let start_time = request.time.unwrap_or_default(); - let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { - let (sim_init, endpoint_registry) = (self.sim_gen)(); - let simulation = sim_init.init(start_time); + let (reply, bench) = self.init_service.init(request); + + if let Some((simulation, endpoint_registry)) = bench { self.controller_service = ControllerService::Started { simulation, event_source_registry: endpoint_registry.event_source_registry, @@ -119,18 +120,9 @@ impl ProtobufService { self.monitor_service = MonitorService::Started { event_sink_registry: endpoint_registry.event_sink_registry, }; - - init_reply::Result::Empty(()) - } else { - init_reply::Result::Error(Error { - code: ErrorCode::InvalidTime as i32, - message: "out-of-range nanosecond field".to_string(), - }) - }; - - InitReply { - result: Some(reply), } + + reply } } diff --git a/asynchronix/src/rpc/services.rs b/asynchronix/src/rpc/services.rs index ed18e58..455c865 100644 --- a/asynchronix/src/rpc/services.rs +++ b/asynchronix/src/rpc/services.rs @@ -1,4 +1,5 @@ mod controller_service; +mod init_service; mod monitor_service; use std::time::Duration; @@ -9,6 +10,7 @@ use tai_time::MonotonicTime; use super::codegen::simulation::{Error, ErrorCode}; pub(crate) use controller_service::ControllerService; +pub(crate) use init_service::InitService; pub(crate) use monitor_service::MonitorService; /// Transforms an error code and a message into a Protobuf error. diff --git a/asynchronix/src/rpc/services/controller_service.rs b/asynchronix/src/rpc/services/controller_service.rs index 037946b..2c1bdf9 100644 --- a/asynchronix/src/rpc/services/controller_service.rs +++ b/asynchronix/src/rpc/services/controller_service.rs @@ -330,7 +330,7 @@ impl ControllerService { } } - /// Broadcasts an event from an event source immediately, blocking until + /// Broadcasts a query from a query source immediately, blocking until /// completion. /// /// Simulation time remains unchanged. diff --git a/asynchronix/src/rpc/services/init_service.rs b/asynchronix/src/rpc/services/init_service.rs new file mode 100644 index 0000000..551f41e --- /dev/null +++ b/asynchronix/src/rpc/services/init_service.rs @@ -0,0 +1,87 @@ +use ciborium; +use serde::de::DeserializeOwned; + +use crate::registry::EndpointRegistry; +use crate::simulation::SimInit; +use crate::simulation::Simulation; + +use super::{timestamp_to_monotonic, to_error}; + +use super::super::codegen::simulation::*; + +type DeserializationError = ciborium::de::Error; +type SimGen = Box< + dyn FnMut(&[u8]) -> Result<(SimInit, EndpointRegistry), DeserializationError> + Send + 'static, +>; + +/// Protobuf-based simulation initializer. +/// +/// An `InitService` creates a new simulation bench based on a serialized initialization configuration. +/// +/// It maps the `Init` method defined in `simulation.proto`. +pub(crate) struct InitService { + sim_gen: SimGen, +} + +impl InitService { + /// Creates a new `InitService`. + /// + /// The argument is a closure that takes a CBOR-serialized initialization + /// configuration and is called every time the simulation is (re)started by + /// the remote client. It must create a new `SimInit` object complemented by + /// a registry that exposes the public event and query interface. + pub(crate) fn new(mut sim_gen: F) -> Self + where + F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, + I: DeserializeOwned, + { + // Wrap `sim_gen` so it accepts a serialized init configuration. + let sim_gen = move |serialized_cfg: &[u8]| -> Result<(SimInit, EndpointRegistry), DeserializationError> { + let cfg = ciborium::from_reader(serialized_cfg)?; + + Ok(sim_gen(cfg)) + }; + + Self { + sim_gen: Box::new(sim_gen), + } + } + + /// Initializes the simulation based on the specified configuration. + pub(crate) fn init( + &mut self, + request: InitRequest, + ) -> (InitReply, Option<(Simulation, EndpointRegistry)>) { + let start_time = request.time.unwrap_or_default(); + + let reply = (self.sim_gen)(&request.cfg) + .map_err(|e| { + to_error( + ErrorCode::InvalidMessage, + format!( + "the initialization configuration could not be deserialized: {}", + e + ), + ) + }) + .and_then(|(sim_init, registry)| { + timestamp_to_monotonic(start_time) + .ok_or_else(|| { + to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field") + }) + .map(|start_time| (sim_init.init(start_time), registry)) + }); + + let (reply, bench) = match reply { + Ok(bench) => (init_reply::Result::Empty(()), Some(bench)), + Err(e) => (init_reply::Result::Error(e), None), + }; + + ( + InitReply { + result: Some(reply), + }, + bench, + ) + } +} diff --git a/asynchronix/src/rpc/wasm.rs b/asynchronix/src/rpc/wasm.rs index dc443a6..e0dd32a 100644 --- a/asynchronix/src/rpc/wasm.rs +++ b/asynchronix/src/rpc/wasm.rs @@ -18,6 +18,7 @@ //! `SimulationService`, and [`WasmSimulationService::process_request`] as //! `SimulationService.processRequest`. +use serde::de::DeserializeOwned; use wasm_bindgen::prelude::*; use crate::registry::EndpointRegistry; @@ -75,9 +76,10 @@ impl WasmSimulationService { /// (re)started by the remote client. It must create a new `SimInit` object /// complemented by a registry that exposes the public event and query /// interface. - pub fn new(sim_gen: F) -> Self + pub fn new(sim_gen: F) -> Self where - F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, + F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static, + I: DeserializeOwned, { Self(ProtobufService::new(sim_gen)) }