1
0
forked from ROMEO/nexosim

Merge pull request #3 from asynchronics/fix/grpc-and-codegen-format

Fix/grpc and codegen format
This commit is contained in:
Serge Barral 2024-04-26 16:14:59 +02:00 committed by GitHub
commit e7c0c5f217
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 170 additions and 81 deletions

View File

@ -58,7 +58,7 @@ rmp-serde = { version = "1.1", optional = true }
serde = { version = "1", optional = true }
# gRPC dependencies.
tokio = { version = "1.0", optional = true }
tokio = { version = "1.0", features=["net"], optional = true }
tonic = { version = "0.11", optional = true }

View File

@ -33,18 +33,21 @@ pub trait EventSinkStream: Iterator {
/// Events that were previously in the stream remain available.
fn close(&mut self);
/// This is a stop-gap method that shadows `Iterator::try_fold` until the
/// latter can be implemented by user-defined types on stable Rust.
/// This is a stop-gap method that serves the exact same purpose as
/// `Iterator::try_fold` but is specialized for `Result` rather than the
/// `Try` trait so it can be implemented on stable Rust.
///
/// It serves the exact same purpose as `Iterator::try_fold` but is
/// specialized for `Result` to avoid depending on the unstable `Try` trait.
/// It makes it possible to provide a faster implementation when the event
/// sink stream can be iterated over more rapidly than by repeatably calling
/// `Iterator::next`, for instance if the implementation of the stream
/// relies on a mutex that must be locked on each call.
///
/// Implementors may elect to override the default implementation when the
/// event sink stream can be iterated over more rapidly than by repeatably
/// calling `Iterator::next`, for instance if the implementation of the
/// stream relies on a mutex that must be locked on each call.
/// It is not publicly implementable because it may be removed at any time
/// once the `Try` trait is stabilized, without regard for backward
/// compatibility.
#[doc(hidden)]
fn try_fold<B, F, E>(&mut self, init: B, f: F) -> Result<B, E>
#[allow(private_interfaces)]
fn __try_fold<B, F, E>(&mut self, init: B, f: F) -> Result<B, E>
where
Self: Sized,
F: FnMut(B, Self::Item) -> Result<B, E>,

View File

@ -86,7 +86,9 @@ impl<T: Send + 'static> EventSinkStream for EventBuffer<T> {
self.inner.is_open.store(false, Ordering::Relaxed);
}
fn try_fold<B, F, E>(&mut self, init: B, f: F) -> Result<B, E>
#[doc(hidden)]
#[allow(private_interfaces)]
fn __try_fold<B, F, E>(&mut self, init: B, f: F) -> Result<B, E>
where
Self: Sized,
F: FnMut(B, Self::Item) -> Result<B, E>,

View File

