1
0
forked from ROMEO/nexosim

Merge pull request #29 from asynchronics/feature/grpc-init-parameters

Accept an arbitrary argument for remote init
This commit is contained in:
Serge Barral 2024-07-31 16:00:29 +02:00 committed by GitHub
commit a6a2c85129
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 159 additions and 67 deletions

View File

@ -17,8 +17,7 @@ enum ErrorCode {
INVALID_KEY = 6;
SOURCE_NOT_FOUND = 10;
SINK_NOT_FOUND = 11;
KEY_NOT_FOUND = 12;
SIMULATION_TIME_OUT_OF_RANGE = 13;
SIMULATION_TIME_OUT_OF_RANGE = 12;
}
message Error {
@ -31,7 +30,10 @@ message EventKey {
uint64 subkey2 = 2;
}
message InitRequest { google.protobuf.Timestamp time = 1; }
message InitRequest {
google.protobuf.Timestamp time = 1;
bytes cfg = 2;
}
message InitReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;

View File

@ -20,6 +20,8 @@ pub struct EventKey {
pub struct InitRequest {
#[prost(message, optional, tag = "1")]
pub time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(bytes = "vec", tag = "2")]
pub cfg: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@ -382,8 +384,7 @@ pub enum ErrorCode {
InvalidKey = 6,
SourceNotFound = 10,
SinkNotFound = 11,
KeyNotFound = 12,
SimulationTimeOutOfRange = 13,
SimulationTimeOutOfRange = 12,
}
impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition.
@ -401,7 +402,6 @@ impl ErrorCode {
ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND",
ErrorCode::KeyNotFound => "KEY_NOT_FOUND",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
}
}
@ -417,7 +417,6 @@ impl ErrorCode {
"INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound),
"KEY_NOT_FOUND" => Some(Self::KeyNotFound),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
_ => None,
}

View File

@ -4,6 +4,7 @@ use std::net::SocketAddr;
use std::sync::Mutex;
use std::sync::MutexGuard;
use serde::de::DeserializeOwned;
use tonic::{transport::Server, Request, Response, Status};
use crate::registry::EndpointRegistry;
@ -11,18 +12,31 @@ use crate::simulation::SimInit;
use super::codegen::simulation::*;
use super::key_registry::KeyRegistry;
use super::services::{timestamp_to_monotonic, ControllerService, MonitorService};
use super::services::InitService;
use super::services::{ControllerService, MonitorService};
/// Runs a gRPC simulation server.
///
/// The first argument is a closure that is called every time the simulation is
/// (re)started by the remote client. It must create a new `SimInit` object
/// complemented by a registry that exposes the public event and query
/// interface.
pub fn run<F>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
run_service(GrpcSimulationService::new(sim_gen), addr)
}
/// Monomorphization of the networking code.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the controller and monitor services can be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
@ -30,11 +44,9 @@ where
.enable_io()
.build()?;
let sim_manager = GrpcSimulationService::new(sim_gen);
rt.block_on(async move {
Server::builder()
.add_service(simulation_server::SimulationServer::new(sim_manager))
.add_service(simulation_server::SimulationServer::new(service))
.serve(addr)
.await?;
@ -43,7 +55,7 @@ where
}
struct GrpcSimulationService {
sim_gen: Mutex<Box<dyn FnMut() -> (SimInit, EndpointRegistry) + Send + 'static>>,
init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>,
monitor_service: Mutex<MonitorService>,
}
@ -51,21 +63,27 @@ struct GrpcSimulationService {
impl GrpcSimulationService {
/// Creates a new `GrpcSimulationService` without any active simulation.
///
/// The argument is a closure that is called every time the simulation is
/// (re)started by the remote client. It must create a new `SimInit` object
/// complemented by a registry that exposes the public event and query
/// interface.
pub(crate) fn new<F>(sim_gen: F) -> Self
/// The argument is a closure that takes an initialization configuration and
/// is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
pub(crate) fn new<F, I>(sim_gen: F) -> Self
where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
Self {
sim_gen: Mutex::new(Box::new(sim_gen)),
init_service: Mutex::new(InitService::new(sim_gen)),
controller_service: Mutex::new(ControllerService::NotStarted),
monitor_service: Mutex::new(MonitorService::NotStarted),
}
}
/// Locks the initializer and returns the mutex guard.
fn initializer(&self) -> MutexGuard<'_, InitService> {
self.init_service.lock().unwrap()
}
/// Locks the controller and returns the mutex guard.
fn controller(&self) -> MutexGuard<'_, ControllerService> {
self.controller_service.lock().unwrap()
@ -82,10 +100,9 @@ impl simulation_server::Simulation for GrpcSimulationService {
async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitReply>, Status> {
let request = request.into_inner();
let start_time = request.time.unwrap_or_default();
let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) {
let (sim_init, endpoint_registry) = (self.sim_gen.lock().unwrap())();
let simulation = sim_init.init(start_time);
let (reply, bench) = self.initializer().init(request);
if let Some((simulation, endpoint_registry)) = bench {
*self.controller() = ControllerService::Started {
simulation,
event_source_registry: endpoint_registry.event_source_registry,
@ -95,18 +112,9 @@ impl simulation_server::Simulation for GrpcSimulationService {
*self.monitor() = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry,
};
}
init_reply::Result::Empty(())
} else {
init_reply::Result::Error(Error {
code: ErrorCode::InvalidTime as i32,
message: "out-of-range nanosecond field".to_string(),
})
};
Ok(Response::new(InitReply {
result: Some(reply),
}))
Ok(Response::new(reply))
}
async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> {
let request = request.into_inner();

View File

@ -3,13 +3,14 @@ use std::fmt;
use bytes::Buf;
use prost::Message;
use serde::de::DeserializeOwned;
use crate::registry::EndpointRegistry;
use crate::rpc::key_registry::KeyRegistry;
use crate::simulation::SimInit;
use super::codegen::simulation::*;
use super::services::{timestamp_to_monotonic, ControllerService, MonitorService};
use super::services::{ControllerService, InitService, MonitorService};
/// Protobuf-based simulation manager.
///
@ -21,7 +22,7 @@ use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}
/// Its methods map the various RPC service methods defined in
/// `simulation.proto`.
pub(crate) struct ProtobufService {
sim_gen: Box<dyn FnMut() -> (SimInit, EndpointRegistry) + Send + 'static>,
init_service: InitService,
controller_service: ControllerService,
monitor_service: MonitorService,
}
@ -29,16 +30,17 @@ pub(crate) struct ProtobufService {
impl ProtobufService {
/// Creates a new `ProtobufService` without any active simulation.
///
/// The argument is a closure that is called every time the simulation is
/// (re)started by the remote client. It must create a new `SimInit` object
/// complemented by a registry that exposes the public event and query
/// interface.
pub(crate) fn new<F>(sim_gen: F) -> Self
/// The argument is a closure that takes an initialization configuration and
/// is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
pub(crate) fn new<F, I>(sim_gen: F) -> Self
where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
Self {
sim_gen: Box::new(sim_gen),
init_service: InitService::new(sim_gen),
controller_service: ControllerService::NotStarted,
monitor_service: MonitorService::NotStarted,
}
@ -106,10 +108,9 @@ impl ProtobufService {
/// If the initialization time is not provided, it is initialized with the
/// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI).
fn init(&mut self, request: InitRequest) -> InitReply {
let start_time = request.time.unwrap_or_default();
let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) {
let (sim_init, endpoint_registry) = (self.sim_gen)();
let simulation = sim_init.init(start_time);
let (reply, bench) = self.init_service.init(request);
if let Some((simulation, endpoint_registry)) = bench {
self.controller_service = ControllerService::Started {
simulation,
event_source_registry: endpoint_registry.event_source_registry,
@ -119,18 +120,9 @@ impl ProtobufService {
self.monitor_service = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry,
};
init_reply::Result::Empty(())
} else {
init_reply::Result::Error(Error {
code: ErrorCode::InvalidTime as i32,
message: "out-of-range nanosecond field".to_string(),
})
};
InitReply {
result: Some(reply),
}
reply
}
}

View File

@ -1,4 +1,5 @@
mod controller_service;
mod init_service;
mod monitor_service;
use std::time::Duration;
@ -9,6 +10,7 @@ use tai_time::MonotonicTime;
use super::codegen::simulation::{Error, ErrorCode};
pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
pub(crate) use monitor_service::MonitorService;
/// Transforms an error code and a message into a Protobuf error.

View File

@ -330,7 +330,7 @@ impl ControllerService {
}
}
/// Broadcasts an event from an event source immediately, blocking until
/// Broadcasts a query from a query source immediately, blocking until
/// completion.
///
/// Simulation time remains unchanged.

View File

@ -0,0 +1,87 @@
use ciborium;
use serde::de::DeserializeOwned;
use crate::registry::EndpointRegistry;
use crate::simulation::SimInit;
use crate::simulation::Simulation;
use super::{timestamp_to_monotonic, to_error};
use super::super::codegen::simulation::*;
type DeserializationError = ciborium::de::Error<std::io::Error>;
type SimGen = Box<
dyn FnMut(&[u8]) -> Result<(SimInit, EndpointRegistry), DeserializationError> + Send + 'static,
>;
/// Protobuf-based simulation initializer.
///
/// An `InitService` creates a new simulation bench based on a serialized initialization configuration.
///
/// It maps the `Init` method defined in `simulation.proto`.
pub(crate) struct InitService {
sim_gen: SimGen,
}
impl InitService {
/// Creates a new `InitService`.
///
/// The argument is a closure that takes a CBOR-serialized initialization
/// configuration and is called every time the simulation is (re)started by
/// the remote client. It must create a new `SimInit` object complemented by
/// a registry that exposes the public event and query interface.
pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
// Wrap `sim_gen` so it accepts a serialized init configuration.
let sim_gen = move |serialized_cfg: &[u8]| -> Result<(SimInit, EndpointRegistry), DeserializationError> {
let cfg = ciborium::from_reader(serialized_cfg)?;
Ok(sim_gen(cfg))
};
Self {
sim_gen: Box::new(sim_gen),
}
}
/// Initializes the simulation based on the specified configuration.
pub(crate) fn init(
&mut self,
request: InitRequest,
) -> (InitReply, Option<(Simulation, EndpointRegistry)>) {
let start_time = request.time.unwrap_or_default();
let reply = (self.sim_gen)(&request.cfg)
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the initialization configuration could not be deserialized: {}",
e
),
)
})
.and_then(|(sim_init, registry)| {
timestamp_to_monotonic(start_time)
.ok_or_else(|| {
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
})
.map(|start_time| (sim_init.init(start_time), registry))
});
let (reply, bench) = match reply {
Ok(bench) => (init_reply::Result::Empty(()), Some(bench)),
Err(e) => (init_reply::Result::Error(e), None),
};
(
InitReply {
result: Some(reply),
},
bench,
)
}
}

View File

@ -18,6 +18,7 @@
//! `SimulationService`, and [`WasmSimulationService::process_request`] as
//! `SimulationService.processRequest`.
use serde::de::DeserializeOwned;
use wasm_bindgen::prelude::*;
use crate::registry::EndpointRegistry;
@ -75,9 +76,10 @@ impl WasmSimulationService {
/// (re)started by the remote client. It must create a new `SimInit` object
/// complemented by a registry that exposes the public event and query
/// interface.
pub fn new<F>(sim_gen: F) -> Self
pub fn new<F, I>(sim_gen: F) -> Self
where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
Self(ProtobufService::new(sim_gen))
}