1
0
forked from ROMEO/nexosim

Merge pull request #73 from asynchronics/feature/grpc_rework

Feature/grpc rework
This commit is contained in:
Jauhien Piatlicki 2025-01-12 22:22:22 +01:00 committed by GitHub
commit 2e0653e1e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 635 additions and 459 deletions

View File

@ -20,11 +20,27 @@ description = """
A high performance asychronous compute framework for system simulation. A high performance asychronous compute framework for system simulation.
""" """
categories = ["simulation", "aerospace", "science"] categories = ["simulation", "aerospace", "science"]
keywords = ["simulation", "discrete-event", "systems", "cyberphysical", "real-time"] keywords = [
"simulation",
"discrete-event",
"systems",
"cyberphysical",
"real-time",
]
[features] [features]
# gRPC service. # gRPC service.
grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"] grpc = [
"dep:bytes",
"dep:ciborium",
"dep:prost",
"dep:prost-types",
"dep:serde",
"dep:tonic",
"dep:tokio",
"dep:tonic",
"tai-time/serde",
]
tracing = ["dep:tracing", "dep:tracing-subscriber"] tracing = ["dep:tracing", "dep:tracing-subscriber"]
# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking. # DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking.
@ -54,15 +70,24 @@ ciborium = { version = "0.2.2", optional = true }
prost = { version = "0.13", optional = true } prost = { version = "0.13", optional = true }
prost-types = { version = "0.13", optional = true } prost-types = { version = "0.13", optional = true }
serde = { version = "1", optional = true } serde = { version = "1", optional = true }
tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true } tokio = { version = "1.0", features = [
tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true } "net",
tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true } "rt-multi-thread",
tracing-subscriber = { version= "0.3.18", optional = true } ], optional = true }
tonic = { version = "0.12", default-features = false, features = [
"codegen",
"prost",
"server",
], optional = true }
tracing = { version = "0.1.40", default-features = false, features = [
"std",
], optional = true }
tracing-subscriber = { version = "0.3.18", optional = true }
[dev-dependencies] [dev-dependencies]
futures-util = "0.3" futures-util = "0.3"
futures-executor = "0.3" futures-executor = "0.3"
tracing-subscriber = { version= "0.3.18", features=["env-filter"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
[target.'cfg(nexosim_loom)'.dev-dependencies] [target.'cfg(nexosim_loom)'.dev-dependencies]
loom = "0.7" loom = "0.7"
@ -74,7 +99,10 @@ tonic-build = { version = "0.12" }
[lints.rust] [lints.rust]
# `nexosim_loom` flag: run loom-based tests. # `nexosim_loom` flag: run loom-based tests.
# `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions. # `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions.
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(nexosim_loom)', 'cfg(nexosim_grpc_codegen)'] } unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(nexosim_loom)',
'cfg(nexosim_grpc_codegen)',
] }
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true

View File

@ -1,7 +1,7 @@
// The main simulation protocol. // The main simulation protocol.
syntax = "proto3"; syntax = "proto3";
package simulation; package simulation.v1;
import "google/protobuf/duration.proto"; import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
@ -9,24 +9,25 @@ import "google/protobuf/empty.proto";
enum ErrorCode { enum ErrorCode {
INTERNAL_ERROR = 0; INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1; MISSING_ARGUMENT = 1;
SIMULATION_TERMINATED = 2; INVALID_TIME = 2;
SIMULATION_DEADLOCK = 3; INVALID_PERIOD = 3;
SIMULATION_MESSAGE_LOSS = 4; INVALID_DEADLINE = 4;
SIMULATION_NO_RECIPIENT = 5; INVALID_MESSAGE = 5;
SIMULATION_PANIC = 6; INVALID_KEY = 6;
SIMULATION_TIMEOUT = 7; INITIALIZER_PANIC = 10;
SIMULATION_OUT_OF_SYNC = 8; SIMULATION_NOT_STARTED = 11;
SIMULATION_BAD_QUERY = 9; SIMULATION_TERMINATED = 12;
SIMULATION_TIME_OUT_OF_RANGE = 10; SIMULATION_DEADLOCK = 13;
MISSING_ARGUMENT = 20; SIMULATION_MESSAGE_LOSS = 14;
INVALID_TIME = 30; SIMULATION_NO_RECIPIENT = 15;
INVALID_PERIOD = 31; SIMULATION_PANIC = 16;
INVALID_DEADLINE = 32; SIMULATION_TIMEOUT = 17;
INVALID_MESSAGE = 33; SIMULATION_OUT_OF_SYNC = 18;
INVALID_KEY = 34; SIMULATION_BAD_QUERY = 19;
SOURCE_NOT_FOUND = 40; SIMULATION_TIME_OUT_OF_RANGE = 20;
SINK_NOT_FOUND = 41; SOURCE_NOT_FOUND = 30;
SINK_NOT_FOUND = 31;
} }
message Error { message Error {
@ -39,9 +40,7 @@ message EventKey {
uint64 subkey2 = 2; uint64 subkey2 = 2;
} }
message InitRequest { message InitRequest { bytes cfg = 2; }
bytes cfg = 2;
}
message InitReply { message InitReply {
oneof result { // Always returns exactly 1 variant. oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1; google.protobuf.Empty empty = 1;

View File

@ -3,4 +3,5 @@
#![allow(missing_docs)] #![allow(missing_docs)]
#[rustfmt::skip] #[rustfmt::skip]
#[path = "codegen/simulation.v1.rs"]
pub(crate) mod simulation; pub(crate) mod simulation;

View File

@ -299,10 +299,7 @@ pub mod close_sink_reply {
#[derive(Clone, PartialEq, ::prost::Message)] #[derive(Clone, PartialEq, ::prost::Message)]
pub struct AnyRequest { pub struct AnyRequest {
/// Expects exactly 1 variant. /// Expects exactly 1 variant.
#[prost( #[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")]
oneof = "any_request::Request",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11"
)]
pub request: ::core::option::Option<any_request::Request>, pub request: ::core::option::Option<any_request::Request>,
} }
/// Nested message and enum types in `AnyRequest`. /// Nested message and enum types in `AnyRequest`.
@ -338,24 +335,25 @@ pub mod any_request {
#[repr(i32)] #[repr(i32)]
pub enum ErrorCode { pub enum ErrorCode {
InternalError = 0, InternalError = 0,
SimulationNotStarted = 1, MissingArgument = 1,
SimulationTerminated = 2, InvalidTime = 2,
SimulationDeadlock = 3, InvalidPeriod = 3,
SimulationMessageLoss = 4, InvalidDeadline = 4,
SimulationNoRecipient = 5, InvalidMessage = 5,
SimulationPanic = 6, InvalidKey = 6,
SimulationTimeout = 7, InitializerPanic = 10,
SimulationOutOfSync = 8, SimulationNotStarted = 11,
SimulationBadQuery = 9, SimulationTerminated = 12,
SimulationTimeOutOfRange = 10, SimulationDeadlock = 13,
MissingArgument = 20, SimulationMessageLoss = 14,
InvalidTime = 30, SimulationNoRecipient = 15,
InvalidPeriod = 31, SimulationPanic = 16,
InvalidDeadline = 32, SimulationTimeout = 17,
InvalidMessage = 33, SimulationOutOfSync = 18,
InvalidKey = 34, SimulationBadQuery = 19,
SourceNotFound = 40, SimulationTimeOutOfRange = 20,
SinkNotFound = 41, SourceNotFound = 30,
SinkNotFound = 31,
} }
impl ErrorCode { impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition. /// String value of the enum field names used in the ProtoBuf definition.
@ -365,6 +363,13 @@ impl ErrorCode {
pub fn as_str_name(&self) -> &'static str { pub fn as_str_name(&self) -> &'static str {
match self { match self {
Self::InternalError => "INTERNAL_ERROR", Self::InternalError => "INTERNAL_ERROR",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::InitializerPanic => "INITIALIZER_PANIC",
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED", Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
Self::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK", Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
@ -375,12 +380,6 @@ impl ErrorCode {
Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
Self::SimulationBadQuery => "SIMULATION_BAD_QUERY", Self::SimulationBadQuery => "SIMULATION_BAD_QUERY",
Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::SourceNotFound => "SOURCE_NOT_FOUND", Self::SourceNotFound => "SOURCE_NOT_FOUND",
Self::SinkNotFound => "SINK_NOT_FOUND", Self::SinkNotFound => "SINK_NOT_FOUND",
} }
@ -389,6 +388,13 @@ impl ErrorCode {
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> { pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value { match value {
"INTERNAL_ERROR" => Some(Self::InternalError), "INTERNAL_ERROR" => Some(Self::InternalError),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"INITIALIZER_PANIC" => Some(Self::InitializerPanic),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
@ -399,12 +405,6 @@ impl ErrorCode {
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
"SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery), "SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound), "SINK_NOT_FOUND" => Some(Self::SinkNotFound),
_ => None, _ => None,
@ -418,7 +418,7 @@ pub mod simulation_server {
dead_code, dead_code,
missing_docs, missing_docs,
clippy::wildcard_imports, clippy::wildcard_imports,
clippy::let_unit_value clippy::let_unit_value,
)] )]
use tonic::codegen::*; use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with SimulationServer. /// Generated trait containing gRPC methods that should be implemented for use with SimulationServer.
@ -443,19 +443,31 @@ pub mod simulation_server {
async fn schedule_event( async fn schedule_event(
&self, &self,
request: tonic::Request<super::ScheduleEventRequest>, request: tonic::Request<super::ScheduleEventRequest>,
) -> std::result::Result<tonic::Response<super::ScheduleEventReply>, tonic::Status>; ) -> std::result::Result<
tonic::Response<super::ScheduleEventReply>,
tonic::Status,
>;
async fn cancel_event( async fn cancel_event(
&self, &self,
request: tonic::Request<super::CancelEventRequest>, request: tonic::Request<super::CancelEventRequest>,
) -> std::result::Result<tonic::Response<super::CancelEventReply>, tonic::Status>; ) -> std::result::Result<
tonic::Response<super::CancelEventReply>,
tonic::Status,
>;
async fn process_event( async fn process_event(
&self, &self,
request: tonic::Request<super::ProcessEventRequest>, request: tonic::Request<super::ProcessEventRequest>,
) -> std::result::Result<tonic::Response<super::ProcessEventReply>, tonic::Status>; ) -> std::result::Result<
tonic::Response<super::ProcessEventReply>,
tonic::Status,
>;
async fn process_query( async fn process_query(
&self, &self,
request: tonic::Request<super::ProcessQueryRequest>, request: tonic::Request<super::ProcessQueryRequest>,
) -> std::result::Result<tonic::Response<super::ProcessQueryReply>, tonic::Status>; ) -> std::result::Result<
tonic::Response<super::ProcessQueryReply>,
tonic::Status,
>;
async fn read_events( async fn read_events(
&self, &self,
request: tonic::Request<super::ReadEventsRequest>, request: tonic::Request<super::ReadEventsRequest>,
@ -490,7 +502,10 @@ pub mod simulation_server {
max_encoding_message_size: None, max_encoding_message_size: None,
} }
} }
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F> pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where where
F: tonic::service::Interceptor, F: tonic::service::Interceptor,
{ {
@ -542,18 +557,24 @@ pub mod simulation_server {
} }
fn call(&mut self, req: http::Request<B>) -> Self::Future { fn call(&mut self, req: http::Request<B>) -> Self::Future {
match req.uri().path() { match req.uri().path() {
"/simulation.Simulation/Init" => { "/simulation.v1.Simulation/Init" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct InitSvc<T: Simulation>(pub Arc<T>); struct InitSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::InitRequest> for InitSvc<T> { impl<T: Simulation> tonic::server::UnaryService<super::InitRequest>
for InitSvc<T> {
type Response = super::InitReply; type Response = super::InitReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::InitRequest>, request: tonic::Request<super::InitRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { <T as Simulation>::init(&inner, request).await }; let fut = async move {
<T as Simulation>::init(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -579,18 +600,24 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/Time" => { "/simulation.v1.Simulation/Time" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct TimeSvc<T: Simulation>(pub Arc<T>); struct TimeSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::TimeRequest> for TimeSvc<T> { impl<T: Simulation> tonic::server::UnaryService<super::TimeRequest>
for TimeSvc<T> {
type Response = super::TimeReply; type Response = super::TimeReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::TimeRequest>, request: tonic::Request<super::TimeRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { <T as Simulation>::time(&inner, request).await }; let fut = async move {
<T as Simulation>::time(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -616,18 +643,24 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/Step" => { "/simulation.v1.Simulation/Step" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct StepSvc<T: Simulation>(pub Arc<T>); struct StepSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::StepRequest> for StepSvc<T> { impl<T: Simulation> tonic::server::UnaryService<super::StepRequest>
for StepSvc<T> {
type Response = super::StepReply; type Response = super::StepReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::StepRequest>, request: tonic::Request<super::StepRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { <T as Simulation>::step(&inner, request).await }; let fut = async move {
<T as Simulation>::step(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -653,19 +686,26 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/StepUntil" => { "/simulation.v1.Simulation/StepUntil" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct StepUntilSvc<T: Simulation>(pub Arc<T>); struct StepUntilSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::StepUntilRequest> for StepUntilSvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::StepUntilRequest>
for StepUntilSvc<T> {
type Response = super::StepUntilReply; type Response = super::StepUntilReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::StepUntilRequest>, request: tonic::Request<super::StepUntilRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = let fut = async move {
async move { <T as Simulation>::step_until(&inner, request).await }; <T as Simulation>::step_until(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -691,14 +731,18 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/ScheduleEvent" => { "/simulation.v1.Simulation/ScheduleEvent" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ScheduleEventSvc<T: Simulation>(pub Arc<T>); struct ScheduleEventSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ScheduleEventRequest> impl<
for ScheduleEventSvc<T> T: Simulation,
{ > tonic::server::UnaryService<super::ScheduleEventRequest>
for ScheduleEventSvc<T> {
type Response = super::ScheduleEventReply; type Response = super::ScheduleEventReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ScheduleEventRequest>, request: tonic::Request<super::ScheduleEventRequest>,
@ -732,12 +776,18 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/CancelEvent" => { "/simulation.v1.Simulation/CancelEvent" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct CancelEventSvc<T: Simulation>(pub Arc<T>); struct CancelEventSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::CancelEventRequest> for CancelEventSvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::CancelEventRequest>
for CancelEventSvc<T> {
type Response = super::CancelEventReply; type Response = super::CancelEventReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::CancelEventRequest>, request: tonic::Request<super::CancelEventRequest>,
@ -771,12 +821,18 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/ProcessEvent" => { "/simulation.v1.Simulation/ProcessEvent" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ProcessEventSvc<T: Simulation>(pub Arc<T>); struct ProcessEventSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ProcessEventRequest> for ProcessEventSvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::ProcessEventRequest>
for ProcessEventSvc<T> {
type Response = super::ProcessEventReply; type Response = super::ProcessEventReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ProcessEventRequest>, request: tonic::Request<super::ProcessEventRequest>,
@ -810,12 +866,18 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/ProcessQuery" => { "/simulation.v1.Simulation/ProcessQuery" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ProcessQuerySvc<T: Simulation>(pub Arc<T>); struct ProcessQuerySvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ProcessQueryRequest> for ProcessQuerySvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::ProcessQueryRequest>
for ProcessQuerySvc<T> {
type Response = super::ProcessQueryReply; type Response = super::ProcessQueryReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ProcessQueryRequest>, request: tonic::Request<super::ProcessQueryRequest>,
@ -849,12 +911,18 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/ReadEvents" => { "/simulation.v1.Simulation/ReadEvents" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ReadEventsSvc<T: Simulation>(pub Arc<T>); struct ReadEventsSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ReadEventsRequest> for ReadEventsSvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::ReadEventsRequest>
for ReadEventsSvc<T> {
type Response = super::ReadEventsReply; type Response = super::ReadEventsReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ReadEventsRequest>, request: tonic::Request<super::ReadEventsRequest>,
@ -888,19 +956,26 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/OpenSink" => { "/simulation.v1.Simulation/OpenSink" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct OpenSinkSvc<T: Simulation>(pub Arc<T>); struct OpenSinkSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::OpenSinkRequest> for OpenSinkSvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::OpenSinkRequest>
for OpenSinkSvc<T> {
type Response = super::OpenSinkReply; type Response = super::OpenSinkReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::OpenSinkRequest>, request: tonic::Request<super::OpenSinkRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = let fut = async move {
async move { <T as Simulation>::open_sink(&inner, request).await }; <T as Simulation>::open_sink(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -926,19 +1001,26 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
"/simulation.Simulation/CloseSink" => { "/simulation.v1.Simulation/CloseSink" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct CloseSinkSvc<T: Simulation>(pub Arc<T>); struct CloseSinkSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::CloseSinkRequest> for CloseSinkSvc<T> { impl<
T: Simulation,
> tonic::server::UnaryService<super::CloseSinkRequest>
for CloseSinkSvc<T> {
type Response = super::CloseSinkReply; type Response = super::CloseSinkReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>; type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::CloseSinkRequest>, request: tonic::Request<super::CloseSinkRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = let fut = async move {
async move { <T as Simulation>::close_sink(&inner, request).await }; <T as Simulation>::close_sink(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -964,19 +1046,23 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
_ => Box::pin(async move { _ => {
let mut response = http::Response::new(empty_body()); Box::pin(async move {
let headers = response.headers_mut(); let mut response = http::Response::new(empty_body());
headers.insert( let headers = response.headers_mut();
tonic::Status::GRPC_STATUS, headers
(tonic::Code::Unimplemented as i32).into(), .insert(
); tonic::Status::GRPC_STATUS,
headers.insert( (tonic::Code::Unimplemented as i32).into(),
http::header::CONTENT_TYPE, );
tonic::metadata::GRPC_CONTENT_TYPE, headers
); .insert(
Ok(response) http::header::CONTENT_TYPE,
}), tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
})
}
} }
} }
} }
@ -993,7 +1079,7 @@ pub mod simulation_server {
} }
} }
/// Generated gRPC service name /// Generated gRPC service name
pub const SERVICE_NAME: &str = "simulation.Simulation"; pub const SERVICE_NAME: &str = "simulation.v1.Simulation";
impl<T> tonic::server::NamedService for SimulationServer<T> { impl<T> tonic::server::NamedService for SimulationServer<T> {
const NAME: &'static str = SERVICE_NAME; const NAME: &'static str = SERVICE_NAME;
} }

View File

@ -13,8 +13,8 @@ pub(crate) struct KeyRegistry {
impl KeyRegistry { impl KeyRegistry {
/// Inserts an `ActionKey` into the registry. /// Inserts an `ActionKey` into the registry.
/// ///
/// The provided expiration deadline is the latest time at which the key may /// The provided expiration deadline is the latest time at which the key is
/// still be active. /// guaranteed to be extractable.
pub(crate) fn insert_key( pub(crate) fn insert_key(
&mut self, &mut self,
action_key: ActionKey, action_key: ActionKey,

View File

@ -1,6 +1,7 @@
//! gRPC simulation service. //! gRPC simulation service.
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::MutexGuard; use std::sync::MutexGuard;
@ -13,7 +14,7 @@ use crate::simulation::{Simulation, SimulationError};
use super::codegen::simulation::*; use super::codegen::simulation::*;
use super::key_registry::KeyRegistry; use super::key_registry::KeyRegistry;
use super::services::InitService; use super::services::InitService;
use super::services::{ControllerService, MonitorService}; use super::services::{ControllerService, MonitorService, SchedulerService};
/// Runs a gRPC simulation server. /// Runs a gRPC simulation server.
/// ///
@ -37,7 +38,8 @@ fn run_service(
service: GrpcSimulationService, service: GrpcSimulationService,
addr: SocketAddr, addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the controller and monitor services can be used // Use 2 threads so that the even if the controller service is blocked due
// to ongoing simulation execution, other services can still be used
// concurrently. // concurrently.
let rt = tokio::runtime::Builder::new_multi_thread() let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2) .worker_threads(2)
@ -58,6 +60,7 @@ struct GrpcSimulationService {
init_service: Mutex<InitService>, init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>, controller_service: Mutex<ControllerService>,
monitor_service: Mutex<MonitorService>, monitor_service: Mutex<MonitorService>,
scheduler_service: Mutex<SchedulerService>,
} }
impl GrpcSimulationService { impl GrpcSimulationService {
@ -76,6 +79,7 @@ impl GrpcSimulationService {
init_service: Mutex::new(InitService::new(sim_gen)), init_service: Mutex::new(InitService::new(sim_gen)),
controller_service: Mutex::new(ControllerService::NotStarted), controller_service: Mutex::new(ControllerService::NotStarted),
monitor_service: Mutex::new(MonitorService::NotStarted), monitor_service: Mutex::new(MonitorService::NotStarted),
scheduler_service: Mutex::new(SchedulerService::NotStarted),
} }
} }
@ -93,6 +97,11 @@ impl GrpcSimulationService {
fn monitor(&self) -> MutexGuard<'_, MonitorService> { fn monitor(&self) -> MutexGuard<'_, MonitorService> {
self.monitor_service.lock().unwrap() self.monitor_service.lock().unwrap()
} }
/// Locks the scheduler and returns the mutex guard.
fn scheduler(&self) -> MutexGuard<'_, SchedulerService> {
self.scheduler_service.lock().unwrap()
}
} }
#[tonic::async_trait] #[tonic::async_trait]
@ -103,15 +112,22 @@ impl simulation_server::Simulation for GrpcSimulationService {
let (reply, bench) = self.initializer().init(request); let (reply, bench) = self.initializer().init(request);
if let Some((simulation, scheduler, endpoint_registry)) = bench { if let Some((simulation, scheduler, endpoint_registry)) = bench {
let event_source_registry = Arc::new(endpoint_registry.event_source_registry);
let query_source_registry = endpoint_registry.query_source_registry;
let event_sink_registry = endpoint_registry.event_sink_registry;
*self.controller() = ControllerService::Started { *self.controller() = ControllerService::Started {
simulation, simulation,
scheduler, event_source_registry: event_source_registry.clone(),
event_source_registry: endpoint_registry.event_source_registry, query_source_registry,
query_source_registry: endpoint_registry.query_source_registry,
key_registry: KeyRegistry::default(),
}; };
*self.monitor() = MonitorService::Started { *self.monitor() = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry, event_sink_registry,
};
*self.scheduler() = SchedulerService::Started {
scheduler,
event_source_registry,
key_registry: KeyRegistry::default(),
}; };
} }
@ -120,7 +136,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> { async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> {
let request = request.into_inner(); let request = request.into_inner();
Ok(Response::new(self.controller().time(request))) Ok(Response::new(self.scheduler().time(request)))
} }
async fn step(&self, request: Request<StepRequest>) -> Result<Response<StepReply>, Status> { async fn step(&self, request: Request<StepRequest>) -> Result<Response<StepReply>, Status> {
let request = request.into_inner(); let request = request.into_inner();
@ -141,7 +157,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
) -> Result<Response<ScheduleEventReply>, Status> { ) -> Result<Response<ScheduleEventReply>, Status> {
let request = request.into_inner(); let request = request.into_inner();
Ok(Response::new(self.controller().schedule_event(request))) Ok(Response::new(self.scheduler().schedule_event(request)))
} }
async fn cancel_event( async fn cancel_event(
&self, &self,
@ -149,7 +165,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
) -> Result<Response<CancelEventReply>, Status> { ) -> Result<Response<CancelEventReply>, Status> {
let request = request.into_inner(); let request = request.into_inner();
Ok(Response::new(self.controller().cancel_event(request))) Ok(Response::new(self.scheduler().cancel_event(request)))
} }
async fn process_event( async fn process_event(
&self, &self,

View File

@ -1,6 +1,7 @@
mod controller_service; mod controller_service;
mod init_service; mod init_service;
mod monitor_service; mod monitor_service;
mod scheduler_service;
use std::time::Duration; use std::time::Duration;
@ -13,6 +14,7 @@ use crate::simulation::{ExecutionError, SchedulingError, SimulationError};
pub(crate) use controller_service::ControllerService; pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService; pub(crate) use init_service::InitService;
pub(crate) use monitor_service::MonitorService; pub(crate) use monitor_service::MonitorService;
pub(crate) use scheduler_service::SchedulerService;
/// Transforms an error code and a message into a Protobuf error. /// Transforms an error code and a message into a Protobuf error.
fn to_error(code: ErrorCode, message: impl Into<String>) -> Error { fn to_error(code: ErrorCode, message: impl Into<String>) -> Error {

View File

@ -1,59 +1,32 @@
use std::fmt; use std::fmt;
use std::sync::Arc;
use prost_types::Timestamp; use prost_types::Timestamp;
use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::{EventSourceRegistry, QuerySourceRegistry}; use crate::registry::{EventSourceRegistry, QuerySourceRegistry};
use crate::simulation::{Scheduler, Simulation}; use crate::simulation::Simulation;
use super::super::codegen::simulation::*; use super::super::codegen::simulation::*;
use super::{ use super::{
map_execution_error, map_scheduling_error, monotonic_to_timestamp, map_execution_error, monotonic_to_timestamp, simulation_not_started_error,
simulation_not_started_error, timestamp_to_monotonic, to_error, to_positive_duration, timestamp_to_monotonic, to_error, to_positive_duration,
to_strictly_positive_duration,
}; };
/// Protobuf-based simulation manager. /// Protobuf-based simulation controller.
/// ///
/// A `ControllerService` enables the management of the lifecycle of a /// A `ControllerService` controls the execution of the simulation. Note that
/// simulation. /// all its methods block until execution completes.
///
/// Its methods map the various RPC simulation control service methods defined
/// in `simulation.proto`.
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub(crate) enum ControllerService { pub(crate) enum ControllerService {
NotStarted, NotStarted,
Started { Started {
simulation: Simulation, simulation: Simulation,
scheduler: Scheduler, event_source_registry: Arc<EventSourceRegistry>,
event_source_registry: EventSourceRegistry,
query_source_registry: QuerySourceRegistry, query_source_registry: QuerySourceRegistry,
key_registry: KeyRegistry,
}, },
} }
impl ControllerService { impl ControllerService {
/// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply {
let reply = match self {
Self::Started { simulation, .. } => {
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
time_reply::Result::Time(timestamp)
} else {
time_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()),
};
TimeReply {
result: Some(reply),
}
}
/// Advances simulation time to that of the next scheduled event, processing /// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other events scheduled for the same time. /// that event as well as all other events scheduled for the same time.
/// ///
@ -144,155 +117,6 @@ impl ControllerService {
} }
} }
/// Schedules an event at a future time.
pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply {
let reply = match self {
Self::Started {
simulation,
scheduler,
event_source_registry,
key_registry,
..
} => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name;
let event = &request.event;
let with_key = request.with_key;
let period = request
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or(to_error(
ErrorCode::InvalidPeriod,
"the specified event period is not strictly positive",
))
})
.transpose()?;
let deadline = request.deadline.ok_or(to_error(
ErrorCode::MissingArgument,
"missing deadline argument",
))?;
let deadline = match deadline {
schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time)
.ok_or(to_error(
ErrorCode::InvalidTime,
"out-of-range nanosecond field",
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDeadline,
"the specified scheduling deadline is not in the future",
))?;
simulation.time() + duration
}
};
let source = event_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no event source is registered with the name '{}'".to_string(),
))?;
let (action, action_key) = match (with_key, period) {
(false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
let key_id = action_key.map(|action_key| {
// Free stale keys from the registry.
key_registry.remove_expired_keys(simulation.time());
if period.is_some() {
key_registry.insert_eternal_key(action_key)
} else {
key_registry.insert_key(action_key, deadline)
}
});
scheduler
.schedule(deadline, action)
.map_err(map_scheduling_error)?;
Ok(key_id)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
ScheduleEventReply {
result: Some(match reply {
Ok(Some(key_id)) => {
let (subkey1, subkey2) = key_id.into_raw_parts();
schedule_event_reply::Result::Key(EventKey {
subkey1: subkey1
.try_into()
.expect("action key index is too large to be serialized"),
subkey2,
})
}
Ok(None) => schedule_event_reply::Result::Empty(()),
Err(error) => schedule_event_reply::Result::Error(error),
}),
}
}
/// Cancels a keyed event.
pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply {
let reply = match self {
Self::Started {
simulation,
key_registry,
..
} => move || -> Result<(), Error> {
let key = request
.key
.ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?;
let subkey1: usize = key
.subkey1
.try_into()
.map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?;
let subkey2 = key.subkey2;
let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2);
key_registry.remove_expired_keys(simulation.time());
let key = key_registry.extract_key(key_id).ok_or(to_error(
ErrorCode::InvalidKey,
"invalid or expired event key",
))?;
key.cancel();
Ok(())
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
CancelEventReply {
result: Some(match reply {
Ok(()) => cancel_event_reply::Result::Empty(()),
Err(error) => cancel_event_reply::Result::Error(error),
}),
}
}
/// Broadcasts an event from an event source immediately, blocking until /// Broadcasts an event from an event source immediately, blocking until
/// completion. /// completion.
/// ///
@ -307,7 +131,7 @@ impl ControllerService {
let source_name = &request.source_name; let source_name = &request.source_name;
let event = &request.event; let event = &request.event;
let source = event_source_registry.get_mut(source_name).ok_or(to_error( let source = event_source_registry.get(source_name).ok_or(to_error(
ErrorCode::SourceNotFound, ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(), "no source is registered with the name '{}'".to_string(),
))?; ))?;
@ -350,7 +174,7 @@ impl ControllerService {
let source_name = &request.source_name; let source_name = &request.source_name;
let request = &request.request; let request = &request.request;
let source = query_source_registry.get_mut(source_name).ok_or(to_error( let source = query_source_registry.get(source_name).ok_or(to_error(
ErrorCode::SourceNotFound, ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(), "no source is registered with the name '{}'".to_string(),
))?; ))?;

View File

@ -1,3 +1,5 @@
use std::panic::{self, AssertUnwindSafe};
use ciborium; use ciborium;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
@ -16,8 +18,6 @@ type SimGen = Box<dyn FnMut(&[u8]) -> Result<InitResult, DeserializationError> +
/// ///
/// An `InitService` creates a new simulation bench based on a serialized /// An `InitService` creates a new simulation bench based on a serialized
/// initialization configuration. /// initialization configuration.
///
/// It maps the `Init` method defined in `simulation.proto`.
pub(crate) struct InitService { pub(crate) struct InitService {
sim_gen: SimGen, sim_gen: SimGen,
} }
@ -51,17 +51,39 @@ impl InitService {
&mut self, &mut self,
request: InitRequest, request: InitRequest,
) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) { ) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) {
let reply = (self.sim_gen)(&request.cfg) let reply = panic::catch_unwind(AssertUnwindSafe(|| (self.sim_gen)(&request.cfg)))
.map_err(|e| { .map_err(|payload| {
to_error( let panic_msg: Option<&str> = if let Some(s) = payload.downcast_ref::<&str>() {
ErrorCode::InvalidMessage, Some(s)
} else if let Some(s) = payload.downcast_ref::<String>() {
Some(s)
} else {
None
};
let error_msg = if let Some(panic_msg) = panic_msg {
format!( format!(
"the initialization configuration could not be deserialized: {}", "the simulation initializer has panicked with the message `{}`",
e panic_msg
), )
) } else {
String::from("the simulation initializer has panicked")
};
to_error(ErrorCode::InitializerPanic, error_msg)
}) })
.and_then(|init_result| init_result.map_err(map_simulation_error)); .and_then(|res| {
res.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the initializer configuration could not be deserialized: {}",
e
),
)
})
.and_then(|init_result| init_result.map_err(map_simulation_error))
});
let (reply, bench) = match reply { let (reply, bench) = match reply {
Ok((simulation, registry)) => { Ok((simulation, registry)) => {

View File

@ -9,9 +9,6 @@ use super::{simulation_not_started_error, to_error};
/// ///
/// A `MonitorService` enables the monitoring of the event sinks of a /// A `MonitorService` enables the monitoring of the event sinks of a
/// [`Simulation`](crate::simulation::Simulation). /// [`Simulation`](crate::simulation::Simulation).
///
/// Its methods map the various RPC monitoring service methods defined in
/// `simulation.proto`.
pub(crate) enum MonitorService { pub(crate) enum MonitorService {
Started { Started {
event_sink_registry: EventSinkRegistry, event_sink_registry: EventSinkRegistry,

View File

@ -0,0 +1,200 @@
use std::fmt;
use std::sync::Arc;
use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::EventSourceRegistry;
use crate::simulation::Scheduler;
use super::super::codegen::simulation::*;
use super::{
map_scheduling_error, monotonic_to_timestamp, simulation_not_started_error,
timestamp_to_monotonic, to_error, to_strictly_positive_duration,
};
/// Protobuf-based simulation scheduler.
///
/// A `SchedulerService` enables the scheduling of simulation events.
#[allow(clippy::large_enum_variant)]
pub(crate) enum SchedulerService {
NotStarted,
Started {
scheduler: Scheduler,
event_source_registry: Arc<EventSourceRegistry>,
key_registry: KeyRegistry,
},
}
impl SchedulerService {
/// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply {
let reply = match self {
Self::Started { scheduler, .. } => {
if let Some(timestamp) = monotonic_to_timestamp(scheduler.time()) {
time_reply::Result::Time(timestamp)
} else {
time_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()),
};
TimeReply {
result: Some(reply),
}
}
/// Schedules an event at a future time.
pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply {
let reply = match self {
Self::Started {
scheduler,
event_source_registry,
key_registry,
} => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name;
let event = &request.event;
let with_key = request.with_key;
let period = request
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or(to_error(
ErrorCode::InvalidPeriod,
"the specified event period is not strictly positive",
))
})
.transpose()?;
let source = event_source_registry.get(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no event source is registered with the name '{}'".to_string(),
))?;
let (action, action_key) = match (with_key, period) {
(false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
let deadline = request.deadline.ok_or(to_error(
ErrorCode::MissingArgument,
"missing deadline argument",
))?;
let deadline = match deadline {
schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time)
.ok_or(to_error(
ErrorCode::InvalidTime,
"out-of-range nanosecond field",
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDeadline,
"the specified scheduling deadline is not in the future",
))?;
scheduler.time() + duration
}
};
let key_id = action_key.map(|action_key| {
key_registry.remove_expired_keys(scheduler.time());
if period.is_some() {
key_registry.insert_eternal_key(action_key)
} else {
key_registry.insert_key(action_key, deadline)
}
});
scheduler
.schedule(deadline, action)
.map_err(map_scheduling_error)?;
Ok(key_id)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
ScheduleEventReply {
result: Some(match reply {
Ok(Some(key_id)) => {
let (subkey1, subkey2) = key_id.into_raw_parts();
schedule_event_reply::Result::Key(EventKey {
subkey1: subkey1
.try_into()
.expect("action key index is too large to be serialized"),
subkey2,
})
}
Ok(None) => schedule_event_reply::Result::Empty(()),
Err(error) => schedule_event_reply::Result::Error(error),
}),
}
}
/// Cancels a keyed event.
pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply {
let reply = match self {
Self::Started {
scheduler,
key_registry,
..
} => move || -> Result<(), Error> {
let key = request
.key
.ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?;
let subkey1: usize = key
.subkey1
.try_into()
.map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?;
let subkey2 = key.subkey2;
let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2);
key_registry.remove_expired_keys(scheduler.time());
let key = key_registry.extract_key(key_id).ok_or(to_error(
ErrorCode::InvalidKey,
"invalid or expired event key",
))?;
key.cancel();
Ok(())
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
CancelEventReply {
result: Some(match reply {
Ok(()) => cancel_event_reply::Result::Empty(()),
Err(error) => cancel_event_reply::Result::Error(error),
}),
}
}
}
impl fmt::Debug for SchedulerService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SchedulerService").finish_non_exhaustive()
}
}

View File

@ -2,7 +2,7 @@ mod broadcaster;
mod sender; mod sender;
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use crate::model::Model; use crate::model::Model;
@ -28,7 +28,7 @@ use super::ReplierFn;
/// however, to be instantiated as a member of a model, but rather as a /// however, to be instantiated as a member of a model, but rather as a
/// simulation control endpoint instantiated during bench assembly. /// simulation control endpoint instantiated during bench assembly.
pub struct EventSource<T: Clone + Send + 'static> { pub struct EventSource<T: Clone + Send + 'static> {
broadcaster: Arc<Mutex<EventBroadcaster<T>>>, broadcaster: EventBroadcaster<T>,
} }
impl<T: Clone + Send + 'static> EventSource<T> { impl<T: Clone + Send + 'static> EventSource<T> {
@ -46,11 +46,11 @@ impl<T: Clone + Send + 'static> EventSource<T> {
pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>) pub fn connect<M, F, S>(&mut self, input: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone, F: for<'a> InputFn<'a, M, T, S> + Clone + Sync,
S: Send + 'static, S: Send + Sync + 'static,
{ {
let sender = Box::new(InputSender::new(input, address.into().0)); let sender = Box::new(InputSender::new(input, address.into().0));
self.broadcaster.lock().unwrap().add(sender); self.broadcaster.add(sender);
} }
/// Adds an auto-converting connection to an input port of the model /// Adds an auto-converting connection to an input port of the model
@ -65,13 +65,13 @@ impl<T: Clone + Send + 'static> EventSource<T> {
pub fn map_connect<M, C, F, U, S>(&mut self, map: C, input: F, address: impl Into<Address<M>>) pub fn map_connect<M, C, F, U, S>(&mut self, map: C, input: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> U + Send + 'static, C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone, F: for<'a> InputFn<'a, M, U, S> + Sync + Clone,
U: Send + 'static, U: Send + 'static,
S: Send + 'static, S: Send + Sync + 'static,
{ {
let sender = Box::new(MapInputSender::new(map, input, address.into().0)); let sender = Box::new(MapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender); self.broadcaster.add(sender);
} }
/// Adds an auto-converting, filtered connection to an input port of the /// Adds an auto-converting, filtered connection to an input port of the
@ -90,22 +90,19 @@ impl<T: Clone + Send + 'static> EventSource<T> {
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) where ) where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static, C: for<'a> Fn(&'a T) -> Option<U> + Send + Sync + 'static,
F: for<'a> InputFn<'a, M, U, S> + Clone, F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
U: Send + 'static, U: Send + 'static,
S: Send + 'static, S: Send + Sync + 'static,
{ {
let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0)); let sender = Box::new(FilterMapInputSender::new(map, input, address.into().0));
self.broadcaster.lock().unwrap().add(sender); self.broadcaster.add(sender);
} }
/// Returns an action which, when processed, broadcasts an event to all /// Returns an action which, when processed, broadcasts an event to all
/// connected input ports. /// connected input ports.
/// pub fn event(&self, arg: T) -> Action {
/// Note that the action broadcasts the event to those models that are let fut = self.broadcaster.broadcast(arg);
/// connected to the event source at the time the action is processed.
pub fn event(&mut self, arg: T) -> Action {
let fut = self.broadcaster.lock().unwrap().broadcast(arg);
let fut = async { let fut = async {
fut.await.unwrap_or_throw(); fut.await.unwrap_or_throw();
}; };
@ -115,12 +112,9 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// Returns a cancellable action and a cancellation key; when processed, the /// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports. /// action broadcasts an event to all connected input ports.
/// pub fn keyed_event(&self, arg: T) -> (Action, ActionKey) {
/// Note that the action broadcasts the event to those models that are
/// connected to the event source at the time the action is processed.
pub fn keyed_event(&mut self, arg: T) -> (Action, ActionKey) {
let action_key = ActionKey::new(); let action_key = ActionKey::new();
let fut = self.broadcaster.lock().unwrap().broadcast(arg); let fut = self.broadcaster.broadcast(arg);
let action = Action::new(KeyedOnceAction::new( let action = Action::new(KeyedOnceAction::new(
// Cancellation is ignored once the action is already spawned on the // Cancellation is ignored once the action is already spawned on the
@ -139,15 +133,12 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// Returns a periodically recurring action which, when processed, /// Returns a periodically recurring action which, when processed,
/// broadcasts an event to all connected input ports. /// broadcasts an event to all connected input ports.
/// pub fn periodic_event(self: &Arc<Self>, period: Duration, arg: T) -> Action {
/// Note that the action broadcasts the event to those models that are let source = self.clone();
/// connected to the event source at the time the action is processed.
pub fn periodic_event(&mut self, period: Duration, arg: T) -> Action {
let broadcaster = self.broadcaster.clone();
Action::new(PeriodicAction::new( Action::new(PeriodicAction::new(
|| async move { || async move {
let fut = broadcaster.lock().unwrap().broadcast(arg); let fut = source.broadcaster.broadcast(arg);
fut.await.unwrap_or_throw(); fut.await.unwrap_or_throw();
}, },
period, period,
@ -157,12 +148,9 @@ impl<T: Clone + Send + 'static> EventSource<T> {
/// Returns a cancellable, periodically recurring action and a cancellation /// Returns a cancellable, periodically recurring action and a cancellation
/// key; when processed, the action broadcasts an event to all connected /// key; when processed, the action broadcasts an event to all connected
/// input ports. /// input ports.
/// pub fn keyed_periodic_event(self: &Arc<Self>, period: Duration, arg: T) -> (Action, ActionKey) {
/// Note that the action broadcasts the event to those models that are
/// connected to the event source at the time the action is processed.
pub fn keyed_periodic_event(&mut self, period: Duration, arg: T) -> (Action, ActionKey) {
let action_key = ActionKey::new(); let action_key = ActionKey::new();
let broadcaster = self.broadcaster.clone(); let source = self.clone();
let action = Action::new(KeyedPeriodicAction::new( let action = Action::new(KeyedPeriodicAction::new(
// Cancellation is ignored once the action is already spawned on the // Cancellation is ignored once the action is already spawned on the
@ -171,7 +159,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
// used outside the simulator, this shouldn't be an issue in // used outside the simulator, this shouldn't be an issue in
// practice. // practice.
|_| async move { |_| async move {
let fut = broadcaster.lock().unwrap().broadcast(arg); let fut = source.broadcaster.broadcast(arg);
fut.await.unwrap_or_throw(); fut.await.unwrap_or_throw();
}, },
period, period,
@ -185,7 +173,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
impl<T: Clone + Send + 'static> Default for EventSource<T> { impl<T: Clone + Send + 'static> Default for EventSource<T> {
fn default() -> Self { fn default() -> Self {
Self { Self {
broadcaster: Arc::new(Mutex::new(EventBroadcaster::default())), broadcaster: EventBroadcaster::default(),
} }
} }
} }
@ -195,7 +183,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for EventSource<T> {
write!( write!(
f, f,
"Event source ({} connected ports)", "Event source ({} connected ports)",
self.broadcaster.lock().unwrap().len() self.broadcaster.len()
) )
} }
} }
@ -208,7 +196,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for EventSource<T> {
/// member of a model, but rather as a simulation monitoring endpoint /// member of a model, but rather as a simulation monitoring endpoint
/// instantiated during bench assembly. /// instantiated during bench assembly.
pub struct QuerySource<T: Clone + Send + 'static, R: Send + 'static> { pub struct QuerySource<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: Arc<Mutex<QueryBroadcaster<T, R>>>, broadcaster: QueryBroadcaster<T, R>,
} }
impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> { impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
@ -226,11 +214,11 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>) pub fn connect<M, F, S>(&mut self, replier: F, address: impl Into<Address<M>>)
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync,
S: Send + 'static, S: Send + Sync + 'static,
{ {
let sender = Box::new(ReplierSender::new(replier, address.into().0)); let sender = Box::new(ReplierSender::new(replier, address.into().0));
self.broadcaster.lock().unwrap().add(sender); self.broadcaster.add(sender);
} }
/// Adds an auto-converting connection to a replier port of the model /// Adds an auto-converting connection to a replier port of the model
@ -251,12 +239,12 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) where ) where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> U + Send + 'static, C: for<'a> Fn(&'a T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
U: Send + 'static, U: Send + 'static,
Q: Send + 'static, Q: Send + 'static,
S: Send + 'static, S: Send + Sync + 'static,
{ {
let sender = Box::new(MapReplierSender::new( let sender = Box::new(MapReplierSender::new(
query_map, query_map,
@ -264,7 +252,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
replier, replier,
address.into().0, address.into().0,
)); ));
self.broadcaster.lock().unwrap().add(sender); self.broadcaster.add(sender);
} }
/// Adds an auto-converting, filtered connection to a replier port of the /// Adds an auto-converting, filtered connection to a replier port of the
@ -285,12 +273,12 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
) where ) where
M: Model, M: Model,
C: for<'a> Fn(&'a T) -> Option<U> + Send + 'static, C: for<'a> Fn(&'a T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
U: Send + 'static, U: Send + 'static,
Q: Send + 'static, Q: Send + 'static,
S: Send + 'static, S: Send + Sync + 'static,
{ {
let sender = Box::new(FilterMapReplierSender::new( let sender = Box::new(FilterMapReplierSender::new(
query_filter_map, query_filter_map,
@ -298,17 +286,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
replier, replier,
address.into().0, address.into().0,
)); ));
self.broadcaster.lock().unwrap().add(sender); self.broadcaster.add(sender);
} }
/// Returns an action which, when processed, broadcasts a query to all /// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports. /// connected replier ports.
/// pub fn query(&self, arg: T) -> (Action, ReplyReceiver<R>) {
/// Note that the action broadcasts the query to those models that are
/// connected to the query source at the time the action is processed.
pub fn query(&mut self, arg: T) -> (Action, ReplyReceiver<R>) {
let (writer, reader) = slot::slot(); let (writer, reader) = slot::slot();
let fut = self.broadcaster.lock().unwrap().broadcast(arg); let fut = self.broadcaster.broadcast(arg);
let fut = async move { let fut = async move {
let replies = fut.await.unwrap_or_throw(); let replies = fut.await.unwrap_or_throw();
let _ = writer.write(replies); let _ = writer.write(replies);
@ -323,7 +308,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
impl<T: Clone + Send + 'static, R: Send + 'static> Default for QuerySource<T, R> { impl<T: Clone + Send + 'static, R: Send + 'static> Default for QuerySource<T, R> {
fn default() -> Self { fn default() -> Self {
Self { Self {
broadcaster: Arc::new(Mutex::new(QueryBroadcaster::default())), broadcaster: QueryBroadcaster::default(),
} }
} }
} }
@ -333,7 +318,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for QuerySource<T,
write!( write!(
f, f,
"Query source ({} connected ports)", "Query source ({} connected ports)",
self.broadcaster.lock().unwrap().len() self.broadcaster.len()
) )
} }
} }

View File

@ -48,11 +48,11 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
/// Return a list of futures broadcasting an event or query to multiple /// Return a list of futures broadcasting an event or query to multiple
/// addresses. /// addresses.
fn futures(&mut self, arg: T) -> Vec<SenderFutureState<R>> { fn futures(&self, arg: T) -> Vec<SenderFutureState<R>> {
let mut future_states = Vec::new(); let mut future_states = Vec::new();
// Broadcast the message and collect all futures. // Broadcast the message and collect all futures.
let mut iter = self.senders.iter_mut(); let mut iter = self.senders.iter();
while let Some(sender) = iter.next() { while let Some(sender) = iter.next() {
// Move the argument for the last future to avoid undue cloning. // Move the argument for the last future to avoid undue cloning.
if iter.len() == 0 { if iter.len() == 0 {
@ -107,17 +107,14 @@ impl<T: Clone + Send> EventBroadcaster<T> {
} }
/// Broadcasts an event to all addresses. /// Broadcasts an event to all addresses.
pub(super) fn broadcast( pub(super) fn broadcast(&self, arg: T) -> impl Future<Output = Result<(), SendError>> + Send {
&mut self,
arg: T,
) -> impl Future<Output = Result<(), SendError>> + Send {
enum Fut<F1, F2> { enum Fut<F1, F2> {
Empty, Empty,
Single(F1), Single(F1),
Multiple(F2), Multiple(F2),
} }
let fut = match self.inner.senders.as_mut_slice() { let fut = match self.inner.senders.as_slice() {
// No sender. // No sender.
[] => Fut::Empty, [] => Fut::Empty,
// One sender at most. // One sender at most.
@ -184,7 +181,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
/// Broadcasts an event to all addresses. /// Broadcasts an event to all addresses.
pub(super) fn broadcast( pub(super) fn broadcast(
&mut self, &self,
arg: T, arg: T,
) -> impl Future<Output = Result<ReplyIterator<R>, SendError>> + Send { ) -> impl Future<Output = Result<ReplyIterator<R>, SendError>> + Send {
enum Fut<F1, F2> { enum Fut<F1, F2> {
@ -193,7 +190,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
Multiple(F2), Multiple(F2),
} }
let fut = match self.inner.senders.as_mut_slice() { let fut = match self.inner.senders.as_slice() {
// No sender. // No sender.
[] => Fut::Empty, [] => Fut::Empty,
// One sender at most. // One sender at most.
@ -731,6 +728,8 @@ mod tests {
#[cfg(all(test, nexosim_loom))] #[cfg(all(test, nexosim_loom))]
mod tests { mod tests {
use std::sync::Mutex;
use futures_channel::mpsc; use futures_channel::mpsc;
use futures_util::StreamExt; use futures_util::StreamExt;
@ -746,14 +745,14 @@ mod tests {
struct TestEvent<R> { struct TestEvent<R> {
// The receiver is actually used only once in tests, so it is moved out // The receiver is actually used only once in tests, so it is moved out
// of the `Option` on first use. // of the `Option` on first use.
receiver: Option<mpsc::UnboundedReceiver<Option<R>>>, receiver: Mutex<Option<mpsc::UnboundedReceiver<Option<R>>>>,
} }
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> { impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
fn send( fn send(
&mut self, &self,
_arg: &(), _arg: &(),
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> { ) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
let receiver = self.receiver.take().unwrap(); let receiver = self.receiver.lock().unwrap().take().unwrap();
Some(Box::pin(async move { Some(Box::pin(async move {
let mut stream = Box::pin(receiver.filter_map(|item| async { item })); let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
@ -782,7 +781,7 @@ mod tests {
( (
TestEvent { TestEvent {
receiver: Some(receiver), receiver: Mutex::new(Some(receiver)),
}, },
TestEventWaker { sender }, TestEventWaker { sender },
) )

View File

@ -14,12 +14,12 @@ use crate::ports::{InputFn, ReplierFn};
pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>; pub(super) type SenderFuture<R> = Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>;
/// An event or query sender abstracting over the target model and input method. /// An event or query sender abstracting over the target model and input method.
pub(super) trait Sender<T, R>: Send { pub(super) trait Sender<T, R>: Send + Sync {
/// Asynchronously sends a message using a reference to the message. /// Asynchronously sends a message using a reference to the message.
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>>; fn send(&self, arg: &T) -> Option<SenderFuture<R>>;
/// Asynchronously sends an owned message. /// Asynchronously sends an owned message.
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> { fn send_owned(&self, arg: T) -> Option<SenderFuture<R>> {
self.send(&arg) self.send(&arg)
} }
} }
@ -52,15 +52,15 @@ where
impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S> impl<M, F, T, S> Sender<T, ()> for InputSender<M, F, T, S>
where where
M: Model, M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone, F: for<'a> InputFn<'a, M, T, S> + Clone + Sync,
T: Clone + Send + 'static, T: Clone + Send + 'static,
S: Send, S: Send + Sync,
{ {
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> { fn send(&self, arg: &T) -> Option<SenderFuture<()>> {
self.send_owned(arg.clone()) self.send_owned(arg.clone())
} }
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<()>> { fn send_owned(&self, arg: T) -> Option<SenderFuture<()>> {
let func = self.func.clone(); let func = self.func.clone();
let sender = self.sender.clone(); let sender = self.sender.clone();
@ -108,13 +108,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S> impl<M, C, F, T, U, S> Sender<T, ()> for MapInputSender<M, C, F, T, U, S>
where where
M: Model, M: Model,
C: Fn(&T) -> U + Send, C: Fn(&T) -> U + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone, F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
T: Send + 'static, T: Send + 'static,
U: Send + 'static, U: Send + 'static,
S: Send, S: Send + Sync,
{ {
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> { fn send(&self, arg: &T) -> Option<SenderFuture<()>> {
let func = self.func.clone(); let func = self.func.clone();
let arg = (self.map)(arg); let arg = (self.map)(arg);
let sender = self.sender.clone(); let sender = self.sender.clone();
@ -163,13 +163,13 @@ where
impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S> impl<M, C, F, T, U, S> Sender<T, ()> for FilterMapInputSender<M, C, F, T, U, S>
where where
M: Model, M: Model,
C: Fn(&T) -> Option<U> + Send, C: Fn(&T) -> Option<U> + Send + Sync,
F: for<'a> InputFn<'a, M, U, S> + Clone, F: for<'a> InputFn<'a, M, U, S> + Clone + Sync,
T: Send + 'static, T: Send + 'static,
U: Send + 'static, U: Send + 'static,
S: Send, S: Send + Sync,
{ {
fn send(&mut self, arg: &T) -> Option<SenderFuture<()>> { fn send(&self, arg: &T) -> Option<SenderFuture<()>> {
(self.filter_map)(arg).map(|arg| { (self.filter_map)(arg).map(|arg| {
let func = self.func.clone(); let func = self.func.clone();
let sender = self.sender.clone(); let sender = self.sender.clone();
@ -215,16 +215,16 @@ where
impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S> impl<M, F, T, R, S> Sender<T, R> for ReplierSender<M, F, T, R, S>
where where
M: Model, M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone + Sync,
T: Clone + Send + 'static, T: Clone + Send + 'static,
R: Send + 'static, R: Send + 'static,
S: Send, S: Send + Sync,
{ {
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> { fn send(&self, arg: &T) -> Option<SenderFuture<R>> {
self.send_owned(arg.clone()) self.send_owned(arg.clone())
} }
fn send_owned(&mut self, arg: T) -> Option<SenderFuture<R>> { fn send_owned(&self, arg: T) -> Option<SenderFuture<R>> {
let func = self.func.clone(); let func = self.func.clone();
let sender = self.sender.clone(); let sender = self.sender.clone();
let (reply_sender, reply_receiver) = oneshot::channel(); let (reply_sender, reply_receiver) = oneshot::channel();
@ -283,16 +283,16 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S> impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for MapReplierSender<M, C, D, F, T, R, U, Q, S>
where where
M: Model, M: Model,
C: Fn(&T) -> U + Send, C: Fn(&T) -> U + Send + Sync,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
T: Send + 'static, T: Send + 'static,
R: Send + 'static, R: Send + 'static,
U: Send + 'static, U: Send + 'static,
Q: Send + 'static, Q: Send + 'static,
S: Send, S: Send + Sync,
{ {
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> { fn send(&self, arg: &T) -> Option<SenderFuture<R>> {
let func = self.func.clone(); let func = self.func.clone();
let arg = (self.query_map)(arg); let arg = (self.query_map)(arg);
let sender = self.sender.clone(); let sender = self.sender.clone();
@ -358,16 +358,16 @@ where
impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S> impl<M, C, D, F, T, R, U, Q, S> Sender<T, R> for FilterMapReplierSender<M, C, D, F, T, R, U, Q, S>
where where
M: Model, M: Model,
C: Fn(&T) -> Option<U> + Send, C: Fn(&T) -> Option<U> + Send + Sync,
D: Fn(Q) -> R + Send + Sync + 'static, D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone + Sync,
T: Send + 'static, T: Send + 'static,
R: Send + 'static, R: Send + 'static,
U: Send + 'static, U: Send + 'static,
Q: Send + 'static, Q: Send + 'static,
S: Send, S: Send + Sync,
{ {
fn send(&mut self, arg: &T) -> Option<SenderFuture<R>> { fn send(&self, arg: &T) -> Option<SenderFuture<R>> {
(self.query_filter_map)(arg).map(|arg| { (self.query_filter_map)(arg).map(|arg| {
let func = self.func.clone(); let func = self.func.clone();
let sender = self.sender.clone(); let sender = self.sender.clone();

View File

@ -9,8 +9,8 @@ use crate::ports::EventSinkStream;
type SerializationError = ciborium::ser::Error<std::io::Error>; type SerializationError = ciborium::ser::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through /// A registry that holds all sinks meant to be accessed through remote
/// remote procedure calls. /// procedure calls.
#[derive(Default)] #[derive(Default)]
pub(crate) struct EventSinkRegistry(HashMap<String, Box<dyn EventSinkStreamAny>>); pub(crate) struct EventSinkRegistry(HashMap<String, Box<dyn EventSinkStreamAny>>);

View File

@ -1,6 +1,7 @@
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use ciborium; use ciborium;
@ -31,7 +32,7 @@ impl EventSourceRegistry {
{ {
match self.0.entry(name.into()) { match self.0.entry(name.into()) {
Entry::Vacant(s) => { Entry::Vacant(s) => {
s.insert(Box::new(source)); s.insert(Box::new(Arc::new(source)));
Ok(()) Ok(())
} }
@ -41,8 +42,8 @@ impl EventSourceRegistry {
/// Returns a mutable reference to the specified event source if it is in /// Returns a mutable reference to the specified event source if it is in
/// the registry. /// the registry.
pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> { pub(crate) fn get(&self, name: &str) -> Option<&dyn EventSourceAny> {
self.0.get_mut(name).map(|s| s.as_mut()) self.0.get(name).map(|s| s.as_ref())
} }
} }
@ -53,19 +54,19 @@ impl fmt::Debug for EventSourceRegistry {
} }
/// A type-erased `EventSource` that operates on CBOR-encoded serialized events. /// A type-erased `EventSource` that operates on CBOR-encoded serialized events.
pub(crate) trait EventSourceAny: Send + 'static { pub(crate) trait EventSourceAny: Send + Sync + 'static {
/// Returns an action which, when processed, broadcasts an event to all /// Returns an action which, when processed, broadcasts an event to all
/// connected input ports. /// connected input ports.
/// ///
/// The argument is expected to conform to the serde CBOR encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>; fn event(&self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>;
/// Returns a cancellable action and a cancellation key; when processed, the /// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports. /// action broadcasts an event to all connected input ports.
/// ///
/// The argument is expected to conform to the serde CBOR encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn keyed_event( fn keyed_event(
&mut self, &self,
serialized_arg: &[u8], serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>; ) -> Result<(Action, ActionKey), DeserializationError>;
@ -74,7 +75,7 @@ pub(crate) trait EventSourceAny: Send + 'static {
/// ///
/// The argument is expected to conform to the serde CBOR encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn periodic_event( fn periodic_event(
&mut self, &self,
period: Duration, period: Duration,
serialized_arg: &[u8], serialized_arg: &[u8],
) -> Result<Action, DeserializationError>; ) -> Result<Action, DeserializationError>;
@ -85,7 +86,7 @@ pub(crate) trait EventSourceAny: Send + 'static {
/// ///
/// The argument is expected to conform to the serde CBOR encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn keyed_periodic_event( fn keyed_periodic_event(
&mut self, &self,
period: Duration, period: Duration,
serialized_arg: &[u8], serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>; ) -> Result<(Action, ActionKey), DeserializationError>;
@ -95,28 +96,29 @@ pub(crate) trait EventSourceAny: Send + 'static {
fn event_type_name(&self) -> &'static str; fn event_type_name(&self) -> &'static str;
} }
impl<T> EventSourceAny for EventSource<T> impl<T> EventSourceAny for Arc<EventSource<T>>
where where
T: DeserializeOwned + Clone + Send + 'static, T: DeserializeOwned + Clone + Send + 'static,
{ {
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> { fn event(&self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.event(arg)) ciborium::from_reader(serialized_arg).map(|arg| EventSource::event(self, arg))
} }
fn keyed_event( fn keyed_event(
&mut self, &self,
serialized_arg: &[u8], serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> { ) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg)) ciborium::from_reader(serialized_arg).map(|arg| EventSource::keyed_event(self, arg))
} }
fn periodic_event( fn periodic_event(
&mut self, &self,
period: Duration, period: Duration,
serialized_arg: &[u8], serialized_arg: &[u8],
) -> Result<Action, DeserializationError> { ) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg)) ciborium::from_reader(serialized_arg)
.map(|arg| EventSource::periodic_event(self, period, arg))
} }
fn keyed_periodic_event( fn keyed_periodic_event(
&mut self, &self,
period: Duration, period: Duration,
serialized_arg: &[u8], serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> { ) -> Result<(Action, ActionKey), DeserializationError> {

View File

@ -43,8 +43,8 @@ impl QuerySourceRegistry {
/// Returns a mutable reference to the specified query source if it is in /// Returns a mutable reference to the specified query source if it is in
/// the registry. /// the registry.
pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> { pub(crate) fn get(&self, name: &str) -> Option<&dyn QuerySourceAny> {
self.0.get_mut(name).map(|s| s.as_mut()) self.0.get(name).map(|s| s.as_ref())
} }
} }
@ -56,14 +56,14 @@ impl fmt::Debug for QuerySourceRegistry {
/// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries /// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries
/// and returns CBOR-encoded replies. /// and returns CBOR-encoded replies.
pub(crate) trait QuerySourceAny: Send + 'static { pub(crate) trait QuerySourceAny: Send + Sync + 'static {
/// Returns an action which, when processed, broadcasts a query to all /// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports. /// connected replier ports.
/// ///
/// ///
/// The argument is expected to conform to the serde CBOR encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn query( fn query(
&mut self, &self,
arg: &[u8], arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError>; ) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError>;
@ -82,7 +82,7 @@ where
R: Serialize + Send + 'static, R: Serialize + Send + 'static,
{ {
fn query( fn query(
&mut self, &self,
arg: &[u8], arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError> { ) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError> {
ciborium::from_reader(arg).map(|arg| { ciborium::from_reader(arg).map(|arg| {

View File

@ -33,6 +33,9 @@ impl Scheduler {
/// Returns the current simulation time. /// Returns the current simulation time.
/// ///
/// Beware that, if the scheduler runs in a separate thread as the
/// simulation, the time may change concurrently.
///
/// # Examples /// # Examples
/// ///
/// ``` /// ```

View File

@ -16,6 +16,18 @@ pub trait Clock: Send {
fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus; fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus;
} }
impl<C: Clock + ?Sized> Clock for &mut C {
fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus {
(**self).synchronize(deadline)
}
}
impl<C: Clock + ?Sized> Clock for Box<C> {
fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus {
(**self).synchronize(deadline)
}
}
/// The current synchronization status of a clock. /// The current synchronization status of a clock.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum SyncStatus { pub enum SyncStatus {