1
0
forked from ROMEO/nexosim

Simplify gRPC backend arch + remove wasm backend

This commit is contained in:
Serge Barral
2024-09-08 13:54:09 +02:00
parent 3ccf05335b
commit 6b43fcf704
19 changed files with 60 additions and 410 deletions

View File

@ -0,0 +1,180 @@
// The main simulation protocol.
syntax = "proto3";
package simulation;
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
enum ErrorCode {
INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1;
MISSING_ARGUMENT = 2;
INVALID_TIME = 3;
INVALID_DURATION = 4;
INVALID_MESSAGE = 5;
INVALID_KEY = 6;
SOURCE_NOT_FOUND = 10;
SINK_NOT_FOUND = 11;
SIMULATION_TIME_OUT_OF_RANGE = 12;
}
message Error {
ErrorCode code = 1;
string message = 2;
}
message EventKey {
uint64 subkey1 = 1;
uint64 subkey2 = 2;
}
message InitRequest {
google.protobuf.Timestamp time = 1;
bytes cfg = 2;
}
message InitReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;
Error error = 100;
}
}
message TimeRequest {}
message TimeReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Timestamp time = 1;
Error error = 100;
}
}
message StepRequest {}
message StepReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Timestamp time = 1;
Error error = 100;
}
}
message StepUntilRequest {
oneof deadline { // Always returns exactly 1 variant.
google.protobuf.Timestamp time = 1;
google.protobuf.Duration duration = 2;
}
}
message StepUntilReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Timestamp time = 1;
Error error = 100;
}
}
message ScheduleEventRequest {
oneof deadline { // Expects exactly 1 variant.
google.protobuf.Timestamp time = 1;
google.protobuf.Duration duration = 2;
}
string source_name = 3;
bytes event = 4;
google.protobuf.Duration period = 5;
bool with_key = 6;
}
message ScheduleEventReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;
EventKey key = 2;
Error error = 100;
}
}
message CancelEventRequest { EventKey key = 1; }
message CancelEventReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;
Error error = 100;
}
}
message ProcessEventRequest {
string source_name = 1;
bytes event = 2;
}
message ProcessEventReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;
Error error = 100;
}
}
message ProcessQueryRequest {
string source_name = 1;
bytes request = 2;
}
message ProcessQueryReply {
// This field is hoisted because protobuf3 does not support `repeated` within
// a `oneof`. It is Always empty if an error is returned
repeated bytes replies = 1;
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 10;
Error error = 100;
}
}
message ReadEventsRequest { string sink_name = 1; }
message ReadEventsReply {
// This field is hoisted because protobuf3 does not support `repeated` within
// a `oneof`. It is Always empty if an error is returned
repeated bytes events = 1;
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 10;
Error error = 100;
}
}
message OpenSinkRequest { string sink_name = 1; }
message OpenSinkReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 10;
Error error = 100;
}
}
message CloseSinkRequest { string sink_name = 1; }
message CloseSinkReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 10;
Error error = 100;
}
}
// A convenience message type for custom transport implementation.
message AnyRequest {
oneof request { // Expects exactly 1 variant.
InitRequest init_request = 1;
TimeRequest time_request = 2;
StepRequest step_request = 3;
StepUntilRequest step_until_request = 4;
ScheduleEventRequest schedule_event_request = 5;
CancelEventRequest cancel_event_request = 6;
ProcessEventRequest process_event_request = 7;
ProcessQueryRequest process_query_request = 8;
ReadEventsRequest read_events_request = 9;
OpenSinkRequest open_sink_request = 10;
CloseSinkRequest close_sink_request = 11;
}
}
service Simulation {
rpc Init(InitRequest) returns (InitReply);
rpc Time(TimeRequest) returns (TimeReply);
rpc Step(StepRequest) returns (StepReply);
rpc StepUntil(StepUntilRequest) returns (StepUntilReply);
rpc ScheduleEvent(ScheduleEventRequest) returns (ScheduleEventReply);
rpc CancelEvent(CancelEventRequest) returns (CancelEventReply);
rpc ProcessEvent(ProcessEventRequest) returns (ProcessEventReply);
rpc ProcessQuery(ProcessQueryRequest) returns (ProcessQueryReply);
rpc ReadEvents(ReadEventsRequest) returns (ReadEventsReply);
rpc OpenSink(OpenSinkRequest) returns (OpenSinkReply);
rpc CloseSink(CloseSinkRequest) returns (CloseSinkReply);
}

