forked from ROMEO/nexosim
Merge pull request #64 from asynchronics/feature/grpc_api_change
Make the gRPC init more general
This commit is contained in:
commit
a96a4dc0bd
@ -38,7 +38,6 @@ message EventKey {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message InitRequest {
|
message InitRequest {
|
||||||
google.protobuf.Timestamp time = 1;
|
|
||||||
bytes cfg = 2;
|
bytes cfg = 2;
|
||||||
}
|
}
|
||||||
message InitReply {
|
message InitReply {
|
||||||
|
@ -15,8 +15,6 @@ pub struct EventKey {
|
|||||||
}
|
}
|
||||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
pub struct InitRequest {
|
pub struct InitRequest {
|
||||||
#[prost(message, optional, tag = "1")]
|
|
||||||
pub time: ::core::option::Option<::prost_types::Timestamp>,
|
|
||||||
#[prost(bytes = "vec", tag = "2")]
|
#[prost(bytes = "vec", tag = "2")]
|
||||||
pub cfg: ::prost::alloc::vec::Vec<u8>,
|
pub cfg: ::prost::alloc::vec::Vec<u8>,
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ use serde::de::DeserializeOwned;
|
|||||||
use tonic::{transport::Server, Request, Response, Status};
|
use tonic::{transport::Server, Request, Response, Status};
|
||||||
|
|
||||||
use crate::registry::EndpointRegistry;
|
use crate::registry::EndpointRegistry;
|
||||||
use crate::simulation::SimInit;
|
use crate::simulation::{Scheduler, Simulation, SimulationError};
|
||||||
|
|
||||||
use super::codegen::simulation::*;
|
use super::codegen::simulation::*;
|
||||||
use super::key_registry::KeyRegistry;
|
use super::key_registry::KeyRegistry;
|
||||||
@ -19,11 +19,13 @@ use super::services::{ControllerService, MonitorService};
|
|||||||
///
|
///
|
||||||
/// The first argument is a closure that takes an initialization configuration
|
/// The first argument is a closure that takes an initialization configuration
|
||||||
/// and is called every time the simulation is (re)started by the remote client.
|
/// and is called every time the simulation is (re)started by the remote client.
|
||||||
/// It must create a new `SimInit` object complemented by a registry that
|
/// It must create a new simulation and its scheduler, complemented by a
|
||||||
/// exposes the public event and query interface.
|
/// registry that exposes the public event and query interface.
|
||||||
pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
|
pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
|
||||||
where
|
where
|
||||||
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
|
F: FnMut(I) -> Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
I: DeserializeOwned,
|
I: DeserializeOwned,
|
||||||
{
|
{
|
||||||
run_service(GrpcSimulationService::new(sim_gen), addr)
|
run_service(GrpcSimulationService::new(sim_gen), addr)
|
||||||
@ -65,11 +67,13 @@ impl GrpcSimulationService {
|
|||||||
///
|
///
|
||||||
/// The argument is a closure that takes an initialization configuration and
|
/// The argument is a closure that takes an initialization configuration and
|
||||||
/// is called every time the simulation is (re)started by the remote client.
|
/// is called every time the simulation is (re)started by the remote client.
|
||||||
/// It must create a new `SimInit` object complemented by a registry that
|
/// It must create a new simulation and its scheduler, complemented by a
|
||||||
/// exposes the public event and query interface.
|
/// registry that exposes the public event and query interface.
|
||||||
pub(crate) fn new<F, I>(sim_gen: F) -> Self
|
pub(crate) fn new<F, I>(sim_gen: F) -> Self
|
||||||
where
|
where
|
||||||
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
|
F: FnMut(I) -> Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
I: DeserializeOwned,
|
I: DeserializeOwned,
|
||||||
{
|
{
|
||||||
Self {
|
Self {
|
||||||
|
@ -8,7 +8,7 @@ use prost_types::Timestamp;
|
|||||||
use tai_time::MonotonicTime;
|
use tai_time::MonotonicTime;
|
||||||
|
|
||||||
use super::codegen::simulation::{Error, ErrorCode};
|
use super::codegen::simulation::{Error, ErrorCode};
|
||||||
use crate::simulation::{ExecutionError, SchedulingError};
|
use crate::simulation::{ExecutionError, SchedulingError, SimulationError};
|
||||||
|
|
||||||
pub(crate) use controller_service::ControllerService;
|
pub(crate) use controller_service::ControllerService;
|
||||||
pub(crate) use init_service::InitService;
|
pub(crate) use init_service::InitService;
|
||||||
@ -59,6 +59,14 @@ fn map_scheduling_error(error: SchedulingError) -> Error {
|
|||||||
to_error(error_code, error_message)
|
to_error(error_code, error_message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Map a `SimulationError` to a Protobuf error.
|
||||||
|
fn map_simulation_error(error: SimulationError) -> Error {
|
||||||
|
match error {
|
||||||
|
SimulationError::ExecutionError(e) => map_execution_error(e),
|
||||||
|
SimulationError::SchedulingError(e) => map_scheduling_error(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
|
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
|
||||||
///
|
///
|
||||||
/// This will fail if the time is outside the protobuf-specified range for
|
/// This will fail if the time is outside the protobuf-specified range for
|
||||||
|
@ -2,20 +2,20 @@ use ciborium;
|
|||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
|
|
||||||
use crate::registry::EndpointRegistry;
|
use crate::registry::EndpointRegistry;
|
||||||
use crate::simulation::{Scheduler, SimInit, Simulation};
|
use crate::simulation::{Scheduler, Simulation, SimulationError};
|
||||||
|
|
||||||
use super::{map_execution_error, timestamp_to_monotonic, to_error};
|
use super::{map_simulation_error, to_error};
|
||||||
|
|
||||||
use super::super::codegen::simulation::*;
|
use super::super::codegen::simulation::*;
|
||||||
|
|
||||||
|
type InitResult = Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>;
|
||||||
type DeserializationError = ciborium::de::Error<std::io::Error>;
|
type DeserializationError = ciborium::de::Error<std::io::Error>;
|
||||||
type SimGen = Box<
|
type SimGen = Box<dyn FnMut(&[u8]) -> Result<InitResult, DeserializationError> + Send + 'static>;
|
||||||
dyn FnMut(&[u8]) -> Result<(SimInit, EndpointRegistry), DeserializationError> + Send + 'static,
|
|
||||||
>;
|
|
||||||
|
|
||||||
/// Protobuf-based simulation initializer.
|
/// Protobuf-based simulation initializer.
|
||||||
///
|
///
|
||||||
/// An `InitService` creates a new simulation bench based on a serialized initialization configuration.
|
/// An `InitService` creates a new simulation bench based on a serialized
|
||||||
|
/// initialization configuration.
|
||||||
///
|
///
|
||||||
/// It maps the `Init` method defined in `simulation.proto`.
|
/// It maps the `Init` method defined in `simulation.proto`.
|
||||||
pub(crate) struct InitService {
|
pub(crate) struct InitService {
|
||||||
@ -27,15 +27,18 @@ impl InitService {
|
|||||||
///
|
///
|
||||||
/// The argument is a closure that takes a CBOR-serialized initialization
|
/// The argument is a closure that takes a CBOR-serialized initialization
|
||||||
/// configuration and is called every time the simulation is (re)started by
|
/// configuration and is called every time the simulation is (re)started by
|
||||||
/// the remote client. It must create a new `SimInit` object complemented by
|
/// the remote client. It must create a new simulation and its scheduler,
|
||||||
/// a registry that exposes the public event and query interface.
|
/// complemented by a registry that exposes the public event and query
|
||||||
|
/// interface.
|
||||||
pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
|
pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
|
||||||
where
|
where
|
||||||
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
|
F: FnMut(I) -> Result<(Simulation, Scheduler, EndpointRegistry), SimulationError>
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
I: DeserializeOwned,
|
I: DeserializeOwned,
|
||||||
{
|
{
|
||||||
// Wrap `sim_gen` so it accepts a serialized init configuration.
|
// Wrap `sim_gen` so it accepts a serialized init configuration.
|
||||||
let sim_gen = move |serialized_cfg: &[u8]| -> Result<(SimInit, EndpointRegistry), DeserializationError> {
|
let sim_gen = move |serialized_cfg: &[u8]| -> Result<InitResult, DeserializationError> {
|
||||||
let cfg = ciborium::from_reader(serialized_cfg)?;
|
let cfg = ciborium::from_reader(serialized_cfg)?;
|
||||||
|
|
||||||
Ok(sim_gen(cfg))
|
Ok(sim_gen(cfg))
|
||||||
@ -51,8 +54,6 @@ impl InitService {
|
|||||||
&mut self,
|
&mut self,
|
||||||
request: InitRequest,
|
request: InitRequest,
|
||||||
) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) {
|
) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) {
|
||||||
let start_time = request.time.unwrap_or_default();
|
|
||||||
|
|
||||||
let reply = (self.sim_gen)(&request.cfg)
|
let reply = (self.sim_gen)(&request.cfg)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
to_error(
|
to_error(
|
||||||
@ -63,18 +64,7 @@ impl InitService {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.and_then(|(sim_init, registry)| {
|
.and_then(|init_result| init_result.map_err(map_simulation_error));
|
||||||
timestamp_to_monotonic(start_time)
|
|
||||||
.ok_or_else(|| {
|
|
||||||
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
|
|
||||||
})
|
|
||||||
.and_then(|start_time| {
|
|
||||||
sim_init
|
|
||||||
.init(start_time)
|
|
||||||
.map_err(map_execution_error)
|
|
||||||
.map(|(sim, sched)| (sim, sched, registry))
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
let (reply, bench) = match reply {
|
let (reply, bench) = match reply {
|
||||||
Ok(bench) => (init_reply::Result::Empty(()), Some(bench)),
|
Ok(bench) => (init_reply::Result::Empty(()), Some(bench)),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user