1
0
forked from ROMEO/nexosim

Make the gRPC init more general

Instead of producing a SimInit object, a bench is now expected to return
a fully constructed simulation with its scheduler.

This means that the client does not necessarily need to provide the
starting time for the simulation. This start time may be hardcoded in
the bench, or may be taken as a parameter for the bench configuration.

This change make it possible for benches to do more, for instance to
pre-schedule some events, or to do less, for instance by hardcoding the
simulation time rather than accept an arbitrary simulation time.
This commit is contained in:
Serge Barral 2024-11-15 23:18:39 +01:00
parent c749a49154
commit 84ad02a248
5 changed files with 34 additions and 35 deletions

View File

@ -38,7 +38,6 @@ message EventKey {
}
message InitRequest {
google.protobuf.Timestamp time = 1;
bytes cfg = 2;
}
message InitReply {

View File

@ -15,8 +15,6 @@ pub struct EventKey {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InitRequest {
#[prost(message, optional, tag = "1")]
pub time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(bytes = "vec", tag = "2")]
pub cfg: ::prost::alloc::vec::Vec<u8>,
}

View File

@ -8,7 +8,7 @@ use serde::de::DeserializeOwned;
use tonic::{transport::Server, Request, Response, Status};
use crate::registry::EndpointRegistry;
use crate::simulation::SimInit;
use crate::simulation::{Scheduler, Simulation, SimulationError};
use super::codegen::simulation::*;
use super::key_registry::KeyRegistry;
@ -19,11 +19,13 @@ use super::services::{ControllerService, MonitorService};
///
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
/// It must create a new simulation and its scheduler, complemented by a
/// registry that exposes the public event and query interface.
pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>
+ Send
+ 'static,
I: DeserializeOwned,
{
run_service(GrpcSimulationService::new(sim_gen), addr)
@ -65,11 +67,13 @@ impl GrpcSimulationService {
///
/// The argument is a closure that takes an initialization configuration and
/// is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
/// It must create a new simulation and its scheduler, complemented by a
/// registry that exposes the public event and query interface.
pub(crate) fn new<F, I>(sim_gen: F) -> Self
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>
+ Send
+ 'static,
I: DeserializeOwned,
{
Self {

View File

@ -8,7 +8,7 @@ use prost_types::Timestamp;
use tai_time::MonotonicTime;
use super::codegen::simulation::{Error, ErrorCode};
use crate::simulation::{ExecutionError, SchedulingError};
use crate::simulation::{ExecutionError, SchedulingError, SimulationError};
pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
@ -59,6 +59,14 @@ fn map_scheduling_error(error: SchedulingError) -> Error {
to_error(error_code, error_message)
}
/// Map a `SimulationError` to a Protobuf error.
fn map_simulation_error(error: SimulationError) -> Error {
match error {
SimulationError::ExecutionError(e) => map_execution_error(e),
SimulationError::SchedulingError(e) => map_scheduling_error(e),
}
}
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
///
/// This will fail if the time is outside the protobuf-specified range for

View File

@ -2,20 +2,20 @@ use ciborium;
use serde::de::DeserializeOwned;
use crate::registry::EndpointRegistry;
use crate::simulation::{Scheduler, SimInit, Simulation};
use crate::simulation::{Scheduler, Simulation, SimulationError};
use super::{map_execution_error, timestamp_to_monotonic, to_error};
use super::{map_simulation_error, to_error};
use super::super::codegen::simulation::*;
type InitResult = Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>;
type DeserializationError = ciborium::de::Error<std::io::Error>;
type SimGen = Box<
dyn FnMut(&[u8]) -> Result<(SimInit, EndpointRegistry), DeserializationError> + Send + 'static,
>;
type SimGen = Box<dyn FnMut(&[u8]) -> Result<InitResult, DeserializationError> + Send + 'static>;
/// Protobuf-based simulation initializer.
///
/// An `InitService` creates a new simulation bench based on a serialized initialization configuration.
/// An `InitService` creates a new simulation bench based on a serialized
/// initialization configuration.
///
/// It maps the `Init` method defined in `simulation.proto`.
pub(crate) struct InitService {
@ -27,15 +27,18 @@ impl InitService {
///
/// The argument is a closure that takes a CBOR-serialized initialization
/// configuration and is called every time the simulation is (re)started by
/// the remote client. It must create a new `SimInit` object complemented by
/// a registry that exposes the public event and query interface.
/// the remote client. It must create a new simulation and its scheduler,
/// complemented by a registry that exposes the public event and query
/// interface.
pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
F: FnMut(I) -> Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>
+ Send
+ 'static,
I: DeserializeOwned,
{
// Wrap `sim_gen` so it accepts a serialized init configuration.
let sim_gen = move |serialized_cfg: &[u8]| -> Result<(SimInit, EndpointRegistry), DeserializationError> {
let sim_gen = move |serialized_cfg: &[u8]| -> Result<InitResult, DeserializationError> {
let cfg = ciborium::from_reader(serialized_cfg)?;
Ok(sim_gen(cfg))
@ -51,8 +54,6 @@ impl InitService {
&mut self,
request: InitRequest,
) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) {
let start_time = request.time.unwrap_or_default();
let reply = (self.sim_gen)(&request.cfg)
.map_err(|e| {
to_error(
@ -63,18 +64,7 @@ impl InitService {
),
)
})
.and_then(|(sim_init, registry)| {
timestamp_to_monotonic(start_time)
.ok_or_else(|| {
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
})
.and_then(|start_time| {
sim_init
.init(start_time)
.map_err(map_execution_error)
.map(|(sim, sched)| (sim, sched, registry))
})
});
.and_then(|init_result| init_result.map_err(map_simulation_error));
let (reply, bench) = match reply {
Ok(bench) => (init_reply::Result::Empty(()), Some(bench)),