View File

@ -0,0 +1,6 @@
#![allow(unreachable_pub)]
#![allow(clippy::enum_variant_names)]
#![allow(missing_docs)]
#[rustfmt::skip]
pub(crate) mod simulation;

View File

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,48 @@
use crate::simulation::ActionKey;
use crate::time::MonotonicTime;
use crate::util::indexed_priority_queue::{IndexedPriorityQueue, InsertKey};
pub(crate) type KeyRegistryId = InsertKey;
/// A collection of `ActionKey`s indexed by a unique identifier.
#[derive(Default)]
pub(crate) struct KeyRegistry {
keys: IndexedPriorityQueue<MonotonicTime, ActionKey>,
}
impl KeyRegistry {
/// Inserts an `ActionKey` into the registry.
///
/// The provided expiration deadline is the latest time at which the key may
/// still be active.
pub(crate) fn insert_key(
&mut self,
action_key: ActionKey,
expiration: MonotonicTime,
) -> KeyRegistryId {
self.keys.insert(expiration, action_key)
}
/// Inserts a non-expiring `ActionKey` into the registry.
pub(crate) fn insert_eternal_key(&mut self, action_key: ActionKey) -> KeyRegistryId {
self.keys.insert(MonotonicTime::MAX, action_key)
}
/// Removes an `ActionKey` from the registry and returns it.
///
/// Returns `None` if the key was not found in the registry.
pub(crate) fn extract_key(&mut self, key_id: KeyRegistryId) -> Option<ActionKey> {
self.keys.extract(key_id).map(|(_, key)| key)
}
/// Remove keys with an expiration deadline strictly predating the argument.
pub(crate) fn remove_expired_keys(&mut self, now: MonotonicTime) {
while let Some(expiration) = self.keys.peek_key() {
if *expiration >= now {
return;
}
self.keys.pull();
}
}
}

193
asynchronix/src/grpc/run.rs Normal file
View File

@ -0,0 +1,193 @@
//! gRPC simulation service.
use std::net::SocketAddr;
use std::sync::Mutex;
use std::sync::MutexGuard;
use serde::de::DeserializeOwned;
use tonic::{transport::Server, Request, Response, Status};
use crate::registry::EndpointRegistry;
use crate::simulation::SimInit;
use super::codegen::simulation::*;
use super::key_registry::KeyRegistry;
use super::services::InitService;
use super::services::{ControllerService, MonitorService};
/// Runs a gRPC simulation server.
///
/// The first argument is a closure that takes an initialization configuration
/// and is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
run_service(GrpcSimulationService::new(sim_gen), addr)
}
/// Monomorphization of the networking code.
///
/// Keeping this as a separate monomorphized fragment can even triple
/// compilation speed for incremental release builds.
fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the controller and monitor services can be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_io()
.build()?;
rt.block_on(async move {
Server::builder()
.add_service(simulation_server::SimulationServer::new(service))
.serve(addr)
.await?;
Ok(())
})
}
struct GrpcSimulationService {
init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>,
monitor_service: Mutex<MonitorService>,
}
impl GrpcSimulationService {
/// Creates a new `GrpcSimulationService` without any active simulation.
///
/// The argument is a closure that takes an initialization configuration and
/// is called every time the simulation is (re)started by the remote client.
/// It must create a new `SimInit` object complemented by a registry that
/// exposes the public event and query interface.
pub(crate) fn new<F, I>(sim_gen: F) -> Self
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
Self {
init_service: Mutex::new(InitService::new(sim_gen)),
controller_service: Mutex::new(ControllerService::NotStarted),
monitor_service: Mutex::new(MonitorService::NotStarted),
}
}
/// Locks the initializer and returns the mutex guard.
fn initializer(&self) -> MutexGuard<'_, InitService> {
self.init_service.lock().unwrap()
}
/// Locks the controller and returns the mutex guard.
fn controller(&self) -> MutexGuard<'_, ControllerService> {
self.controller_service.lock().unwrap()
}
/// Locks the monitor and returns the mutex guard.
fn monitor(&self) -> MutexGuard<'_, MonitorService> {
self.monitor_service.lock().unwrap()
}
}
#[tonic::async_trait]
impl simulation_server::Simulation for GrpcSimulationService {
async fn init(&self, request: Request<InitRequest>) -> Result<Response<InitReply>, Status> {
let request = request.into_inner();
let (reply, bench) = self.initializer().init(request);
if let Some((simulation, endpoint_registry)) = bench {
*self.controller() = ControllerService::Started {
simulation,
event_source_registry: endpoint_registry.event_source_registry,
query_source_registry: endpoint_registry.query_source_registry,
key_registry: KeyRegistry::default(),
};
*self.monitor() = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry,
};
}
Ok(Response::new(reply))
}
async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().time(request)))
}
async fn step(&self, request: Request<StepRequest>) -> Result<Response<StepReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().step(request)))
}
async fn step_until(
&self,
request: Request<StepUntilRequest>,
) -> Result<Response<StepUntilReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().step_until(request)))
}
async fn schedule_event(
&self,
request: Request<ScheduleEventRequest>,
) -> Result<Response<ScheduleEventReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().schedule_event(request)))
}
async fn cancel_event(
&self,
request: Request<CancelEventRequest>,
) -> Result<Response<CancelEventReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().cancel_event(request)))
}
async fn process_event(
&self,
request: Request<ProcessEventRequest>,
) -> Result<Response<ProcessEventReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().process_event(request)))
}
async fn process_query(
&self,
request: Request<ProcessQueryRequest>,
) -> Result<Response<ProcessQueryReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().process_query(request)))
}
async fn read_events(
&self,
request: Request<ReadEventsRequest>,
) -> Result<Response<ReadEventsReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.monitor().read_events(request)))
}
async fn open_sink(
&self,
request: Request<OpenSinkRequest>,
) -> Result<Response<OpenSinkReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.monitor().open_sink(request)))
}
async fn close_sink(
&self,
request: Request<CloseSinkRequest>,
) -> Result<Response<CloseSinkReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.monitor().close_sink(request)))
}
}

