1
0
forked from ROMEO/nexosim

Accept an arbitrary argument for remote init

This commit is contained in:
Serge Barral 2024-07-29 18:02:06 +02:00
parent cb7caa10e9
commit 1b0395f693
8 changed files with 159 additions and 67 deletions

View File

@ -17,8 +17,7 @@ enum ErrorCode {
INVALID_KEY = 6; INVALID_KEY = 6;
SOURCE_NOT_FOUND = 10; SOURCE_NOT_FOUND = 10;
SINK_NOT_FOUND = 11; SINK_NOT_FOUND = 11;
KEY_NOT_FOUND = 12; SIMULATION_TIME_OUT_OF_RANGE = 12;
SIMULATION_TIME_OUT_OF_RANGE = 13;
} }
message Error { message Error {
@ -31,7 +30,10 @@ message EventKey {
uint64 subkey2 = 2; uint64 subkey2 = 2;
} }
message InitRequest { google.protobuf.Timestamp time = 1; } message InitRequest {
google.protobuf.Timestamp time = 1;
bytes cfg = 2;
}
message InitReply { message InitReply {
oneof result { // Always returns exactly 1 variant. oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1; google.protobuf.Empty empty = 1;

View File

@ -20,6 +20,8 @@ pub struct EventKey {
pub struct InitRequest { pub struct InitRequest {
#[prost(message, optional, tag = "1")] #[prost(message, optional, tag = "1")]
pub time: ::core::option::Option<::prost_types::Timestamp>, pub time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(bytes = "vec", tag = "2")]
pub cfg: ::prost::alloc::vec::Vec<u8>,
} }
#[allow(clippy::derive_partial_eq_without_eq)] #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)] #[derive(Clone, PartialEq, ::prost::Message)]
@ -382,8 +384,7 @@ pub enum ErrorCode {
InvalidKey = 6, InvalidKey = 6,
SourceNotFound = 10, SourceNotFound = 10,
SinkNotFound = 11, SinkNotFound = 11,
KeyNotFound = 12, SimulationTimeOutOfRange = 12,
SimulationTimeOutOfRange = 13,
} }
impl ErrorCode { impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition. /// String value of the enum field names used in the ProtoBuf definition.
@ -401,7 +402,6 @@ impl ErrorCode {
ErrorCode::InvalidKey => "INVALID_KEY", ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND", ErrorCode::SinkNotFound => "SINK_NOT_FOUND",
ErrorCode::KeyNotFound => "KEY_NOT_FOUND",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
} }
} }
@ -417,7 +417,6 @@ impl ErrorCode {
"INVALID_KEY" => Some(Self::InvalidKey), "INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound), "SINK_NOT_FOUND" => Some(Self::SinkNotFound),
"KEY_NOT_FOUND" => Some(Self::KeyNotFound),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
_ => None, _ => None,
} }

View File

