diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index a91ff21..9533780 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -58,7 +58,7 @@ rmp-serde = { version = "1.1", optional = true } serde = { version = "1", optional = true } # gRPC dependencies. -tokio = { version = "1.0", optional = true } +tokio = { version = "1.0", features=["net"], optional = true } tonic = { version = "0.11", optional = true } diff --git a/asynchronix/src/ports/sink.rs b/asynchronix/src/ports/sink.rs index 639ed4c..d22d008 100644 --- a/asynchronix/src/ports/sink.rs +++ b/asynchronix/src/ports/sink.rs @@ -33,18 +33,21 @@ pub trait EventSinkStream: Iterator { /// Events that were previously in the stream remain available. fn close(&mut self); - /// This is a stop-gap method that shadows `Iterator::try_fold` until the - /// latter can be implemented by user-defined types on stable Rust. + /// This is a stop-gap method that serves the exact same purpose as + /// `Iterator::try_fold` but is specialized for `Result` rather than the + /// `Try` trait so it can be implemented on stable Rust. /// - /// It serves the exact same purpose as `Iterator::try_fold` but is - /// specialized for `Result` to avoid depending on the unstable `Try` trait. + /// It makes it possible to provide a faster implementation when the event + /// sink stream can be iterated over more rapidly than by repeatably calling + /// `Iterator::next`, for instance if the implementation of the stream + /// relies on a mutex that must be locked on each call. /// - /// Implementors may elect to override the default implementation when the - /// event sink stream can be iterated over more rapidly than by repeatably - /// calling `Iterator::next`, for instance if the implementation of the - /// stream relies on a mutex that must be locked on each call. + /// It is not publicly implementable because it may be removed at any time + /// once the `Try` trait is stabilized, without regard for backward + /// compatibility. #[doc(hidden)] - fn try_fold(&mut self, init: B, f: F) -> Result + #[allow(private_interfaces)] + fn __try_fold(&mut self, init: B, f: F) -> Result where Self: Sized, F: FnMut(B, Self::Item) -> Result, diff --git a/asynchronix/src/ports/sink/event_buffer.rs b/asynchronix/src/ports/sink/event_buffer.rs index 15b89b0..35b25ee 100644 --- a/asynchronix/src/ports/sink/event_buffer.rs +++ b/asynchronix/src/ports/sink/event_buffer.rs @@ -86,7 +86,9 @@ impl EventSinkStream for EventBuffer { self.inner.is_open.store(false, Ordering::Relaxed); } - fn try_fold(&mut self, init: B, f: F) -> Result + #[doc(hidden)] + #[allow(private_interfaces)] + fn __try_fold(&mut self, init: B, f: F) -> Result where Self: Sized, F: FnMut(B, Self::Item) -> Result, diff --git a/asynchronix/src/rpc/api/simulation.proto b/asynchronix/src/rpc/api/simulation.proto index b8982a0..b12d593 100644 --- a/asynchronix/src/rpc/api/simulation.proto +++ b/asynchronix/src/rpc/api/simulation.proto @@ -31,7 +31,7 @@ message EventKey { uint64 subkey2 = 2; } -message InitRequest { optional google.protobuf.Timestamp time = 1; } +message InitRequest { google.protobuf.Timestamp time = 1; } message InitReply { oneof result { // Always returns exactly 1 variant. google.protobuf.Empty empty = 1; @@ -75,8 +75,8 @@ message ScheduleEventRequest { } string source_name = 3; bytes event = 4; - optional google.protobuf.Duration period = 5; - optional bool with_key = 6; + google.protobuf.Duration period = 5; + bool with_key = 6; } message ScheduleEventReply { oneof result { // Always returns exactly 1 variant. diff --git a/asynchronix/src/rpc/codegen.rs b/asynchronix/src/rpc/codegen.rs index 0dfd7c8..3221cbc 100644 --- a/asynchronix/src/rpc/codegen.rs +++ b/asynchronix/src/rpc/codegen.rs @@ -1,5 +1,7 @@ #![allow(unreachable_pub)] #![allow(clippy::enum_variant_names)] +#[rustfmt::skip] pub(crate) mod custom_transport; +#[rustfmt::skip] pub(crate) mod simulation; diff --git a/asynchronix/src/rpc/codegen/custom_transport.rs b/asynchronix/src/rpc/codegen/custom_transport.rs index 43a91bd..61eac9d 100644 --- a/asynchronix/src/rpc/codegen/custom_transport.rs +++ b/asynchronix/src/rpc/codegen/custom_transport.rs @@ -11,10 +11,7 @@ pub struct ServerError { #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnyRequest { /// Expects exactly 1 variant. - #[prost( - oneof = "any_request::Request", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11" - )] + #[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")] pub request: ::core::option::Option, } /// Nested message and enum types in `AnyRequest`. @@ -51,10 +48,7 @@ pub mod any_request { #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnyReply { /// Contains exactly 1 variant. - #[prost( - oneof = "any_reply::Reply", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100" - )] + #[prost(oneof = "any_reply::Reply", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100")] pub reply: ::core::option::Option, } /// Nested message and enum types in `AnyReply`. diff --git a/asynchronix/src/rpc/codegen/simulation.rs b/asynchronix/src/rpc/codegen/simulation.rs index abe0073..aefb660 100644 --- a/asynchronix/src/rpc/codegen/simulation.rs +++ b/asynchronix/src/rpc/codegen/simulation.rs @@ -131,8 +131,8 @@ pub struct ScheduleEventRequest { pub event: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "5")] pub period: ::core::option::Option<::prost_types::Duration>, - #[prost(bool, optional, tag = "6")] - pub with_key: ::core::option::Option, + #[prost(bool, tag = "6")] + pub with_key: bool, /// Expects exactly 1 variant. #[prost(oneof = "schedule_event_request::Deadline", tags = "1, 2")] pub deadline: ::core::option::Option, @@ -411,19 +411,31 @@ pub mod simulation_server { async fn schedule_event( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn cancel_event( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn process_event( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn process_query( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_events( &self, request: tonic::Request, @@ -460,7 +472,10 @@ pub mod simulation_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -516,15 +531,21 @@ pub mod simulation_server { "/simulation.Simulation/Init" => { #[allow(non_camel_case_types)] struct InitSvc(pub Arc); - impl tonic::server::UnaryService for InitSvc { + impl tonic::server::UnaryService + for InitSvc { type Response = super::InitReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::init(&inner, request).await }; + let fut = async move { + ::init(&inner, request).await + }; Box::pin(fut) } } @@ -554,15 +575,21 @@ pub mod simulation_server { "/simulation.Simulation/Time" => { #[allow(non_camel_case_types)] struct TimeSvc(pub Arc); - impl tonic::server::UnaryService for TimeSvc { + impl tonic::server::UnaryService + for TimeSvc { type Response = super::TimeReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::time(&inner, request).await }; + let fut = async move { + ::time(&inner, request).await + }; Box::pin(fut) } } @@ -592,15 +619,21 @@ pub mod simulation_server { "/simulation.Simulation/Step" => { #[allow(non_camel_case_types)] struct StepSvc(pub Arc); - impl tonic::server::UnaryService for StepSvc { + impl tonic::server::UnaryService + for StepSvc { type Response = super::StepReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::step(&inner, request).await }; + let fut = async move { + ::step(&inner, request).await + }; Box::pin(fut) } } @@ -630,16 +663,23 @@ pub mod simulation_server { "/simulation.Simulation/StepUntil" => { #[allow(non_camel_case_types)] struct StepUntilSvc(pub Arc); - impl tonic::server::UnaryService for StepUntilSvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for StepUntilSvc { type Response = super::StepUntilReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::step_until(&inner, request).await }; + let fut = async move { + ::step_until(&inner, request).await + }; Box::pin(fut) } } @@ -669,11 +709,15 @@ pub mod simulation_server { "/simulation.Simulation/ScheduleEvent" => { #[allow(non_camel_case_types)] struct ScheduleEventSvc(pub Arc); - impl tonic::server::UnaryService - for ScheduleEventSvc - { + impl< + T: Simulation, + > tonic::server::UnaryService + for ScheduleEventSvc { type Response = super::ScheduleEventReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -711,9 +755,15 @@ pub mod simulation_server { "/simulation.Simulation/CancelEvent" => { #[allow(non_camel_case_types)] struct CancelEventSvc(pub Arc); - impl tonic::server::UnaryService for CancelEventSvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for CancelEventSvc { type Response = super::CancelEventReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -751,9 +801,15 @@ pub mod simulation_server { "/simulation.Simulation/ProcessEvent" => { #[allow(non_camel_case_types)] struct ProcessEventSvc(pub Arc); - impl tonic::server::UnaryService for ProcessEventSvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for ProcessEventSvc { type Response = super::ProcessEventReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -791,9 +847,15 @@ pub mod simulation_server { "/simulation.Simulation/ProcessQuery" => { #[allow(non_camel_case_types)] struct ProcessQuerySvc(pub Arc); - impl tonic::server::UnaryService for ProcessQuerySvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for ProcessQuerySvc { type Response = super::ProcessQueryReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -831,9 +893,15 @@ pub mod simulation_server { "/simulation.Simulation/ReadEvents" => { #[allow(non_camel_case_types)] struct ReadEventsSvc(pub Arc); - impl tonic::server::UnaryService for ReadEventsSvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for ReadEventsSvc { type Response = super::ReadEventsReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -871,16 +939,23 @@ pub mod simulation_server { "/simulation.Simulation/OpenSink" => { #[allow(non_camel_case_types)] struct OpenSinkSvc(pub Arc); - impl tonic::server::UnaryService for OpenSinkSvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for OpenSinkSvc { type Response = super::OpenSinkReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::open_sink(&inner, request).await }; + let fut = async move { + ::open_sink(&inner, request).await + }; Box::pin(fut) } } @@ -910,16 +985,23 @@ pub mod simulation_server { "/simulation.Simulation/CloseSink" => { #[allow(non_camel_case_types)] struct CloseSinkSvc(pub Arc); - impl tonic::server::UnaryService for CloseSinkSvc { + impl< + T: Simulation, + > tonic::server::UnaryService + for CloseSinkSvc { type Response = super::CloseSinkReply; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::close_sink(&inner, request).await }; + let fut = async move { + ::close_sink(&inner, request).await + }; Box::pin(fut) } } @@ -946,14 +1028,18 @@ pub mod simulation_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/asynchronix/src/rpc/endpoint_registry.rs b/asynchronix/src/rpc/endpoint_registry.rs index 9ded8e5..67b5821 100644 --- a/asynchronix/src/rpc/endpoint_registry.rs +++ b/asynchronix/src/rpc/endpoint_registry.rs @@ -54,10 +54,10 @@ impl EndpointRegistry { self.event_sources.get_mut(name).map(|s| s.as_mut()) } - /// Adds an query source to the registry. + /// Adds a query source to the registry. /// - /// If the specified name is already in use for another query source, the source - /// provided as argument is returned in the error. + /// If the specified name is already in use for another query source, the + /// source provided as argument is returned in the error. pub fn add_query_source( &mut self, source: QuerySource, @@ -87,7 +87,7 @@ impl EndpointRegistry { /// /// If the specified name is already in use for another sink, the sink /// provided as argument is returned in the error. - pub fn add_sink(&mut self, sink: S, name: impl Into) -> Result<(), S> + pub fn add_event_sink(&mut self, sink: S, name: impl Into) -> Result<(), S> where S: EventSinkStream + Send + 'static, S::Item: Serialize, @@ -104,7 +104,7 @@ impl EndpointRegistry { /// Returns a mutable reference to the specified sink if it is in the /// registry. - pub(crate) fn get_sink_mut(&mut self, name: &str) -> Option<&mut dyn EventSinkStreamAny> { + pub(crate) fn get_event_sink_mut(&mut self, name: &str) -> Option<&mut dyn EventSinkStreamAny> { self.sinks.get_mut(name).map(|s| s.as_mut()) } } @@ -272,7 +272,7 @@ where } fn collect(&mut self) -> Result>, RmpEncodeError> { - EventSinkStream::try_fold(self, Vec::new(), |mut encoded_events, event| { + self.__try_fold(Vec::new(), |mut encoded_events, event| { rmp_serde::to_vec_named(&event).map(|encoded_event| { encoded_events.push(encoded_event); diff --git a/asynchronix/src/rpc/generic_server.rs b/asynchronix/src/rpc/generic_server.rs index edeb06d..6f54a93 100644 --- a/asynchronix/src/rpc/generic_server.rs +++ b/asynchronix/src/rpc/generic_server.rs @@ -248,7 +248,7 @@ where let reply = move || -> Result, (ErrorCode, String)> { let source_name = &request.source_name; let msgpack_event = &request.event; - let with_key = request.with_key.unwrap_or_default(); + let with_key = request.with_key; let period = request .period .map(|period| { @@ -508,7 +508,7 @@ where "the simulation was not started".to_string(), ))?; - let sink = registry.get_sink_mut(sink_name).ok_or(( + let sink = registry.get_event_sink_mut(sink_name).ok_or(( ErrorCode::SinkNotFound, "no sink is registered with the name '{}'".to_string(), ))?; @@ -549,7 +549,7 @@ where "the simulation was not started".to_string(), ))?; - let sink = registry.get_sink_mut(sink_name).ok_or(( + let sink = registry.get_event_sink_mut(sink_name).ok_or(( ErrorCode::SinkNotFound, "no sink is registered with the name '{}'".to_string(), ))?; @@ -582,7 +582,7 @@ where "the simulation was not started".to_string(), ))?; - let sink = registry.get_sink_mut(sink_name).ok_or(( + let sink = registry.get_event_sink_mut(sink_name).ok_or(( ErrorCode::SinkNotFound, "no sink is registered with the name '{}'".to_string(), ))?; diff --git a/asynchronix/src/rpc/grpc.rs b/asynchronix/src/rpc/grpc.rs index 1cb2960..94809e9 100644 --- a/asynchronix/src/rpc/grpc.rs +++ b/asynchronix/src/rpc/grpc.rs @@ -23,7 +23,9 @@ where F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static, { // Use a single-threaded server. - let rt = tokio::runtime::Builder::new_current_thread().build()?; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .build()?; let sim_manager = GrpcServer::new(sim_gen);