View File

@ -0,0 +1,96 @@
mod controller_service;
mod init_service;
mod monitor_service;
use std::time::Duration;
use prost_types::Timestamp;
use tai_time::MonotonicTime;
use super::codegen::simulation::{Error, ErrorCode};
pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
pub(crate) use monitor_service::MonitorService;
/// Transforms an error code and a message into a Protobuf error.
fn to_error(code: ErrorCode, message: impl Into<String>) -> Error {
Error {
code: code as i32,
message: message.into(),
}
}
/// An error returned when a simulation was not started.
fn simulation_not_started_error() -> Error {
to_error(
ErrorCode::SimulationNotStarted,
"the simulation was not started",
)
}
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
///
/// This will fail if the time is outside the protobuf-specified range for
/// timestamps (0001-01-01 00:00:00 to 9999-12-31 23:59:59).
pub(crate) fn monotonic_to_timestamp(monotonic_time: MonotonicTime) -> Option<Timestamp> {
// Unix timestamp for 0001-01-01 00:00:00, the minimum accepted by
// protobuf's specification for the `Timestamp` type.
const MIN_SECS: i64 = -62135596800;
// Unix timestamp for 9999-12-31 23:59:59, the maximum accepted by
// protobuf's specification for the `Timestamp` type.
const MAX_SECS: i64 = 253402300799;
let secs = monotonic_time.as_secs();
if !(MIN_SECS..=MAX_SECS).contains(&secs) {
return None;
}
Some(Timestamp {
seconds: secs,
nanos: monotonic_time.subsec_nanos() as i32,
})
}
/// Attempts a cast from a protobuf `Timestamp` to a `MonotonicTime`.
///
/// This should never fail provided that the `Timestamp` complies with the
/// protobuf specification. It can only fail if the nanosecond part is negative
/// or greater than 999'999'999.
pub(crate) fn timestamp_to_monotonic(timestamp: Timestamp) -> Option<MonotonicTime> {
let nanos: u32 = timestamp.nanos.try_into().ok()?;
MonotonicTime::new(timestamp.seconds, nanos)
}
/// Attempts a cast from a protobuf `Duration` to a `std::time::Duration`.
///
/// If the `Duration` complies with the protobuf specification, this can only
/// fail if the duration is negative.
pub(crate) fn to_positive_duration(duration: prost_types::Duration) -> Option<Duration> {
if duration.seconds < 0 || duration.nanos < 0 {
return None;
}
Some(Duration::new(
duration.seconds as u64,
duration.nanos as u32,
))
}
/// Attempts a cast from a protobuf `Duration` to a strictly positive
/// `std::time::Duration`.
///
/// If the `Duration` complies with the protobuf specification, this can only
/// fail if the duration is negative or null.
pub(crate) fn to_strictly_positive_duration(duration: prost_types::Duration) -> Option<Duration> {
if duration.seconds < 0 || duration.nanos < 0 || (duration.seconds == 0 && duration.nanos == 0)
{
return None;
}
Some(Duration::new(
duration.seconds as u64,
duration.nanos as u32,
))
}