@ -4,6 +4,7 @@ use std::net::SocketAddr;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::MutexGuard; use std::sync::MutexGuard;
use serde::de::DeserializeOwned;
use tonic::{transport::Server, Request, Response, Status}; use tonic::{transport::Server, Request, Response, Status};
use crate::registry::EndpointRegistry; use crate::registry::EndpointRegistry;
@ -11,18 +12,31 @@ use crate::simulation::SimInit;
use super::codegen::simulation::*; use super::codegen::simulation::*;
use super::key_registry::KeyRegistry; use super::key_registry::KeyRegistry;
use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}; use super::services::InitService;
use super::services::{ControllerService, MonitorService};
/// Runs a gRPC simulation server. /// Runs a gRPC simulation server.
/// ///
/// The first argument is a closure that is called every time the simulation is /// The first argument is a closure that takes an initialization configuration
/// (re)started by the remote client. It must create a new `SimInit` object /// and is called every time the simulation is (re)started by the remote client.
/// complemented by a registry that exposes the public event and query /// It must create a new `SimInit` object complemented by a registry that
/// interface. /// exposes the public event and query interface.
pub fn run<F>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
where where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{ {
run_service(GrpcSimulationService::new(sim_gen), addr)
}
/// Monomorphization of the networking code.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the controller and monitor services can be used // Use 2 threads so that the controller and monitor services can be used
// concurrently. // concurrently.
let rt = tokio::runtime::Builder::new_multi_thread() let rt = tokio::runtime::Builder::new_multi_thread()
@ -30,11 +44,9 @@ where
.enable_io() .enable_io()
.build()?; .build()?;
let sim_manager = GrpcSimulationService::new(sim_gen);
rt.block_on(async move { rt.block_on(async move {
Server::builder() Server::builder()
.add_service(simulation_server::SimulationServer::new(sim_manager)) .add_service(simulation_server::SimulationServer::new(service))
.serve(addr) .serve(addr)
.await?; .await?;
@ -43,7 +55,7 @@ where
} }
struct GrpcSimulationService { struct GrpcSimulationService {
sim_gen: Mutex<Box<dyn FnMut() -> (SimInit, EndpointRegistry) + Send + 'static>>, init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>, controller_service: Mutex<ControllerService>,
monitor_service: Mutex<MonitorService>, monitor_service: Mutex<MonitorService>,
} }
@ -51,21 +63,27 @@ struct GrpcSimulationService {
impl GrpcSimulationService { impl GrpcSimulationService {
/// Creates a new `GrpcSimulationService` without any active simulation. /// Creates a new `GrpcSimulationService` without any active simulation.
/// ///
/// The argument is a closure that is called every time the simulation is /// The argument is a closure that takes an initialization configuration and
/// (re)started by the remote client. It must create a new `SimInit` object /// is called every time the simulation is (re)started by the remote client.
/// complemented by a registry that exposes the public event and query /// It must create a new `SimInit` object complemented by a registry that
/// interface. /// exposes the public event and query interface.
pub(crate) fn new<F>(sim_gen: F) -> Self pub(crate) fn new<F, I>(sim_gen: F) -> Self
where where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{ {
Self { Self {
sim_gen: Mutex::new(Box::new(sim_gen)), init_service: Mutex::new(InitService::new(sim_gen)),
controller_service: Mutex::new(ControllerService::NotStarted), controller_service: Mutex::new(ControllerService::NotStarted),
monitor_service: Mutex::new(MonitorService::NotStarted), monitor_service: Mutex::new(MonitorService::NotStarted),
} }
} }
/// Locks the initializer and returns the mutex guard.
fn initializer(&self) -> MutexGuard<'_, InitService> {
self.init_service.lock().unwrap()
}
/// Locks the controller and returns the mutex guard. /// Locks the controller and returns the mutex guard.
fn controller(&self) -> MutexGuard<'_, ControllerService> { fn controller(&self) -> MutexGuard<'_, ControllerService> {
self.controller_service.lock().unwrap() self.controller_service.lock().unwrap()
@ -82,10 +100,9 @@ impl simulation_server::Simulation for GrpcSimulationService {
async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitReply>, Status> { async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitReply>, Status> {
let request = request.into_inner(); let request = request.into_inner();
let start_time = request.time.unwrap_or_default(); let (reply, bench) = self.initializer().init(request);
let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) {
let (sim_init, endpoint_registry) = (self.sim_gen.lock().unwrap())(); if let Some((simulation, endpoint_registry)) = bench {
let simulation = sim_init.init(start_time);
*self.controller() = ControllerService::Started { *self.controller() = ControllerService::Started {
simulation, simulation,
event_source_registry: endpoint_registry.event_source_registry, event_source_registry: endpoint_registry.event_source_registry,
@ -95,18 +112,9 @@ impl simulation_server::Simulation for GrpcSimulationService {
*self.monitor() = MonitorService::Started { *self.monitor() = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry, event_sink_registry: endpoint_registry.event_sink_registry,
}; };
}
init_reply::Result::Empty(()) Ok(Response::new(reply))
} else {
init_reply::Result::Error(Error {
code: ErrorCode::InvalidTime as i32,
message: "out-of-range nanosecond field".to_string(),
})
};
Ok(Response::new(InitReply {
result: Some(reply),
}))
} }
async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> { async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> {
let request = request.into_inner(); let request = request.into_inner();

View File

@ -3,13 +3,14 @@ use std::fmt;
use bytes::Buf; use bytes::Buf;
use prost::Message; use prost::Message;
use serde::de::DeserializeOwned;
use crate::registry::EndpointRegistry; use crate::registry::EndpointRegistry;
use crate::rpc::key_registry::KeyRegistry; use crate::rpc::key_registry::KeyRegistry;
use crate::simulation::SimInit; use crate::simulation::SimInit;
use super::codegen::simulation::*; use super::codegen::simulation::*;
use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}; use super::services::{ControllerService, InitService, MonitorService};
/// Protobuf-based simulation manager. /// Protobuf-based simulation manager.
/// ///
@ -21,7 +22,7 @@ use super::services::{timestamp_to_monotonic, ControllerService, MonitorService}
/// Its methods map the various RPC service methods defined in /// Its methods map the various RPC service methods defined in
/// `simulation.proto`. /// `simulation.proto`.
pub(crate) struct ProtobufService { pub(crate) struct ProtobufService {
sim_gen: Box<dyn FnMut() -> (SimInit, EndpointRegistry) + Send + 'static>, init_service: InitService,
controller_service: ControllerService, controller_service: ControllerService,
monitor_service: MonitorService, monitor_service: MonitorService,
} }
@ -29,16 +30,17 @@ pub(crate) struct ProtobufService {
impl ProtobufService { impl ProtobufService {
/// Creates a new `ProtobufService` without any active simulation. /// Creates a new `ProtobufService` without any active simulation.
/// ///
/// The argument is a closure that is called every time the simulation is /// The argument is a closure that takes an initialization configuration and
/// (re)started by the remote client. It must create a new `SimInit` object /// is called every time the simulation is (re)started by the remote client.
/// complemented by a registry that exposes the public event and query /// It must create a new `SimInit` object complemented by a registry that
/// interface. /// exposes the public event and query interface.
pub(crate) fn new<F>(sim_gen: F) -> Self pub(crate) fn new<F, I>(sim_gen: F) -> Self
where where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{ {
Self { Self {
sim_gen: Box::new(sim_gen), init_service: InitService::new(sim_gen),
controller_service: ControllerService::NotStarted, controller_service: ControllerService::NotStarted,
monitor_service: MonitorService::NotStarted, monitor_service: MonitorService::NotStarted,
} }
@ -106,10 +108,9 @@ impl ProtobufService {
/// If the initialization time is not provided, it is initialized with the /// If the initialization time is not provided, it is initialized with the
/// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI). /// epoch of `MonotonicTime` (1970-01-01 00:00:00 TAI).
fn init(&mut self, request: InitRequest) -> InitReply { fn init(&mut self, request: InitRequest) -> InitReply {
let start_time = request.time.unwrap_or_default(); let (reply, bench) = self.init_service.init(request);
let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) {
let (sim_init, endpoint_registry) = (self.sim_gen)(); if let Some((simulation, endpoint_registry)) = bench {
let simulation = sim_init.init(start_time);
self.controller_service = ControllerService::Started { self.controller_service = ControllerService::Started {
simulation, simulation,
event_source_registry: endpoint_registry.event_source_registry, event_source_registry: endpoint_registry.event_source_registry,
@ -119,18 +120,9 @@ impl ProtobufService {
self.monitor_service = MonitorService::Started { self.monitor_service = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry, event_sink_registry: endpoint_registry.event_sink_registry,
}; };
init_reply::Result::Empty(())
} else {
init_reply::Result::Error(Error {
code: ErrorCode::InvalidTime as i32,
message: "out-of-range nanosecond field".to_string(),
})
};
InitReply {
result: Some(reply),
} }
reply
} }
} }

View File

@ -1,4 +1,5 @@
mod controller_service; mod controller_service;
mod init_service;
mod monitor_service; mod monitor_service;
use std::time::Duration; use std::time::Duration;
@ -9,6 +10,7 @@ use tai_time::MonotonicTime;
use super::codegen::simulation::{Error, ErrorCode}; use super::codegen::simulation::{Error, ErrorCode};
pub(crate) use controller_service::ControllerService; pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
pub(crate) use monitor_service::MonitorService; pub(crate) use monitor_service::MonitorService;
/// Transforms an error code and a message into a Protobuf error. /// Transforms an error code and a message into a Protobuf error.

View File

@ -330,7 +330,7 @@ impl ControllerService {
} }
} }
/// Broadcasts an event from an event source immediately, blocking until /// Broadcasts a query from a query source immediately, blocking until
/// completion. /// completion.
/// ///
/// Simulation time remains unchanged. /// Simulation time remains unchanged.

View File

@ -0,0 +1,87 @@
use ciborium;
use serde::de::DeserializeOwned;
use crate::registry::EndpointRegistry;
use crate::simulation::SimInit;
use crate::simulation::Simulation;
use super::{timestamp_to_monotonic, to_error};
use super::super::codegen::simulation::*;
type DeserializationError = ciborium::de::Error<std::io::Error>;
type SimGen = Box<
dyn FnMut(&[u8]) -> Result<(SimInit, EndpointRegistry), DeserializationError> + Send + 'static,
>;
/// Protobuf-based simulation initializer.
///
/// An `InitService` creates a new simulation bench based on a serialized initialization configuration.
///
/// It maps the `Init` method defined in `simulation.proto`.
pub(crate) struct InitService {
sim_gen: SimGen,
}
impl InitService {
/// Creates a new `InitService`.
///
/// The argument is a closure that takes a CBOR-serialized initialization
/// configuration and is called every time the simulation is (re)started by
/// the remote client. It must create a new `SimInit` object complemented by
/// a registry that exposes the public event and query interface.
pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
// Wrap `sim_gen` so it accepts a serialized init configuration.
let sim_gen = move |serialized_cfg: &[u8]| -> Result<(SimInit, EndpointRegistry), DeserializationError> {
let cfg = ciborium::from_reader(serialized_cfg)?;
Ok(sim_gen(cfg))
};
Self {
sim_gen: Box::new(sim_gen),
}
}
/// Initializes the simulation based on the specified configuration.
pub(crate) fn init(
&mut self,
request: InitRequest,
) -> (InitReply, Option<(Simulation, EndpointRegistry)>) {
let start_time = request.time.unwrap_or_default();
let reply = (self.sim_gen)(&request.cfg)
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the initialization configuration could not be deserialized: {}",
e
),
)
})
.and_then(|(sim_init, registry)| {
timestamp_to_monotonic(start_time)
.ok_or_else(|| {
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
})
.map(|start_time| (sim_init.init(start_time), registry))
});
let (reply, bench) = match reply {
Ok(bench) => (init_reply::Result::Empty(()), Some(bench)),
Err(e) => (init_reply::Result::Error(e), None),
};
(
InitReply {
result: Some(reply),
},
bench,
)
}
}

View File

@ -18,6 +18,7 @@
//! `SimulationService`, and [`WasmSimulationService::process_request`] as //! `SimulationService`, and [`WasmSimulationService::process_request`] as
//! `SimulationService.processRequest`. //! `SimulationService.processRequest`.
use serde::de::DeserializeOwned;
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use crate::registry::EndpointRegistry; use crate::registry::EndpointRegistry;
@ -75,9 +76,10 @@ impl WasmSimulationService {
/// (re)started by the remote client. It must create a new `SimInit` object /// (re)started by the remote client. It must create a new `SimInit` object
/// complemented by a registry that exposes the public event and query /// complemented by a registry that exposes the public event and query
/// interface. /// interface.
pub fn new<F>(sim_gen: F) -> Self pub fn new<F, I>(sim_gen: F) -> Self
where where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{ {
Self(ProtobufService::new(sim_gen)) Self(ProtobufService::new(sim_gen))
} }