@ -31,7 +31,7 @@ message EventKey {
uint64 subkey2 = 2;
}
message InitRequest { optional google.protobuf.Timestamp time = 1; }
message InitRequest { google.protobuf.Timestamp time = 1; }
message InitReply {
oneof result { // Always returns exactly 1 variant.
google.protobuf.Empty empty = 1;
@ -75,8 +75,8 @@ message ScheduleEventRequest {
}
string source_name = 3;
bytes event = 4;
optional google.protobuf.Duration period = 5;
optional bool with_key = 6;
google.protobuf.Duration period = 5;
bool with_key = 6;
}
message ScheduleEventReply {
oneof result { // Always returns exactly 1 variant.

View File

@ -1,5 +1,7 @@
#![allow(unreachable_pub)]
#![allow(clippy::enum_variant_names)]
#[rustfmt::skip]
pub(crate) mod custom_transport;
#[rustfmt::skip]
pub(crate) mod simulation;

View File

@ -11,10 +11,7 @@ pub struct ServerError {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AnyRequest {
/// Expects exactly 1 variant.
#[prost(
oneof = "any_request::Request",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11"
)]
#[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")]
pub request: ::core::option::Option<any_request::Request>,
}
/// Nested message and enum types in `AnyRequest`.
@ -51,10 +48,7 @@ pub mod any_request {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AnyReply {
/// Contains exactly 1 variant.
#[prost(
oneof = "any_reply::Reply",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100"
)]
#[prost(oneof = "any_reply::Reply", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 100")]
pub reply: ::core::option::Option<any_reply::Reply>,
}
/// Nested message and enum types in `AnyReply`.

View File

@ -131,8 +131,8 @@ pub struct ScheduleEventRequest {
pub event: ::prost::alloc::vec::Vec<u8>,
#[prost(message, optional, tag = "5")]
pub period: ::core::option::Option<::prost_types::Duration>,
#[prost(bool, optional, tag = "6")]
pub with_key: ::core::option::Option<bool>,
#[prost(bool, tag = "6")]
pub with_key: bool,
/// Expects exactly 1 variant.
#[prost(oneof = "schedule_event_request::Deadline", tags = "1, 2")]
pub deadline: ::core::option::Option<schedule_event_request::Deadline>,
@ -411,19 +411,31 @@ pub mod simulation_server {
async fn schedule_event(
&self,
request: tonic::Request<super::ScheduleEventRequest>,
) -> std::result::Result<tonic::Response<super::ScheduleEventReply>, tonic::Status>;
) -> std::result::Result<
tonic::Response<super::ScheduleEventReply>,
tonic::Status,
>;
async fn cancel_event(
&self,
request: tonic::Request<super::CancelEventRequest>,
) -> std::result::Result<tonic::Response<super::CancelEventReply>, tonic::Status>;
) -> std::result::Result<
tonic::Response<super::CancelEventReply>,
tonic::Status,
>;
async fn process_event(
&self,
request: tonic::Request<super::ProcessEventRequest>,
) -> std::result::Result<tonic::Response<super::ProcessEventReply>, tonic::Status>;
) -> std::result::Result<
tonic::Response<super::ProcessEventReply>,
tonic::Status,
>;
async fn process_query(
&self,
request: tonic::Request<super::ProcessQueryRequest>,
) -> std::result::Result<tonic::Response<super::ProcessQueryReply>, tonic::Status>;
) -> std::result::Result<
tonic::Response<super::ProcessQueryReply>,
tonic::Status,
>;
async fn read_events(
&self,
request: tonic::Request<super::ReadEventsRequest>,
@ -460,7 +472,10 @@ pub mod simulation_server {
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
@ -516,15 +531,21 @@ pub mod simulation_server {
"/simulation.Simulation/Init" => {
#[allow(non_camel_case_types)]
struct InitSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::InitRequest> for InitSvc<T> {
impl<T: Simulation> tonic::server::UnaryService<super::InitRequest>
for InitSvc<T> {
type Response = super::InitReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::InitRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as Simulation>::init(&inner, request).await };
let fut = async move {
<T as Simulation>::init(&inner, request).await
};
Box::pin(fut)
}
}
@ -554,15 +575,21 @@ pub mod simulation_server {
"/simulation.Simulation/Time" => {
#[allow(non_camel_case_types)]
struct TimeSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::TimeRequest> for TimeSvc<T> {
impl<T: Simulation> tonic::server::UnaryService<super::TimeRequest>
for TimeSvc<T> {
type Response = super::TimeReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::TimeRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as Simulation>::time(&inner, request).await };
let fut = async move {
<T as Simulation>::time(&inner, request).await
};
Box::pin(fut)
}
}
@ -592,15 +619,21 @@ pub mod simulation_server {
"/simulation.Simulation/Step" => {
#[allow(non_camel_case_types)]
struct StepSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::StepRequest> for StepSvc<T> {
impl<T: Simulation> tonic::server::UnaryService<super::StepRequest>
for StepSvc<T> {
type Response = super::StepReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::StepRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as Simulation>::step(&inner, request).await };
let fut = async move {
<T as Simulation>::step(&inner, request).await
};
Box::pin(fut)
}
}
@ -630,16 +663,23 @@ pub mod simulation_server {
"/simulation.Simulation/StepUntil" => {
#[allow(non_camel_case_types)]
struct StepUntilSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::StepUntilRequest> for StepUntilSvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::StepUntilRequest>
for StepUntilSvc<T> {
type Response = super::StepUntilReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::StepUntilRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut =
async move { <T as Simulation>::step_until(&inner, request).await };
let fut = async move {
<T as Simulation>::step_until(&inner, request).await
};
Box::pin(fut)
}
}
@ -669,11 +709,15 @@ pub mod simulation_server {
"/simulation.Simulation/ScheduleEvent" => {
#[allow(non_camel_case_types)]
struct ScheduleEventSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ScheduleEventRequest>
for ScheduleEventSvc<T>
{
impl<
T: Simulation,
> tonic::server::UnaryService<super::ScheduleEventRequest>
for ScheduleEventSvc<T> {
type Response = super::ScheduleEventReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ScheduleEventRequest>,
@ -711,9 +755,15 @@ pub mod simulation_server {
"/simulation.Simulation/CancelEvent" => {
#[allow(non_camel_case_types)]
struct CancelEventSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::CancelEventRequest> for CancelEventSvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::CancelEventRequest>
for CancelEventSvc<T> {
type Response = super::CancelEventReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::CancelEventRequest>,
@ -751,9 +801,15 @@ pub mod simulation_server {
"/simulation.Simulation/ProcessEvent" => {
#[allow(non_camel_case_types)]
struct ProcessEventSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ProcessEventRequest> for ProcessEventSvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::ProcessEventRequest>
for ProcessEventSvc<T> {
type Response = super::ProcessEventReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ProcessEventRequest>,
@ -791,9 +847,15 @@ pub mod simulation_server {
"/simulation.Simulation/ProcessQuery" => {
#[allow(non_camel_case_types)]
struct ProcessQuerySvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ProcessQueryRequest> for ProcessQuerySvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::ProcessQueryRequest>
for ProcessQuerySvc<T> {
type Response = super::ProcessQueryReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ProcessQueryRequest>,
@ -831,9 +893,15 @@ pub mod simulation_server {
"/simulation.Simulation/ReadEvents" => {
#[allow(non_camel_case_types)]
struct ReadEventsSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::ReadEventsRequest> for ReadEventsSvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::ReadEventsRequest>
for ReadEventsSvc<T> {
type Response = super::ReadEventsReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ReadEventsRequest>,
@ -871,16 +939,23 @@ pub mod simulation_server {
"/simulation.Simulation/OpenSink" => {
#[allow(non_camel_case_types)]
struct OpenSinkSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::OpenSinkRequest> for OpenSinkSvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::OpenSinkRequest>
for OpenSinkSvc<T> {
type Response = super::OpenSinkReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::OpenSinkRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut =
async move { <T as Simulation>::open_sink(&inner, request).await };
let fut = async move {
<T as Simulation>::open_sink(&inner, request).await
};
Box::pin(fut)
}
}
@ -910,16 +985,23 @@ pub mod simulation_server {
"/simulation.Simulation/CloseSink" => {
#[allow(non_camel_case_types)]
struct CloseSinkSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::CloseSinkRequest> for CloseSinkSvc<T> {
impl<
T: Simulation,
> tonic::server::UnaryService<super::CloseSinkRequest>
for CloseSinkSvc<T> {
type Response = super::CloseSinkReply;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::CloseSinkRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut =
async move { <T as Simulation>::close_sink(&inner, request).await };
let fut = async move {
<T as Simulation>::close_sink(&inner, request).await
};
Box::pin(fut)
}
}
@ -946,14 +1028,18 @@ pub mod simulation_server {
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
}),
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}

View File

@ -54,10 +54,10 @@ impl EndpointRegistry {
self.event_sources.get_mut(name).map(|s| s.as_mut())
}
/// Adds an query source to the registry.
/// Adds a query source to the registry.
///
/// If the specified name is already in use for another query source, the source
/// provided as argument is returned in the error.
/// If the specified name is already in use for another query source, the
/// source provided as argument is returned in the error.
pub fn add_query_source<T, R>(
&mut self,
source: QuerySource<T, R>,
@ -87,7 +87,7 @@ impl EndpointRegistry {
///
/// If the specified name is already in use for another sink, the sink
/// provided as argument is returned in the error.
pub fn add_sink<S>(&mut self, sink: S, name: impl Into<String>) -> Result<(), S>
pub fn add_event_sink<S>(&mut self, sink: S, name: impl Into<String>) -> Result<(), S>
where
S: EventSinkStream + Send + 'static,
S::Item: Serialize,
@ -104,7 +104,7 @@ impl EndpointRegistry {
/// Returns a mutable reference to the specified sink if it is in the
/// registry.
pub(crate) fn get_sink_mut(&mut self, name: &str) -> Option<&mut dyn EventSinkStreamAny> {
pub(crate) fn get_event_sink_mut(&mut self, name: &str) -> Option<&mut dyn EventSinkStreamAny> {
self.sinks.get_mut(name).map(|s| s.as_mut())
}
}
@ -272,7 +272,7 @@ where
}
fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError> {
EventSinkStream::try_fold(self, Vec::new(), |mut encoded_events, event| {
self.__try_fold(Vec::new(), |mut encoded_events, event| {
rmp_serde::to_vec_named(&event).map(|encoded_event| {
encoded_events.push(encoded_event);

View File

@ -248,7 +248,7 @@ where
let reply = move || -> Result<Option<KeyRegistryId>, (ErrorCode, String)> {
let source_name = &request.source_name;
let msgpack_event = &request.event;
let with_key = request.with_key.unwrap_or_default();
let with_key = request.with_key;
let period = request
.period
.map(|period| {
@ -508,7 +508,7 @@ where
"the simulation was not started".to_string(),
))?;
let sink = registry.get_sink_mut(sink_name).ok_or((
let sink = registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
@ -549,7 +549,7 @@ where
"the simulation was not started".to_string(),
))?;
let sink = registry.get_sink_mut(sink_name).ok_or((
let sink = registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;
@ -582,7 +582,7 @@ where
"the simulation was not started".to_string(),
))?;
let sink = registry.get_sink_mut(sink_name).ok_or((
let sink = registry.get_event_sink_mut(sink_name).ok_or((
ErrorCode::SinkNotFound,
"no sink is registered with the name '{}'".to_string(),
))?;

View File

@ -23,7 +23,9 @@ where
F: FnMut() -> (SimInit, EndpointRegistry) + Send + 'static,
{
// Use a single-threaded server.
let rt = tokio::runtime::Builder::new_current_thread().build()?;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
let sim_manager = GrpcServer::new(sim_gen);