View File

@ -0,0 +1,401 @@
use std::fmt;
use prost_types::Timestamp;
use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::{EventSourceRegistry, QuerySourceRegistry};
use crate::simulation::Simulation;
use super::super::codegen::simulation::*;
use super::{
monotonic_to_timestamp, simulation_not_started_error, timestamp_to_monotonic, to_error,
to_positive_duration, to_strictly_positive_duration,
};
/// Protobuf-based simulation manager.
///
/// A `ControllerService` enables the management of the lifecycle of a
/// simulation.
///
/// Its methods map the various RPC simulation control service methods defined
/// in `simulation.proto`.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ControllerService {
NotStarted,
Started {
simulation: Simulation,
event_source_registry: EventSourceRegistry,
query_source_registry: QuerySourceRegistry,
key_registry: KeyRegistry,
},
}
impl ControllerService {
/// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply {
let reply = match self {
Self::Started { simulation, .. } => {
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
time_reply::Result::Time(timestamp)
} else {
time_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()),
};
TimeReply {
result: Some(reply),
}
}
/// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other events scheduled for the same time.
///
/// Processing is gated by a (possibly blocking) call to
/// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the
/// configured simulation clock. This method blocks until all newly
/// processed events have completed.
pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply {
let reply = match self {
Self::Started { simulation, .. } => {
simulation.step();
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
step_reply::Result::Time(timestamp)
} else {
step_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
Self::NotStarted => step_reply::Result::Error(simulation_not_started_error()),
};
StepReply {
result: Some(reply),
}
}
/// Iteratively advances the simulation time until the specified deadline,
/// as if by calling
/// [`Simulation::step()`](crate::simulation::Simulation::step) repeatedly.
///
/// This method blocks until all events scheduled up to the specified target
/// time have completed. The simulation time upon completion is equal to the
/// specified target time, whether or not an event was scheduled for that
/// time.
pub(crate) fn step_until(&mut self, request: StepUntilRequest) -> StepUntilReply {
let reply = match self {
Self::Started { simulation, .. } => move || -> Result<Timestamp, Error> {
let deadline = request.deadline.ok_or(to_error(
ErrorCode::MissingArgument,
"missing deadline argument",
))?;
match deadline {
step_until_request::Deadline::Time(time) => {
let time = timestamp_to_monotonic(time).ok_or(to_error(
ErrorCode::InvalidTime,
"out-of-range nanosecond field",
))?;
simulation.step_until(time).map_err(|_| {
to_error(
ErrorCode::InvalidTime,
"the specified deadline lies in the past",
)
})?;
}
step_until_request::Deadline::Duration(duration) => {
let duration = to_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDuration,
"the specified deadline lies in the past",
))?;
simulation.step_by(duration);
}
};
let timestamp = monotonic_to_timestamp(simulation.time()).ok_or(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))?;
Ok(timestamp)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
StepUntilReply {
result: Some(match reply {
Ok(timestamp) => step_until_reply::Result::Time(timestamp),
Err(error) => step_until_reply::Result::Error(error),
}),
}
}
/// Schedules an event at a future time.
pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply {
let reply = match self {
Self::Started {
simulation,
event_source_registry,
key_registry,
..
} => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name;
let event = &request.event;
let with_key = request.with_key;
let period = request
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or(to_error(
ErrorCode::InvalidDuration,
"the specified event period is not strictly positive",
))
})
.transpose()?;
let deadline = request.deadline.ok_or(to_error(
ErrorCode::MissingArgument,
"missing deadline argument",
))?;
let deadline = match deadline {
schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time)
.ok_or(to_error(
ErrorCode::InvalidTime,
"out-of-range nanosecond field",
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDuration,
"the specified scheduling deadline is not in the future",
))?;
simulation.time() + duration
}
};
let source = event_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no event source is registered with the name '{}'".to_string(),
))?;
let (action, action_key) = match (with_key, period) {
(false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
let key_id = action_key.map(|action_key| {
// Free stale keys from the registry.
key_registry.remove_expired_keys(simulation.time());
if period.is_some() {
key_registry.insert_eternal_key(action_key)
} else {
key_registry.insert_key(action_key, deadline)
}
});
simulation.process(action);
Ok(key_id)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
ScheduleEventReply {
result: Some(match reply {
Ok(Some(key_id)) => {
let (subkey1, subkey2) = key_id.into_raw_parts();
schedule_event_reply::Result::Key(EventKey {
subkey1: subkey1
.try_into()
.expect("action key index is too large to be serialized"),
subkey2,
})
}
Ok(None) => schedule_event_reply::Result::Empty(()),
Err(error) => schedule_event_reply::Result::Error(error),
}),
}
}
/// Cancels a keyed event.
pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply {
let reply = match self {
Self::Started {
simulation,
key_registry,
..
} => move || -> Result<(), Error> {
let key = request
.key
.ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?;
let subkey1: usize = key
.subkey1
.try_into()
.map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?;
let subkey2 = key.subkey2;
let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2);
key_registry.remove_expired_keys(simulation.time());
let key = key_registry.extract_key(key_id).ok_or(to_error(
ErrorCode::InvalidKey,
"invalid or expired event key",
))?;
key.cancel();
Ok(())
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
CancelEventReply {
result: Some(match reply {
Ok(()) => cancel_event_reply::Result::Empty(()),
Err(error) => cancel_event_reply::Result::Error(error),
}),
}
}
/// Broadcasts an event from an event source immediately, blocking until
/// completion.
///
/// Simulation time remains unchanged.
pub(crate) fn process_event(&mut self, request: ProcessEventRequest) -> ProcessEventReply {
let reply = match self {
Self::Started {
simulation,
event_source_registry,
..
} => move || -> Result<(), Error> {
let source_name = &request.source_name;
let event = &request.event;
let source = event_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;
let event = source.event(event).map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
simulation.process(event);
Ok(())
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
ProcessEventReply {
result: Some(match reply {
Ok(()) => process_event_reply::Result::Empty(()),
Err(error) => process_event_reply::Result::Error(error),
}),
}
}
/// Broadcasts a query from a query source immediately, blocking until
/// completion.
///
/// Simulation time remains unchanged.
pub(crate) fn process_query(&mut self, request: ProcessQueryRequest) -> ProcessQueryReply {
let reply = match self {
Self::Started {
simulation,
query_source_registry,
..
} => move || -> Result<Vec<Vec<u8>>, Error> {
let source_name = &request.source_name;
let request = &request.request;
let source = query_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;
let (query, mut promise) = source.query(request).map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the request could not be deserialized as type '{}': {}",
source.request_type_name(),
e
),
)
})?;
simulation.process(query);
let replies = promise.take_collect().ok_or(to_error(
ErrorCode::InternalError,
"a reply to the query was expected but none was available".to_string(),
))?;
replies.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the reply could not be serialized as type '{}': {}",
source.reply_type_name(),
e
),
)
})
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
match reply {
Ok(replies) => ProcessQueryReply {
replies,
result: Some(process_query_reply::Result::Empty(())),
},
Err(error) => ProcessQueryReply {
replies: Vec::new(),
result: Some(process_query_reply::Result::Error(error)),
},
}
}
}
impl fmt::Debug for ControllerService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ControllerService").finish_non_exhaustive()
}
}

