1
0
forked from ROMEO/nexosim

OVarious gRPC improvements

This patch in particular allows asynchronous gRPC access to the scheduler
and to the monitoring functions.
This commit is contained in:
Serge Barral 2025-01-09 17:04:44 +01:00
parent 43407741eb
commit 7a95a4d0f4
14 changed files with 377 additions and 281 deletions

View File

@ -20,11 +20,27 @@ description = """
A high performance asychronous compute framework for system simulation.
"""
categories = ["simulation", "aerospace", "science"]
keywords = ["simulation", "discrete-event", "systems", "cyberphysical", "real-time"]
keywords = [
"simulation",
"discrete-event",
"systems",
"cyberphysical",
"real-time",
]
[features]
# gRPC service.
grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"]
grpc = [
"dep:bytes",
"dep:ciborium",
"dep:prost",
"dep:prost-types",
"dep:serde",
"dep:tonic",
"dep:tokio",
"dep:tonic",
"tai-time/serde",
]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking.
@ -54,15 +70,24 @@ ciborium = { version = "0.2.2", optional = true }
prost = { version = "0.13", optional = true }
prost-types = { version = "0.13", optional = true }
serde = { version = "1", optional = true }
tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true }
tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true }
tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true }
tracing-subscriber = { version= "0.3.18", optional = true }
tokio = { version = "1.0", features = [
"net",
"rt-multi-thread",
], optional = true }
tonic = { version = "0.12", default-features = false, features = [
"codegen",
"prost",
"server",
], optional = true }
tracing = { version = "0.1.40", default-features = false, features = [
"std",
], optional = true }
tracing-subscriber = { version = "0.3.18", optional = true }
[dev-dependencies]
futures-util = "0.3"
futures-executor = "0.3"
tracing-subscriber = { version= "0.3.18", features=["env-filter"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
[target.'cfg(nexosim_loom)'.dev-dependencies]
loom = "0.7"
@ -74,7 +99,10 @@ tonic-build = { version = "0.12" }
[lints.rust]
# `nexosim_loom` flag: run loom-based tests.
# `nexosim_grpc_codegen` flag: regenerate gRPC code from .proto definitions.
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(nexosim_loom)', 'cfg(nexosim_grpc_codegen)'] }
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(nexosim_loom)',
'cfg(nexosim_grpc_codegen)',
] }
[package.metadata.docs.rs]
all-features = true

View File

