1
0
forked from ROMEO/nexosim
This commit is contained in:
Serge Barral
2024-06-11 11:27:16 +02:00
parent a7e691c002
commit f731d40add
9 changed files with 859 additions and 218 deletions

View File

@ -1,13 +1,17 @@
//! Simulation management through remote procedure calls. //! Simulation management through remote procedure calls.
mod codegen; mod codegen;
mod endpoint_registry;
#[cfg(feature = "grpc-service")] #[cfg(feature = "grpc-service")]
pub mod grpc; pub mod grpc;
mod key_registry; mod key_registry;
mod monitoring_service;
mod simulation_service; mod simulation_service;
mod sink_registry;
mod source_registry;
#[cfg(feature = "wasm-service")] #[cfg(feature = "wasm-service")]
pub mod wasm; pub mod wasm;
pub use endpoint_registry::EndpointRegistry; pub use monitoring_service::MonitoringService;
pub use simulation_service::SimulationService; pub use simulation_service::SimulationService;
pub use sink_registry::SinkRegistry;
pub use source_registry::SourceRegistry;

View File

@ -1,3 +1,6 @@
mod management_service;
mod monitoring_service;
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
@ -8,10 +11,14 @@ use prost_types::Timestamp;
use tai_time::MonotonicTime; use tai_time::MonotonicTime;
use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId}; use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::rpc::EndpointRegistry; use crate::rpc::{SinkRegistry, SourceRegistry};
use crate::simulation::{SimInit, Simulation}; use crate::simulation::{SimInit, Simulation};
use super::codegen::simulation::*; use super::codegen::simulation::*;
use super::sink_registry;
use management_service::ManagementService;
use monitoring_service::MonitoringService;
/// Protobuf-based simulation manager. /// Protobuf-based simulation manager.
/// ///
@ -23,8 +30,8 @@ use super::codegen::simulation::*;
/// Its methods map the various RPC service methods defined in /// Its methods map the various RPC service methods defined in
/// `simulation.proto`. /// `simulation.proto`.
pub struct SimulationService { pub struct SimulationService {
sim_gen: Box<dyn FnMut() -> (SimInit, EndpointRegistry) + Send + 'static>, sim_gen: Box<dyn FnMut() -> (SimInit, SourceRegistry, SinkRegistry) + Send + 'static>,
sim_context: Option<(Simulation, EndpointRegistry, KeyRegistry)>, services: Option<(ManagementService, MonitoringService)>,
} }
impl SimulationService { impl SimulationService {
@ -36,16 +43,16 @@ impl SimulationService {
/// interface. /// interface.
pub fn new<F>(sim_gen: F) -> Self pub fn new<F>(sim_gen: F) -> Self
where where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, F: FnMut() -> (SimInit, SourceRegistry, SinkRegistry) + Send + 'static,
{ {
Self { Self {
sim_gen: Box::new(sim_gen), sim_gen: Box::new(sim_gen),
sim_context: None, services: None,
} }
} }
/// Processes an encoded `AnyRequest` message and returns an encoded reply. /// Processes an encoded `AnyRequest` message and returns an encoded reply.
pub fn process_request<B>(&mut self, request_buf: B) -> Result<Vec<u8>, InvalidRequest> /*pub fn process_request<B>(&mut self, request_buf: B) -> Result<Vec<u8>, InvalidRequest>
where where
B: Buf, B: Buf,
{ {
@ -75,15 +82,6 @@ impl SimulationService {
any_request::Request::ProcessQueryRequest(request) => { any_request::Request::ProcessQueryRequest(request) => {
Ok(self.process_query(request).encode_to_vec()) Ok(self.process_query(request).encode_to_vec())
} }
any_request::Request::ReadEventsRequest(request) => {
Ok(self.read_events(request).encode_to_vec())
}
any_request::Request::OpenSinkRequest(request) => {
Ok(self.open_sink(request).encode_to_vec())
}
any_request::Request::CloseSinkRequest(request) => {
Ok(self.close_sink(request).encode_to_vec())
}
}, },
Ok(AnyRequest { request: None }) => Err(InvalidRequest { Ok(AnyRequest { request: None }) => Err(InvalidRequest {
description: "the message did not contain any request".to_string(), description: "the message did not contain any request".to_string(),
@ -92,7 +90,7 @@ impl SimulationService {
description: format!("bad request: {}", err), description: format!("bad request: {}", err),
}), }),
} }
} }*/
/// Initialize a simulation with the provided time. /// Initialize a simulation with the provided time.
/// ///
@ -104,9 +102,12 @@ impl SimulationService {
pub(crate) fn init(&mut self, request: InitRequest) -> InitReply { pub(crate) fn init(&mut self, request: InitRequest) -> InitReply {
let start_time = request.time.unwrap_or_default(); let start_time = request.time.unwrap_or_default();
let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) { let reply = if let Some(start_time) = timestamp_to_monotonic(start_time) {
let (sim_init, endpoint_registry) = (self.sim_gen)(); let (sim_init, source_registry, sink_registry) = (self.sim_gen)();
let simulation = sim_init.init(start_time); let simulation = sim_init.init(start_time);
self.sim_context = Some((simulation, endpoint_registry, KeyRegistry::default())); self.services = Some((
ManagementService::new(simulation, source_registry, KeyRegistry::default()),
MonitoringService::new(sink_registry),
));
init_reply::Result::Empty(()) init_reply::Result::Empty(())
} else { } else {
@ -122,31 +123,18 @@ impl SimulationService {
} }
/// Returns the current simulation time. /// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply { pub(crate) fn time(&mut self, request: TimeRequest) -> TimeReply {
let reply = match &self.sim_context { if let Some((management_service, ..)) = &mut self.services {
Some((simulation, ..)) => { management_service.time(request)
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
time_reply::Result::Time(timestamp)
} else { } else {
time_reply::Result::Error(Error {
code: ErrorCode::SimulationTimeOutOfRange as i32,
message: "the final simulation time is out of range".to_string(),
})
}
}
None => time_reply::Result::Error(Error {
code: ErrorCode::SimulationNotStarted as i32,
message: "the simulation was not started".to_string(),
}),
};
TimeReply { TimeReply {
result: Some(reply), result: Some(time_reply::Result::Error(simulation_not_started_error())),
}
} }
} }
/// Advances simulation time to that of the next scheduled event, processing /// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other event scheduled for the same time. /// that event as well as all other events scheduled for the same time.
/// ///
/// Processing is gated by a (possibly blocking) call to /// Processing is gated by a (possibly blocking) call to
/// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the
@ -500,113 +488,6 @@ impl SimulationService {
}, },
} }
} }
/// Read all events from an event sink.
pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply {
let reply = move || -> Result<Vec<Vec<u8>>, (ErrorCode, String)> {
let sink_name = &request.sink_name;
let (_, registry, _) = self.sim_context.as_mut().ok_or((
ErrorCode::SimulationNotStarted,
"the simulation was not started".to_string(),
))?;
let sink = registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
sink.collect().map_err(|_| {
(
ErrorCode::InvalidMessage,
format!(
"the event could not be serialized from type '{}'",
sink.event_type_name()
),
)
})
}();
match reply {
Ok(events) => ReadEventsReply {
events,
result: Some(read_events_reply::Result::Empty(())),
},
Err((code, message)) => ReadEventsReply {
events: Vec::new(),
result: Some(read_events_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
/// Opens an event sink.
pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply {
let reply = move || -> Result<(), (ErrorCode, String)> {
let sink_name = &request.sink_name;
let (_, registry, _) = self.sim_context.as_mut().ok_or((
ErrorCode::SimulationNotStarted,
"the simulation was not started".to_string(),
))?;
let sink = registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
sink.open();
Ok(())
}();
match reply {
Ok(()) => OpenSinkReply {
result: Some(open_sink_reply::Result::Empty(())),
},
Err((code, message)) => OpenSinkReply {
result: Some(open_sink_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
/// Closes an event sink.
pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply {
let reply = move || -> Result<(), (ErrorCode, String)> {
let sink_name = &request.sink_name;
let (_, registry, _) = self.sim_context.as_mut().ok_or((
ErrorCode::SimulationNotStarted,
"the simulation was not started".to_string(),
))?;
let sink = registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
sink.close();
Ok(())
}();
match reply {
Ok(()) => CloseSinkReply {
result: Some(close_sink_reply::Result::Empty(())),
},
Err((code, message)) => CloseSinkReply {
result: Some(close_sink_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
} }
impl fmt::Debug for SimulationService { impl fmt::Debug for SimulationService {
@ -628,6 +509,14 @@ impl fmt::Display for InvalidRequest {
impl error::Error for InvalidRequest {} impl error::Error for InvalidRequest {}
/// An error returned when a simulation was not started.
fn simulation_not_started_error() -> Error {
Error {
code: ErrorCode::SimulationNotStarted as i32,
message: "the simulation was not started".to_string(),
}
}
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`. /// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
/// ///
/// This will fail if the time is outside the protobuf-specified range for /// This will fail if the time is outside the protobuf-specified range for

View File

@ -0,0 +1,451 @@
use std::fmt;
use std::time::Duration;
use prost_types::Timestamp;
use tai_time::MonotonicTime;
use crate::rpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::rpc::SourceRegistry;
use crate::simulation::Simulation;
use super::super::codegen::simulation::*;
/// Protobuf-based simulation manager.
///
/// A `ManagementService` enables the management of the lifecycle of a
/// simulation.
///
/// Its methods map the various RPC management service methods defined in
/// `simulation.proto`.
pub(crate) struct ManagementService {
simulation: Simulation,
source_registry: SourceRegistry,
key_registry: KeyRegistry,
}
impl ManagementService {
/// Creates a new `ManagementService`.
pub(crate) fn new(
simulation: Simulation,
source_registry: SourceRegistry,
key_registry: KeyRegistry,
) -> Self {
Self {
simulation,
source_registry,
key_registry,
}
}
/// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply {
let reply = if let Some(timestamp) = monotonic_to_timestamp(self.simulation.time()) {
time_reply::Result::Time(timestamp)
} else {
time_reply::Result::Error(Error {
code: ErrorCode::SimulationTimeOutOfRange as i32,
message: "the final simulation time is out of range".to_string(),
})
};
TimeReply {
result: Some(reply),
}
}
/// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other event scheduled for the same time.
///
/// Processing is gated by a (possibly blocking) call to
/// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the
/// configured simulation clock. This method blocks until all newly
/// processed events have completed.
pub(crate) fn step(&mut self, _request: StepRequest) -> StepReply {
self.simulation.step();
let reply = if let Some(timestamp) = monotonic_to_timestamp(self.simulation.time()) {
step_reply::Result::Time(timestamp)
} else {
step_reply::Result::Error(Error {
code: ErrorCode::SimulationTimeOutOfRange as i32,
message: "the final simulation time is out of range".to_string(),
})
};
StepReply {
result: Some(reply),
}
}
/// Iteratively advances the simulation time until the specified deadline,
/// as if by calling
/// [`Simulation::step()`](crate::simulation::Simulation::step) repeatedly.
///
/// This method blocks until all events scheduled up to the specified target
/// time have completed. The simulation time upon completion is equal to the
/// specified target time, whether or not an event was scheduled for that
/// time.
pub(crate) fn step_until(&mut self, request: StepUntilRequest) -> StepUntilReply {
let reply = move || -> Result<Timestamp, (ErrorCode, &str)> {
let deadline = request
.deadline
.ok_or((ErrorCode::MissingArgument, "missing deadline argument"))?;
match deadline {
step_until_request::Deadline::Time(time) => {
let time = timestamp_to_monotonic(time)
.ok_or((ErrorCode::InvalidTime, "out-of-range nanosecond field"))?;
self.simulation.step_until(time).map_err(|_| {
(
ErrorCode::InvalidTime,
"the specified deadline lies in the past",
)
})?;
}
step_until_request::Deadline::Duration(duration) => {
let duration = to_positive_duration(duration).ok_or((
ErrorCode::InvalidDuration,
"the specified deadline lies in the past",
))?;
self.simulation.step_by(duration);
}
};
let timestamp = monotonic_to_timestamp(self.simulation.time()).ok_or((
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))?;
Ok(timestamp)
}();
StepUntilReply {
result: Some(match reply {
Ok(timestamp) => step_until_reply::Result::Time(timestamp),
Err((code, message)) => step_until_reply::Result::Error(Error {
code: code as i32,
message: message.to_string(),
}),
}),
}
}
/// Schedules an event at a future time.
pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply {
let reply = move || -> Result<Option<KeyRegistryId>, (ErrorCode, String)> {
let source_name = &request.source_name;
let msgpack_event = &request.event;
let with_key = request.with_key;
let period = request
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or((
ErrorCode::InvalidDuration,
"the specified event period is not strictly positive".to_string(),
))
})
.transpose()?;
let deadline = request.deadline.ok_or((
ErrorCode::MissingArgument,
"missing deadline argument".to_string(),
))?;
let deadline = match deadline {
schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time)
.ok_or((
ErrorCode::InvalidTime,
"out-of-range nanosecond field".to_string(),
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or((
ErrorCode::InvalidDuration,
"the specified scheduling deadline is not in the future".to_string(),
))?;
self.simulation.time() + duration
}
};
let source = self
.source_registry
.get_event_source_mut(source_name)
.ok_or((
ErrorCode::SourceNotFound,
"no event source is registered with the name '{}'".to_string(),
))?;
let (action, action_key) = match (with_key, period) {
(false, None) => source.event(msgpack_event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, msgpack_event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(msgpack_event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, msgpack_event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|_| {
(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}'",
source.event_type_name()
),
)
})?;
let key_id = action_key.map(|action_key| {
// Free stale keys from the registry.
self.key_registry
.remove_expired_keys(self.simulation.time());
if period.is_some() {
self.key_registry.insert_eternal_key(action_key)
} else {
self.key_registry.insert_key(action_key, deadline)
}
});
self.simulation.process(action);
Ok(key_id)
}();
ScheduleEventReply {
result: Some(match reply {
Ok(Some(key_id)) => {
let (subkey1, subkey2) = key_id.into_raw_parts();
schedule_event_reply::Result::Key(EventKey {
subkey1: subkey1
.try_into()
.expect("action key index is too large to be serialized"),
subkey2,
})
}
Ok(None) => schedule_event_reply::Result::Empty(()),
Err((code, message)) => schedule_event_reply::Result::Error(Error {
code: code as i32,
message,
}),
}),
}
}
/// Cancels a keyed event.
pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply {
let reply = move || -> Result<(), (ErrorCode, String)> {
let key = request.key.ok_or((
ErrorCode::MissingArgument,
"missing key argument".to_string(),
))?;
let subkey1: usize = key
.subkey1
.try_into()
.map_err(|_| (ErrorCode::InvalidKey, "invalid event key".to_string()))?;
let subkey2 = key.subkey2;
let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2);
self.key_registry
.remove_expired_keys(self.simulation.time());
let key = self.key_registry.extract_key(key_id).ok_or((
ErrorCode::InvalidKey,
"invalid or expired event key".to_string(),
))?;
key.cancel();
Ok(())
}();
CancelEventReply {
result: Some(match reply {
Ok(()) => cancel_event_reply::Result::Empty(()),
Err((code, message)) => cancel_event_reply::Result::Error(Error {
code: code as i32,
message,
}),
}),
}
}
/// Broadcasts an event from an event source immediately, blocking until
/// completion.
///
/// Simulation time remains unchanged.
pub(crate) fn process_event(&mut self, request: ProcessEventRequest) -> ProcessEventReply {
let reply = move || -> Result<(), (ErrorCode, String)> {
let source_name = &request.source_name;
let msgpack_event = &request.event;
let source = self
.source_registry
.get_event_source_mut(source_name)
.ok_or((
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;
let event = source.event(msgpack_event).map_err(|_| {
(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}'",
source.event_type_name()
),
)
})?;
self.simulation.process(event);
Ok(())
}();
ProcessEventReply {
result: Some(match reply {
Ok(()) => process_event_reply::Result::Empty(()),
Err((code, message)) => process_event_reply::Result::Error(Error {
code: code as i32,
message,
}),
}),
}
}
/// Broadcasts an event from an event source immediately, blocking until
/// completion.
///
/// Simulation time remains unchanged.
pub(crate) fn process_query(&mut self, request: ProcessQueryRequest) -> ProcessQueryReply {
let reply = move || -> Result<Vec<Vec<u8>>, (ErrorCode, String)> {
let source_name = &request.source_name;
let msgpack_request = &request.request;
let source = self
.source_registry
.get_query_source_mut(source_name)
.ok_or((
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;
let (query, mut promise) = source.query(msgpack_request).map_err(|_| {
(
ErrorCode::InvalidMessage,
format!(
"the request could not be deserialized as type '{}'",
source.request_type_name()
),
)
})?;
self.simulation.process(query);
let replies = promise.take_collect().ok_or((
ErrorCode::InternalError,
"a reply to the query was expected but none was available".to_string(),
))?;
replies.map_err(|_| {
(
ErrorCode::InvalidMessage,
format!(
"the reply could not be serialized as type '{}'",
source.reply_type_name()
),
)
})
}();
match reply {
Ok(replies) => ProcessQueryReply {
replies,
result: Some(process_query_reply::Result::Empty(())),
},
Err((code, message)) => ProcessQueryReply {
replies: Vec::new(),
result: Some(process_query_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
}
impl fmt::Debug for ManagementService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ManagementService").finish_non_exhaustive()
}
}
/// Attempts a cast from a `MonotonicTime` to a protobuf `Timestamp`.
///
/// This will fail if the time is outside the protobuf-specified range for
/// timestamps (0001-01-01 00:00:00 to 9999-12-31 23:59:59).
fn monotonic_to_timestamp(monotonic_time: MonotonicTime) -> Option<Timestamp> {
// Unix timestamp for 0001-01-01 00:00:00, the minimum accepted by
// protobuf's specification for the `Timestamp` type.
const MIN_SECS: i64 = -62135596800;
// Unix timestamp for 9999-12-31 23:59:59, the maximum accepted by
// protobuf's specification for the `Timestamp` type.
const MAX_SECS: i64 = 253402300799;
let secs = monotonic_time.as_secs();
if !(MIN_SECS..=MAX_SECS).contains(&secs) {
return None;
}
Some(Timestamp {
seconds: secs,
nanos: monotonic_time.subsec_nanos() as i32,
})
}
/// Attempts a cast from a protobuf `Timestamp` to a `MonotonicTime`.
///
/// This should never fail provided that the `Timestamp` complies with the
/// protobuf specification. It can only fail if the nanosecond part is negative
/// or greater than 999'999'999.
fn timestamp_to_monotonic(timestamp: Timestamp) -> Option<MonotonicTime> {
let nanos: u32 = timestamp.nanos.try_into().ok()?;
MonotonicTime::new(timestamp.seconds, nanos)
}
/// Attempts a cast from a protobuf `Duration` to a `std::time::Duration`.
///
/// If the `Duration` complies with the protobuf specification, this can only
/// fail if the duration is negative.
fn to_positive_duration(duration: prost_types::Duration) -> Option<Duration> {
if duration.seconds < 0 || duration.nanos < 0 {
return None;
}
Some(Duration::new(
duration.seconds as u64,
duration.nanos as u32,
))
}
/// Attempts a cast from a protobuf `Duration` to a strictly positive
/// `std::time::Duration`.
///
/// If the `Duration` complies with the protobuf specification, this can only
/// fail if the duration is negative or null.
fn to_strictly_positive_duration(duration: prost_types::Duration) -> Option<Duration> {
if duration.seconds < 0 || duration.nanos < 0 || (duration.seconds == 0 && duration.nanos == 0)
{
return None;
}
Some(Duration::new(
duration.seconds as u64,
duration.nanos as u32,
))
}

View File

@ -0,0 +1,122 @@
use std::error;
use std::fmt;
use crate::rpc::SinkRegistry;
use super::super::codegen::simulation::*;
/// Protobuf-based simulation monitor.
///
/// A `MonitoringService` enables the monitoring of the event sinks of a
/// [`Simulation`](crate::simulation::Simulation).
///
/// Its methods map the various RPC monitoring service methods defined in
/// `simulation.proto`.
pub(crate) struct MonitoringService {
sink_registry: SinkRegistry,
}
impl MonitoringService {
/// Creates a new `MonitoringService`.
pub(crate) fn new(sink_registry: SinkRegistry) -> Self {
Self { sink_registry }
}
/// Read all events from an event sink.
pub(crate) fn read_events(&mut self, request: ReadEventsRequest) -> ReadEventsReply {
let reply = move || -> Result<Vec<Vec<u8>>, (ErrorCode, String)> {
let sink_name = &request.sink_name;
let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
sink.collect().map_err(|_| {
(
ErrorCode::InvalidMessage,
format!(
"the event could not be serialized from type '{}'",
sink.event_type_name()
),
)
})
}();
match reply {
Ok(events) => ReadEventsReply {
events,
result: Some(read_events_reply::Result::Empty(())),
},
Err((code, message)) => ReadEventsReply {
events: Vec::new(),
result: Some(read_events_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
/// Opens an event sink.
pub(crate) fn open_sink(&mut self, request: OpenSinkRequest) -> OpenSinkReply {
let reply = move || -> Result<(), (ErrorCode, String)> {
let sink_name = &request.sink_name;
let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
sink.open();
Ok(())
}();
match reply {
Ok(()) => OpenSinkReply {
result: Some(open_sink_reply::Result::Empty(())),
},
Err((code, message)) => OpenSinkReply {
result: Some(open_sink_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
/// Closes an event sink.
pub(crate) fn close_sink(&mut self, request: CloseSinkRequest) -> CloseSinkReply {
let reply = move || -> Result<(), (ErrorCode, String)> {
let sink_name = &request.sink_name;
let sink = self.sink_registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
sink.close();
Ok(())
}();
match reply {
Ok(()) => CloseSinkReply {
result: Some(close_sink_reply::Result::Empty(())),
},
Err((code, message)) => CloseSinkReply {
result: Some(close_sink_reply::Result::Error(Error {
code: code as i32,
message,
})),
},
}
}
}
impl fmt::Debug for MonitoringService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SimulationService").finish_non_exhaustive()
}
}

View File

@ -14,75 +14,16 @@ use crate::simulation::{Action, ActionKey};
/// A registry that holds all sources and sinks meant to be accessed through /// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls. /// remote procedure calls.
#[derive(Default)] #[derive(Default)]
pub struct EndpointRegistry { pub struct SinkRegistry {
event_sources: HashMap<String, Box<dyn EventSourceAny>>,
query_sources: HashMap<String, Box<dyn QuerySourceAny>>,
sinks: HashMap<String, Box<dyn EventSinkStreamAny>>, sinks: HashMap<String, Box<dyn EventSinkStreamAny>>,
} }
impl EndpointRegistry { impl SinkRegistry {
/// Creates an empty `EndpointRegistry`. /// Creates an empty `EndpointRegistry`.
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
/// Adds an event source to the registry.
///
/// If the specified name is already in use for another event source, the source
/// provided as argument is returned in the error.
pub fn add_event_source<T>(
&mut self,
source: EventSource<T>,
name: impl Into<String>,
) -> Result<(), EventSource<T>>
where
T: DeserializeOwned + Clone + Send + 'static,
{
match self.event_sources.entry(name.into()) {
Entry::Vacant(s) => {
s.insert(Box::new(source));
Ok(())
}
Entry::Occupied(_) => Err(source),
}
}
/// Returns a mutable reference to the specified event source if it is in
/// the registry.
pub(crate) fn get_event_source_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> {
self.event_sources.get_mut(name).map(|s| s.as_mut())
}
/// Adds a query source to the registry.
///
/// If the specified name is already in use for another query source, the
/// source provided as argument is returned in the error.
pub fn add_query_source<T, R>(
&mut self,
source: QuerySource<T, R>,
name: impl Into<String>,
) -> Result<(), QuerySource<T, R>>
where
T: DeserializeOwned + Clone + Send + 'static,
R: Serialize + Send + 'static,
{
match self.query_sources.entry(name.into()) {
Entry::Vacant(s) => {
s.insert(Box::new(source));
Ok(())
}
Entry::Occupied(_) => Err(source),
}
}
/// Returns a mutable reference to the specified query source if it is in
/// the registry.
pub(crate) fn get_query_source_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> {
self.query_sources.get_mut(name).map(|s| s.as_mut())
}
/// Adds a sink to the registry. /// Adds a sink to the registry.
/// ///
/// If the specified name is already in use for another sink, the sink /// If the specified name is already in use for another sink, the sink
@ -109,14 +50,9 @@ impl EndpointRegistry {
} }
} }
impl fmt::Debug for EndpointRegistry { impl fmt::Debug for SinkRegistry {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!( write!(f, "SinkRegistry ({} sinks)", self.sinks.len())
f,
"EndpointRegistry ({} sources, {} sinks)",
self.event_sources.len(),
self.sinks.len()
)
} }
} }

View File

@ -0,0 +1,237 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use rmp_serde::decode::Error as RmpDecodeError;
use rmp_serde::encode::Error as RmpEncodeError;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::ports::{EventSource, QuerySource, ReplyReceiver};
use crate::simulation::{Action, ActionKey};
/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
pub struct SourceRegistry {
event_sources: HashMap<String, Box<dyn EventSourceAny>>,
query_sources: HashMap<String, Box<dyn QuerySourceAny>>,
}
impl SourceRegistry {
/// Creates an empty `EndpointRegistry`.
pub fn new() -> Self {
Self::default()
}
/// Adds an event source to the registry.
///
/// If the specified name is already in use for another event source, the source
/// provided as argument is returned in the error.
pub fn add_event_source<T>(
&mut self,
source: EventSource<T>,
name: impl Into<String>,
) -> Result<(), EventSource<T>>
where
T: DeserializeOwned + Clone + Send + 'static,
{
match self.event_sources.entry(name.into()) {
Entry::Vacant(s) => {
s.insert(Box::new(source));
Ok(())
}
Entry::Occupied(_) => Err(source),
}
}
/// Returns a mutable reference to the specified event source if it is in
/// the registry.
pub(crate) fn get_event_source_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> {
self.event_sources.get_mut(name).map(|s| s.as_mut())
}
/// Adds a query source to the registry.
///
/// If the specified name is already in use for another query source, the
/// source provided as argument is returned in the error.
pub fn add_query_source<T, R>(
&mut self,
source: QuerySource<T, R>,
name: impl Into<String>,
) -> Result<(), QuerySource<T, R>>
where
T: DeserializeOwned + Clone + Send + 'static,
R: Serialize + Send + 'static,
{
match self.query_sources.entry(name.into()) {
Entry::Vacant(s) => {
s.insert(Box::new(source));
Ok(())
}
Entry::Occupied(_) => Err(source),
}
}
/// Returns a mutable reference to the specified query source if it is in
/// the registry.
pub(crate) fn get_query_source_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> {
self.query_sources.get_mut(name).map(|s| s.as_mut())
}
}
impl fmt::Debug for SourceRegistry {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"SourceRegistry ({} event sources, {} query sources)",
self.event_sources.len(),
self.query_sources.len()
)
}
}
/// A type-erased `EventSource` that operates on MessagePack-encoded serialized
/// events.
pub(crate) trait EventSourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts an event to all
/// connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError>;
/// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>;
/// Returns a periodically recurring action which, when processed,
/// broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<Action, RmpDecodeError>;
/// Returns a cancellable, periodically recurring action and a cancellation
/// key; when processed, the action broadcasts an event to all connected
/// input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn keyed_periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError>;
/// Human-readable name of the event type, as returned by
/// `any::type_name()`.
fn event_type_name(&self) -> &'static str;
}
impl<T> EventSourceAny for EventSource<T>
where
T: DeserializeOwned + Clone + Send + 'static,
{
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg))
}
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg))
}
fn periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<Action, RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg))
}
fn keyed_periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg))
}
fn event_type_name(&self) -> &'static str {
std::any::type_name::<T>()
}
}
/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized
/// queries and returns MessagePack-encoded replies.
pub(crate) trait QuerySourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports.
///
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn query(
&mut self,
msgpack_arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError>;
/// Human-readable name of the request type, as returned by
/// `any::type_name()`.
fn request_type_name(&self) -> &'static str;
/// Human-readable name of the reply type, as returned by
/// `any::type_name()`.
fn reply_type_name(&self) -> &'static str;
}
impl<T, R> QuerySourceAny for QuerySource<T, R>
where
T: DeserializeOwned + Clone + Send + 'static,
R: Serialize + Send + 'static,
{
fn query(
&mut self,
msgpack_arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| {
let (action, reply_recv) = self.query(arg);
let reply_recv: Box<dyn ReplyReceiverAny> = Box::new(reply_recv);
(action, reply_recv)
})
}
fn request_type_name(&self) -> &'static str {
std::any::type_name::<T>()
}
fn reply_type_name(&self) -> &'static str {
std::any::type_name::<R>()
}
}
/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies..
pub(crate) trait ReplyReceiverAny {
/// Take the replies, if any, encode them and collect them in a vector.
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>>;
}
impl<R: Serialize + 'static> ReplyReceiverAny for ReplyReceiver<R> {
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>> {
let replies = self.take()?;
let encoded_replies = (move || {
let mut encoded_replies = Vec::new();
for reply in replies {
let encoded_reply = rmp_serde::to_vec_named(&reply)?;
encoded_replies.push(encoded_reply);
}
Ok(encoded_replies)
})();
Some(encoded_replies)
}
}

View File

@ -20,7 +20,7 @@
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use super::{EndpointRegistry, SimulationService}; use super::{SimulationService, SinkRegistry};
use crate::simulation::SimInit; use crate::simulation::SimInit;
/// A simulation service that can be used from JavaScript. /// A simulation service that can be used from JavaScript.
@ -75,7 +75,7 @@ impl WasmSimulationService {
/// interface. /// interface.
pub fn new<F>(sim_gen: F) -> Self pub fn new<F>(sim_gen: F) -> Self
where where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, F: FnMut() -> (SimInit, SinkRegistry) + Send + 'static,
{ {
Self(SimulationService::new(sim_gen)) Self(SimulationService::new(sim_gen))
} }

View File

@ -218,7 +218,7 @@ impl Simulation {
} }
/// Advances simulation time to that of the next scheduled event, processing /// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other event scheduled for the same time. /// that event as well as all other events scheduled for the same time.
/// ///
/// Processing is gated by a (possibly blocking) call to /// Processing is gated by a (possibly blocking) call to
/// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured /// [`Clock::synchronize()`](crate::time::Clock::synchronize) on the configured

View File

@ -1,3 +1,5 @@
//! Cached read-write lock.
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use crate::loom_exports::sync::atomic::{AtomicUsize, Ordering}; use crate::loom_exports::sync::atomic::{AtomicUsize, Ordering};