View File

@ -0,0 +1,87 @@
use ciborium;
use serde::de::DeserializeOwned;
use crate::registry::EndpointRegistry;
use crate::simulation::SimInit;
use crate::simulation::Simulation;
use super::{timestamp_to_monotonic, to_error};
use super::super::codegen::simulation::*;
type DeserializationError = ciborium::de::Error<std::io::Error>;
type SimGen = Box<
dyn FnMut(&[u8]) -> Result<(SimInit, EndpointRegistry), DeserializationError> + Send + 'static,
>;
/// Protobuf-based simulation initializer.
///
/// An `InitService` creates a new simulation bench based on a serialized initialization configuration.
///
/// It maps the `Init` method defined in `simulation.proto`.
pub(crate) struct InitService {
sim_gen: SimGen,
}
impl InitService {
/// Creates a new `InitService`.
///
/// The argument is a closure that takes a CBOR-serialized initialization
/// configuration and is called every time the simulation is (re)started by
/// the remote client. It must create a new `SimInit` object complemented by
/// a registry that exposes the public event and query interface.
pub(crate) fn new<F, I>(mut sim_gen: F) -> Self
where
F: FnMut(I) -> (SimInit, EndpointRegistry) + Send + 'static,
I: DeserializeOwned,
{
// Wrap `sim_gen` so it accepts a serialized init configuration.
let sim_gen = move |serialized_cfg: &[u8]| -> Result<(SimInit, EndpointRegistry), DeserializationError> {
let cfg = ciborium::from_reader(serialized_cfg)?;
Ok(sim_gen(cfg))
};
Self {
sim_gen: Box::new(sim_gen),
}
}
/// Initializes the simulation based on the specified configuration.
pub(crate) fn init(
&mut self,
request: InitRequest,
) -> (InitReply, Option<(Simulation, EndpointRegistry)>) {
let start_time = request.time.unwrap_or_default();
let reply = (self.sim_gen)(&request.cfg)
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the initialization configuration could not be deserialized: {}",
e
),
)
})
.and_then(|(sim_init, registry)| {
timestamp_to_monotonic(start_time)
.ok_or_else(|| {
to_error(ErrorCode::InvalidTime, "out-of-range nanosecond field")
})
.map(|start_time| (sim_init.init(start_time), registry))
});
let (reply, bench) = match reply {
Ok(bench) => (init_reply::Result::Empty(()), Some(bench)),
Err(e) => (init_reply::Result::Error(e), None),
};
(
InitReply {
result: Some(reply),
},
bench,
)
}
}