@ -9,24 +9,25 @@ import "google/protobuf/empty.proto";
enum ErrorCode {
INTERNAL_ERROR = 0;
SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3;
SIMULATION_MESSAGE_LOSS = 4;
SIMULATION_NO_RECIPIENT = 5;
SIMULATION_PANIC = 6;
SIMULATION_TIMEOUT = 7;
SIMULATION_OUT_OF_SYNC = 8;
SIMULATION_BAD_QUERY = 9;
SIMULATION_TIME_OUT_OF_RANGE = 10;
MISSING_ARGUMENT = 20;
INVALID_TIME = 30;
INVALID_PERIOD = 31;
INVALID_DEADLINE = 32;
INVALID_MESSAGE = 33;
INVALID_KEY = 34;
SOURCE_NOT_FOUND = 40;
SINK_NOT_FOUND = 41;
MISSING_ARGUMENT = 1;
INVALID_TIME = 2;
INVALID_PERIOD = 3;
INVALID_DEADLINE = 4;
INVALID_MESSAGE = 5;
INVALID_KEY = 6;
INITIALIZER_PANIC = 10;
SIMULATION_NOT_STARTED = 11;
SIMULATION_TERMINATED = 12;
SIMULATION_DEADLOCK = 13;
SIMULATION_MESSAGE_LOSS = 14;
SIMULATION_NO_RECIPIENT = 15;
SIMULATION_PANIC = 16;
SIMULATION_TIMEOUT = 17;
SIMULATION_OUT_OF_SYNC = 18;
SIMULATION_BAD_QUERY = 19;
SIMULATION_TIME_OUT_OF_RANGE = 20;
SOURCE_NOT_FOUND = 30;
SINK_NOT_FOUND = 31;
}
message Error {
@ -39,9 +40,7 @@ message EventKey {
uint64 subkey2 = 2;
}
message InitRequest {
bytes cfg = 2;
}
message InitRequest { bytes cfg = 2; }
message InitReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;

View File

@ -335,24 +335,25 @@ pub mod any_request {
#[repr(i32)]
pub enum ErrorCode {
InternalError = 0,
SimulationNotStarted = 1,
SimulationTerminated = 2,
SimulationDeadlock = 3,
SimulationMessageLoss = 4,
SimulationNoRecipient = 5,
SimulationPanic = 6,
SimulationTimeout = 7,
SimulationOutOfSync = 8,
SimulationBadQuery = 9,
SimulationTimeOutOfRange = 10,
MissingArgument = 20,
InvalidTime = 30,
InvalidPeriod = 31,
InvalidDeadline = 32,
InvalidMessage = 33,
InvalidKey = 34,
SourceNotFound = 40,
SinkNotFound = 41,
MissingArgument = 1,
InvalidTime = 2,
InvalidPeriod = 3,
InvalidDeadline = 4,
InvalidMessage = 5,
InvalidKey = 6,
InitializerPanic = 10,
SimulationNotStarted = 11,
SimulationTerminated = 12,
SimulationDeadlock = 13,
SimulationMessageLoss = 14,
SimulationNoRecipient = 15,
SimulationPanic = 16,
SimulationTimeout = 17,
SimulationOutOfSync = 18,
SimulationBadQuery = 19,
SimulationTimeOutOfRange = 20,
SourceNotFound = 30,
SinkNotFound = 31,
}
impl ErrorCode {
/// String value of the enum field names used in the ProtoBuf definition.
@ -362,6 +363,13 @@ impl ErrorCode {
pub fn as_str_name(&self) -> &'static str {
match self {
Self::InternalError => "INTERNAL_ERROR",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::InitializerPanic => "INITIALIZER_PANIC",
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
@ -372,12 +380,6 @@ impl ErrorCode {
Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
Self::SimulationBadQuery => "SIMULATION_BAD_QUERY",
Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::SourceNotFound => "SOURCE_NOT_FOUND",
Self::SinkNotFound => "SINK_NOT_FOUND",
}
@ -386,6 +388,13 @@ impl ErrorCode {
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"INTERNAL_ERROR" => Some(Self::InternalError),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"INITIALIZER_PANIC" => Some(Self::InitializerPanic),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
@ -396,12 +405,6 @@ impl ErrorCode {
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
"SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery),
"SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange),
"MISSING_ARGUMENT" => Some(Self::MissingArgument),
"INVALID_TIME" => Some(Self::InvalidTime),
"INVALID_PERIOD" => Some(Self::InvalidPeriod),
"INVALID_DEADLINE" => Some(Self::InvalidDeadline),
"INVALID_MESSAGE" => Some(Self::InvalidMessage),
"INVALID_KEY" => Some(Self::InvalidKey),
"SOURCE_NOT_FOUND" => Some(Self::SourceNotFound),
"SINK_NOT_FOUND" => Some(Self::SinkNotFound),
_ => None,

View File

@ -13,8 +13,8 @@ pub(crate) struct KeyRegistry {
impl KeyRegistry {
/// Inserts an `ActionKey` into the registry.
///
/// The provided expiration deadline is the latest time at which the key may
/// still be active.
/// The provided expiration deadline is the latest time at which the key is
/// guaranteed to be extractable.
pub(crate) fn insert_key(
&mut self,
action_key: ActionKey,

View File

@ -1,6 +1,7 @@
//! gRPC simulation service.
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
@ -13,7 +14,7 @@ use crate::simulation::{Simulation, SimulationError};
use super::codegen::simulation::*;
use super::key_registry::KeyRegistry;
use super::services::InitService;
use super::services::{ControllerService, MonitorService};
use super::services::{ControllerService, MonitorService, SchedulerService};
/// Runs a gRPC simulation server.
///
@ -37,7 +38,8 @@ fn run_service(
service: GrpcSimulationService,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// Use 2 threads so that the controller and monitor services can be used
// Use 2 threads so that the even if the controller service is blocked due
// to ongoing simulation execution, other services can still be used
// concurrently.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
@ -58,6 +60,7 @@ struct GrpcSimulationService {
init_service: Mutex<InitService>,
controller_service: Mutex<ControllerService>,
monitor_service: Mutex<MonitorService>,
scheduler_service: Mutex<SchedulerService>,
}
impl GrpcSimulationService {
@ -76,6 +79,7 @@ impl GrpcSimulationService {
init_service: Mutex::new(InitService::new(sim_gen)),
controller_service: Mutex::new(ControllerService::NotStarted),
monitor_service: Mutex::new(MonitorService::NotStarted),
scheduler_service: Mutex::new(SchedulerService::NotStarted),
}
}
@ -93,6 +97,11 @@ impl GrpcSimulationService {
fn monitor(&self) -> MutexGuard<'_, MonitorService> {
self.monitor_service.lock().unwrap()
}
/// Locks the scheduler and returns the mutex guard.
fn scheduler(&self) -> MutexGuard<'_, SchedulerService> {
self.scheduler_service.lock().unwrap()
}
}
#[tonic::async_trait]
@ -103,15 +112,22 @@ impl simulation_server::Simulation for GrpcSimulationService {
let (reply, bench) = self.initializer().init(request);
if let Some((simulation, scheduler, endpoint_registry)) = bench {
let event_source_registry = Arc::new(endpoint_registry.event_source_registry);
let query_source_registry = endpoint_registry.query_source_registry;
let event_sink_registry = endpoint_registry.event_sink_registry;
*self.controller() = ControllerService::Started {
simulation,
scheduler,
event_source_registry: endpoint_registry.event_source_registry,
query_source_registry: endpoint_registry.query_source_registry,
key_registry: KeyRegistry::default(),
event_source_registry: event_source_registry.clone(),
query_source_registry,
};
*self.monitor() = MonitorService::Started {
event_sink_registry: endpoint_registry.event_sink_registry,
event_sink_registry,
};
*self.scheduler() = SchedulerService::Started {
scheduler,
event_source_registry,
key_registry: KeyRegistry::default(),
};
}
@ -120,7 +136,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
async fn time(&self, request: Request<TimeRequest>) -> Result<Response<TimeReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().time(request)))
Ok(Response::new(self.scheduler().time(request)))
}
async fn step(&self, request: Request<StepRequest>) -> Result<Response<StepReply>, Status> {
let request = request.into_inner();
@ -141,7 +157,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
) -> Result<Response<ScheduleEventReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().schedule_event(request)))
Ok(Response::new(self.scheduler().schedule_event(request)))
}
async fn cancel_event(
&self,
@ -149,7 +165,7 @@ impl simulation_server::Simulation for GrpcSimulationService {
) -> Result<Response<CancelEventReply>, Status> {
let request = request.into_inner();
Ok(Response::new(self.controller().cancel_event(request)))
Ok(Response::new(self.scheduler().cancel_event(request)))
}
async fn process_event(
&self,

View File

@ -1,6 +1,7 @@
mod controller_service;
mod init_service;
mod monitor_service;
mod scheduler_service;
use std::time::Duration;
@ -13,6 +14,7 @@ use crate::simulation::{ExecutionError, SchedulingError, SimulationError};
pub(crate) use controller_service::ControllerService;
pub(crate) use init_service::InitService;
pub(crate) use monitor_service::MonitorService;
pub(crate) use scheduler_service::SchedulerService;
/// Transforms an error code and a message into a Protobuf error.
fn to_error(code: ErrorCode, message: impl Into<String>) -> Error {

View File

@ -1,59 +1,32 @@
use std::fmt;
use std::sync::Arc;
use prost_types::Timestamp;
use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::{EventSourceRegistry, QuerySourceRegistry};
use crate::simulation::{Scheduler, Simulation};
use crate::simulation::Simulation;
use super::super::codegen::simulation::*;
use super::{
map_execution_error, map_scheduling_error, monotonic_to_timestamp,
simulation_not_started_error, timestamp_to_monotonic, to_error, to_positive_duration,
to_strictly_positive_duration,
map_execution_error, monotonic_to_timestamp, simulation_not_started_error,
timestamp_to_monotonic, to_error, to_positive_duration,
};
/// Protobuf-based simulation manager.
/// Protobuf-based simulation controller.
///
/// A `ControllerService` enables the management of the lifecycle of a
/// simulation.
///
/// Its methods map the various RPC simulation control service methods defined
/// in `simulation.proto`.
/// A `ControllerService` controls the execution of the simulation. Note that
/// all its methods block until execution completes.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ControllerService {
NotStarted,
Started {
simulation: Simulation,
scheduler: Scheduler,
event_source_registry: EventSourceRegistry,
event_source_registry: Arc<EventSourceRegistry>,
query_source_registry: QuerySourceRegistry,
key_registry: KeyRegistry,
},
}
impl ControllerService {
/// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply {
let reply = match self {
Self::Started { simulation, .. } => {
if let Some(timestamp) = monotonic_to_timestamp(simulation.time()) {
time_reply::Result::Time(timestamp)
} else {
time_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()),
};
TimeReply {
result: Some(reply),
}
}
/// Advances simulation time to that of the next scheduled event, processing
/// that event as well as all other events scheduled for the same time.
///
@ -144,155 +117,6 @@ impl ControllerService {
}
}
/// Schedules an event at a future time.
pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply {
let reply = match self {
Self::Started {
simulation,
scheduler,
event_source_registry,
key_registry,
..
} => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name;
let event = &request.event;
let with_key = request.with_key;
let period = request
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or(to_error(
ErrorCode::InvalidPeriod,
"the specified event period is not strictly positive",
))
})
.transpose()?;
let deadline = request.deadline.ok_or(to_error(
ErrorCode::MissingArgument,
"missing deadline argument",
))?;
let deadline = match deadline {
schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time)
.ok_or(to_error(
ErrorCode::InvalidTime,
"out-of-range nanosecond field",
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDeadline,
"the specified scheduling deadline is not in the future",
))?;
simulation.time() + duration
}
};
let source = event_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no event source is registered with the name '{}'".to_string(),
))?;
let (action, action_key) = match (with_key, period) {
(false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
let key_id = action_key.map(|action_key| {
// Free stale keys from the registry.
key_registry.remove_expired_keys(simulation.time());
if period.is_some() {
key_registry.insert_eternal_key(action_key)
} else {
key_registry.insert_key(action_key, deadline)
}
});
scheduler
.schedule(deadline, action)
.map_err(map_scheduling_error)?;
Ok(key_id)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
ScheduleEventReply {
result: Some(match reply {
Ok(Some(key_id)) => {
let (subkey1, subkey2) = key_id.into_raw_parts();
schedule_event_reply::Result::Key(EventKey {
subkey1: subkey1
.try_into()
.expect("action key index is too large to be serialized"),
subkey2,
})
}
Ok(None) => schedule_event_reply::Result::Empty(()),
Err(error) => schedule_event_reply::Result::Error(error),
}),
}
}
/// Cancels a keyed event.
pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply {
let reply = match self {
Self::Started {
simulation,
key_registry,
..
} => move || -> Result<(), Error> {
let key = request
.key
.ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?;
let subkey1: usize = key
.subkey1
.try_into()
.map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?;
let subkey2 = key.subkey2;
let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2);
key_registry.remove_expired_keys(simulation.time());
let key = key_registry.extract_key(key_id).ok_or(to_error(
ErrorCode::InvalidKey,
"invalid or expired event key",
))?;
key.cancel();
Ok(())
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
CancelEventReply {
result: Some(match reply {
Ok(()) => cancel_event_reply::Result::Empty(()),
Err(error) => cancel_event_reply::Result::Error(error),
}),
}
}
/// Broadcasts an event from an event source immediately, blocking until
/// completion.
///
@ -307,7 +131,7 @@ impl ControllerService {
let source_name = &request.source_name;
let event = &request.event;
let source = event_source_registry.get_mut(source_name).ok_or(to_error(
let source = event_source_registry.get(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;
@ -350,7 +174,7 @@ impl ControllerService {
let source_name = &request.source_name;
let request = &request.request;
let source = query_source_registry.get_mut(source_name).ok_or(to_error(
let source = query_source_registry.get(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;

View File

@ -1,3 +1,5 @@
use std::panic::{self, AssertUnwindSafe};
use ciborium;
use serde::de::DeserializeOwned;
@ -16,8 +18,6 @@ type SimGen = Box<dyn FnMut(&[u8]) -> Result<InitResult, DeserializationError> +
///
/// An `InitService` creates a new simulation bench based on a serialized
/// initialization configuration.
///
/// It maps the `Init` method defined in `simulation.proto`.
pub(crate) struct InitService {
sim_gen: SimGen,
}
@ -51,17 +51,39 @@ impl InitService {
&mut self,
request: InitRequest,
) -> (InitReply, Option<(Simulation, Scheduler, EndpointRegistry)>) {
let reply = (self.sim_gen)(&request.cfg)
.map_err(|e| {
let reply = panic::catch_unwind(AssertUnwindSafe(|| (self.sim_gen)(&request.cfg)))
.map_err(|payload| {
let panic_msg: Option<&str> = if let Some(s) = payload.downcast_ref::<&str>() {
Some(s)
} else if let Some(s) = payload.downcast_ref::<String>() {
Some(s)
} else {
None
};
let error_msg = if let Some(panic_msg) = panic_msg {
format!(
"the simulation initializer has panicked with the message `{}`",
panic_msg
)
} else {
String::from("the simulation initializer has panicked")
};
to_error(ErrorCode::InitializerPanic, error_msg)
})
.and_then(|res| {
res.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the initialization configuration could not be deserialized: {}",
"the initializer configuration could not be deserialized: {}",
e
),
)
})
.and_then(|init_result| init_result.map_err(map_simulation_error));
.and_then(|init_result| init_result.map_err(map_simulation_error))
});
let (reply, bench) = match reply {
Ok((simulation, registry)) => {

View File

@ -9,9 +9,6 @@ use super::{simulation_not_started_error, to_error};
///
/// A `MonitorService` enables the monitoring of the event sinks of a
/// [`Simulation`](crate::simulation::Simulation).
///
/// Its methods map the various RPC monitoring service methods defined in
/// `simulation.proto`.
pub(crate) enum MonitorService {
Started {
event_sink_registry: EventSinkRegistry,

View File

@ -0,0 +1,200 @@
use std::fmt;
use std::sync::Arc;
use crate::grpc::key_registry::{KeyRegistry, KeyRegistryId};
use crate::registry::EventSourceRegistry;
use crate::simulation::Scheduler;
use super::super::codegen::simulation::*;
use super::{
map_scheduling_error, monotonic_to_timestamp, simulation_not_started_error,
timestamp_to_monotonic, to_error, to_strictly_positive_duration,
};
/// Protobuf-based simulation scheduler.
///
/// A `SchedulerService` enables the scheduling of simulation events.
#[allow(clippy::large_enum_variant)]
pub(crate) enum SchedulerService {
NotStarted,
Started {
scheduler: Scheduler,
event_source_registry: Arc<EventSourceRegistry>,
key_registry: KeyRegistry,
},
}
impl SchedulerService {
/// Returns the current simulation time.
pub(crate) fn time(&mut self, _request: TimeRequest) -> TimeReply {
let reply = match self {
Self::Started { scheduler, .. } => {
if let Some(timestamp) = monotonic_to_timestamp(scheduler.time()) {
time_reply::Result::Time(timestamp)
} else {
time_reply::Result::Error(to_error(
ErrorCode::SimulationTimeOutOfRange,
"the final simulation time is out of range",
))
}
}
Self::NotStarted => time_reply::Result::Error(simulation_not_started_error()),
};
TimeReply {
result: Some(reply),
}
}
/// Schedules an event at a future time.
pub(crate) fn schedule_event(&mut self, request: ScheduleEventRequest) -> ScheduleEventReply {
let reply = match self {
Self::Started {
scheduler,
event_source_registry,
key_registry,
} => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name;
let event = &request.event;
let with_key = request.with_key;
let period = request
.period
.map(|period| {
to_strictly_positive_duration(period).ok_or(to_error(
ErrorCode::InvalidPeriod,
"the specified event period is not strictly positive",
))
})
.transpose()?;
let source = event_source_registry.get(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no event source is registered with the name '{}'".to_string(),
))?;
let (action, action_key) = match (with_key, period) {
(false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
let deadline = request.deadline.ok_or(to_error(
ErrorCode::MissingArgument,
"missing deadline argument",
))?;
let deadline = match deadline {
schedule_event_request::Deadline::Time(time) => timestamp_to_monotonic(time)
.ok_or(to_error(
ErrorCode::InvalidTime,
"out-of-range nanosecond field",
))?,
schedule_event_request::Deadline::Duration(duration) => {
let duration = to_strictly_positive_duration(duration).ok_or(to_error(
ErrorCode::InvalidDeadline,
"the specified scheduling deadline is not in the future",
))?;
scheduler.time() + duration
}
};
let key_id = action_key.map(|action_key| {
key_registry.remove_expired_keys(scheduler.time());
if period.is_some() {
key_registry.insert_eternal_key(action_key)
} else {
key_registry.insert_key(action_key, deadline)
}
});
scheduler
.schedule(deadline, action)
.map_err(map_scheduling_error)?;
Ok(key_id)
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
ScheduleEventReply {
result: Some(match reply {
Ok(Some(key_id)) => {
let (subkey1, subkey2) = key_id.into_raw_parts();
schedule_event_reply::Result::Key(EventKey {
subkey1: subkey1
.try_into()
.expect("action key index is too large to be serialized"),
subkey2,
})
}
Ok(None) => schedule_event_reply::Result::Empty(()),
Err(error) => schedule_event_reply::Result::Error(error),
}),
}
}
/// Cancels a keyed event.
pub(crate) fn cancel_event(&mut self, request: CancelEventRequest) -> CancelEventReply {
let reply = match self {
Self::Started {
scheduler,
key_registry,
..
} => move || -> Result<(), Error> {
let key = request
.key
.ok_or(to_error(ErrorCode::MissingArgument, "missing key argument"))?;
let subkey1: usize = key
.subkey1
.try_into()
.map_err(|_| to_error(ErrorCode::InvalidKey, "invalid event key"))?;
let subkey2 = key.subkey2;
let key_id = KeyRegistryId::from_raw_parts(subkey1, subkey2);
key_registry.remove_expired_keys(scheduler.time());
let key = key_registry.extract_key(key_id).ok_or(to_error(
ErrorCode::InvalidKey,
"invalid or expired event key",
))?;
key.cancel();
Ok(())
}(),
Self::NotStarted => Err(simulation_not_started_error()),
};
CancelEventReply {
result: Some(match reply {
Ok(()) => cancel_event_reply::Result::Empty(()),
Err(error) => cancel_event_reply::Result::Error(error),
}),
}
}
}
impl fmt::Debug for SchedulerService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SchedulerService").finish_non_exhaustive()
}
}

View File

@ -728,6 +728,8 @@ mod tests {
#[cfg(all(test, nexosim_loom))]
mod tests {
use std::sync::Mutex;
use futures_channel::mpsc;
use futures_util::StreamExt;
@ -743,14 +745,14 @@ mod tests {
struct TestEvent<R> {
// The receiver is actually used only once in tests, so it is moved out
// of the `Option` on first use.
receiver: Option<mpsc::UnboundedReceiver<Option<R>>>,
receiver: Mutex<Option<mpsc::UnboundedReceiver<Option<R>>>>,
}
impl<R: Send + 'static> Sender<(), R> for TestEvent<R> {
fn send(
&mut self,
&self,
_arg: &(),
) -> Option<Pin<Box<dyn Future<Output = Result<R, SendError>> + Send>>> {
let receiver = self.receiver.take().unwrap();
let receiver = self.receiver.lock().unwrap().take().unwrap();
Some(Box::pin(async move {
let mut stream = Box::pin(receiver.filter_map(|item| async { item }));
@ -779,7 +781,7 @@ mod tests {
(
TestEvent {
receiver: Some(receiver),
receiver: Mutex::new(Some(receiver)),
},
TestEventWaker { sender },
)

View File

@ -42,8 +42,8 @@ impl EventSourceRegistry {
/// Returns a mutable reference to the specified event source if it is in
/// the registry.
pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn EventSourceAny> {
self.0.get_mut(name).map(|s| s.as_mut())
pub(crate) fn get(&self, name: &str) -> Option<&dyn EventSourceAny> {
self.0.get(name).map(|s| s.as_ref())
}
}
@ -54,7 +54,7 @@ impl fmt::Debug for EventSourceRegistry {
}
/// A type-erased `EventSource` that operates on CBOR-encoded serialized events.
pub(crate) trait EventSourceAny: Send + 'static {
pub(crate) trait EventSourceAny: Send + Sync + 'static {
/// Returns an action which, when processed, broadcasts an event to all
/// connected input ports.
///

View File

@ -43,8 +43,8 @@ impl QuerySourceRegistry {
/// Returns a mutable reference to the specified query source if it is in
/// the registry.
pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn QuerySourceAny> {
self.0.get_mut(name).map(|s| s.as_mut())
pub(crate) fn get(&self, name: &str) -> Option<&dyn QuerySourceAny> {
self.0.get(name).map(|s| s.as_ref())
}
}
@ -56,7 +56,7 @@ impl fmt::Debug for QuerySourceRegistry {
/// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries
/// and returns CBOR-encoded replies.
pub(crate) trait QuerySourceAny: Send + 'static {
pub(crate) trait QuerySourceAny: Send + Sync + 'static {
/// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports.
///

View File

@ -33,6 +33,9 @@ impl Scheduler {
/// Returns the current simulation time.
///
/// Beware that, if the scheduler runs in a separate thread as the
/// simulation, the time may change concurrently.
///
/// # Examples
///
/// ```