View File

@ -0,0 +1,121 @@
use std::fmt;
use crate::registry::EventSinkRegistry;
use super::super::codegen::simulation::*;
use super::{simulation_not_started_error, to_error};
/// Protobuf-based simulation monitor.
///
/// A `MonitorService` enables the monitoring of the event sinks of a
/// [`Simulation`](crate::simulation::Simulation).
///
/// Its methods map the various RPC monitoring service methods defined in
/// `simulation.proto`.
pub(crate) enum MonitorService {
Started {
event_sink_registry: EventSinkRegistry,
},
NotStarted,
}
impl MonitorService {
/// Read all events from an event sink.
pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply {
let reply = match self {
Self::Started {
event_sink_registry,
} => move || -> Result<Vec<Vec<u8>>, Error> {
let sink_name = &request.sink_name;
let sink = event_sink_registry.get_mut(sink_name).ok_or(to_error(
ErrorCode::SinkNotFound,
format!("no sink is registered with the name '{}'", sink_name),
))?;
sink.collect().map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be serialized from type '{}': {}",
sink.event_type_name(),
e
),
)
})
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
match reply {
Ok(events) => ReadEventsReply {
events,
result: Some(read_events_reply::Result::Empty(())),
},
Err(error) => ReadEventsReply {
events: Vec::new(),
result: Some(read_events_reply::Result::Error(error)),
},
}
}
/// Opens an event sink.
pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply {
let reply = match self {
Self::Started {
event_sink_registry,
} => {
let sink_name = &request.sink_name;
if let Some(sink) = event_sink_registry.get_mut(sink_name) {
sink.open();
open_sink_reply::Result::Empty(())
} else {
open_sink_reply::Result::Error(to_error(
ErrorCode::SinkNotFound,
format!("no sink is registered with the name '{}'", sink_name),
))
}
}
Self::NotStarted => open_sink_reply::Result::Error(simulation_not_started_error()),
};
OpenSinkReply {
result: Some(reply),
}
}
/// Closes an event sink.
pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply {
let reply = match self {
Self::Started {
event_sink_registry,
} => {
let sink_name = &request.sink_name;
if let Some(sink) = event_sink_registry.get_mut(sink_name) {
sink.close();
close_sink_reply::Result::Empty(())
} else {
close_sink_reply::Result::Error(to_error(
ErrorCode::SinkNotFound,
format!("no sink is registered with the name '{}'", sink_name),
))
}
}
Self::NotStarted => close_sink_reply::Result::Error(simulation_not_started_error()),
};
CloseSinkReply {
result: Some(reply),
}
}
}
impl fmt::Debug for MonitorService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SimulationService").finish_non_exhaustive()
}
}