1
0
forked from ROMEO/nexosim

Merge pull request #26 from asynchronics/feature/cbor-instead-of-msgpack

Replace MessagePack by CBOR
This commit is contained in:
Serge Barral 2024-06-19 12:07:26 +02:00 committed by GitHub
commit cb7caa10e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 84 additions and 66 deletions

View File

@ -23,7 +23,7 @@ autotests = false
[features] [features]
# Remote procedure call API. # Remote procedure call API.
rpc = ["dep:rmp-serde", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"] rpc = ["dep:ciborium", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"]
# This feature forces protobuf/gRPC code (re-)generation. # This feature forces protobuf/gRPC code (re-)generation.
rpc-codegen = ["dep:tonic-build"] rpc-codegen = ["dep:tonic-build"]
# gRPC service. # gRPC service.
@ -57,7 +57,7 @@ tai-time = "0.3"
bytes = { version = "1", default-features = false, optional = true } bytes = { version = "1", default-features = false, optional = true }
prost = { version = "0.12", optional = true } prost = { version = "0.12", optional = true }
prost-types = { version = "0.12", optional = true } prost-types = { version = "0.12", optional = true }
rmp-serde = { version = "1.1", optional = true } ciborium = { version = "0.2.2", optional = true }
serde = { version = "1", optional = true } serde = { version = "1", optional = true }
# gRPC service dependencies. # gRPC service dependencies.

View File

@ -2,11 +2,13 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use rmp_serde::encode::Error as RmpEncodeError; use ciborium;
use serde::Serialize; use serde::Serialize;
use crate::ports::EventSinkStream; use crate::ports::EventSinkStream;
type SerializationError = ciborium::ser::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through /// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls. /// remote procedure calls.
#[derive(Default)] #[derive(Default)]
@ -58,7 +60,7 @@ pub(crate) trait EventSinkStreamAny: Send + 'static {
fn close(&mut self); fn close(&mut self);
/// Encode and collect all events in a vector. /// Encode and collect all events in a vector.
fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError>; fn collect(&mut self) -> Result<Vec<Vec<u8>>, SerializationError>;
} }
impl<E> EventSinkStreamAny for E impl<E> EventSinkStreamAny for E
@ -78,10 +80,11 @@ where
self.close(); self.close();
} }
fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError> { fn collect(&mut self) -> Result<Vec<Vec<u8>>, SerializationError> {
self.__try_fold(Vec::new(), |mut encoded_events, event| { self.__try_fold(Vec::new(), |mut encoded_events, event| {
rmp_serde::to_vec_named(&event).map(|encoded_event| { let mut buffer = Vec::new();
encoded_events.push(encoded_event); ciborium::into_writer(&event, &mut buffer).map(|_| {
encoded_events.push(buffer);
encoded_events encoded_events
}) })

View File

@ -3,12 +3,14 @@ use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
use rmp_serde::decode::Error as RmpDecodeError; use ciborium;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use crate::ports::EventSource; use crate::ports::EventSource;
use crate::simulation::{Action, ActionKey}; use crate::simulation::{Action, ActionKey};
type DeserializationError = ciborium::de::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through /// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls. /// remote procedure calls.
#[derive(Default)] #[derive(Default)]
@ -50,41 +52,43 @@ impl fmt::Debug for EventSourceRegistry {
} }
} }
/// A type-erased `EventSource` that operates on MessagePack-encoded serialized /// A type-erased `EventSource` that operates on CBOR-encoded serialized events.
/// events.
pub(crate) trait EventSourceAny: Send + 'static { pub(crate) trait EventSourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts an event to all /// Returns an action which, when processed, broadcasts an event to all
/// connected input ports. /// connected input ports.
/// ///
/// The argument is expected to conform to the serde MessagePack encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError>; fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>;
/// Returns a cancellable action and a cancellation key; when processed, the /// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports. /// action broadcasts an event to all connected input ports.
/// ///
/// The argument is expected to conform to the serde MessagePack encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>; fn keyed_event(
&mut self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;
/// Returns a periodically recurring action which, when processed, /// Returns a periodically recurring action which, when processed,
/// broadcasts an event to all connected input ports. /// broadcasts an event to all connected input ports.
/// ///
/// The argument is expected to conform to the serde MessagePack encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn periodic_event( fn periodic_event(
&mut self, &mut self,
period: Duration, period: Duration,
msgpack_arg: &[u8], serialized_arg: &[u8],
) -> Result<Action, RmpDecodeError>; ) -> Result<Action, DeserializationError>;
/// Returns a cancellable, periodically recurring action and a cancellation /// Returns a cancellable, periodically recurring action and a cancellation
/// key; when processed, the action broadcasts an event to all connected /// key; when processed, the action broadcasts an event to all connected
/// input ports. /// input ports.
/// ///
/// The argument is expected to conform to the serde MessagePack encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn keyed_periodic_event( fn keyed_periodic_event(
&mut self, &mut self,
period: Duration, period: Duration,
msgpack_arg: &[u8], serialized_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError>; ) -> Result<(Action, ActionKey), DeserializationError>;
/// Human-readable name of the event type, as returned by /// Human-readable name of the event type, as returned by
/// `any::type_name()`. /// `any::type_name()`.
@ -95,25 +99,28 @@ impl<T> EventSourceAny for EventSource<T>
where where
T: DeserializeOwned + Clone + Send + 'static, T: DeserializeOwned + Clone + Send + 'static,
{ {
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError> { fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg)) ciborium::from_reader(serialized_arg).map(|arg| self.event(arg))
} }
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> { fn keyed_event(
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg)) &mut self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg))
} }
fn periodic_event( fn periodic_event(
&mut self, &mut self,
period: Duration, period: Duration,
msgpack_arg: &[u8], serialized_arg: &[u8],
) -> Result<Action, RmpDecodeError> { ) -> Result<Action, DeserializationError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg)) ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg))
} }
fn keyed_periodic_event( fn keyed_periodic_event(
&mut self, &mut self,
period: Duration, period: Duration,
msgpack_arg: &[u8], serialized_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError> { ) -> Result<(Action, ActionKey), DeserializationError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg)) ciborium::from_reader(serialized_arg).map(|arg| self.keyed_periodic_event(period, arg))
} }
fn event_type_name(&self) -> &'static str { fn event_type_name(&self) -> &'static str {
std::any::type_name::<T>() std::any::type_name::<T>()

View File

@ -2,14 +2,16 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use rmp_serde::decode::Error as RmpDecodeError; use ciborium;
use rmp_serde::encode::Error as RmpEncodeError;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use crate::ports::{QuerySource, ReplyReceiver}; use crate::ports::{QuerySource, ReplyReceiver};
use crate::simulation::Action; use crate::simulation::Action;
type DeserializationError = ciborium::de::Error<std::io::Error>;
type SerializationError = ciborium::ser::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through /// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls. /// remote procedure calls.
#[derive(Default)] #[derive(Default)]
@ -52,18 +54,18 @@ impl fmt::Debug for QuerySourceRegistry {
} }
} }
/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized /// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries
/// queries and returns MessagePack-encoded replies. /// and returns CBOR-encoded replies.
pub(crate) trait QuerySourceAny: Send + 'static { pub(crate) trait QuerySourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts a query to all /// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports. /// connected replier ports.
/// ///
/// ///
/// The argument is expected to conform to the serde MessagePack encoding. /// The argument is expected to conform to the serde CBOR encoding.
fn query( fn query(
&mut self, &mut self,
msgpack_arg: &[u8], arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError>; ) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError>;
/// Human-readable name of the request type, as returned by /// Human-readable name of the request type, as returned by
/// `any::type_name()`. /// `any::type_name()`.
@ -81,9 +83,9 @@ where
{ {
fn query( fn query(
&mut self, &mut self,
msgpack_arg: &[u8], arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError> { ) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError> {
rmp_serde::from_read(msgpack_arg).map(|arg| { ciborium::from_reader(arg).map(|arg| {
let (action, reply_recv) = self.query(arg); let (action, reply_recv) = self.query(arg);
let reply_recv: Box<dyn ReplyReceiverAny> = Box::new(reply_recv); let reply_recv: Box<dyn ReplyReceiverAny> = Box::new(reply_recv);
@ -100,20 +102,21 @@ where
} }
} }
/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies.. /// A type-erased `ReplyReceiver` that returns CBOR-encoded replies.
pub(crate) trait ReplyReceiverAny { pub(crate) trait ReplyReceiverAny {
/// Take the replies, if any, encode them and collect them in a vector. /// Take the replies, if any, encode them and collect them in a vector.
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>>; fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, SerializationError>>;
} }
impl<R: Serialize + 'static> ReplyReceiverAny for ReplyReceiver<R> { impl<R: Serialize + 'static> ReplyReceiverAny for ReplyReceiver<R> {
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>> { fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, SerializationError>> {
let replies = self.take()?; let replies = self.take()?;
let encoded_replies = (move || { let encoded_replies = (move || {
let mut encoded_replies = Vec::new(); let mut encoded_replies = Vec::new();
for reply in replies { for reply in replies {
let encoded_reply = rmp_serde::to_vec_named(&reply)?; let mut encoded_reply = Vec::new();
ciborium::into_writer(&reply, &mut encoded_reply)?;
encoded_replies.push(encoded_reply); encoded_replies.push(encoded_reply);
} }

View File

@ -149,7 +149,7 @@ impl ControllerService {
.. ..
} => move || -> Result<Option<KeyRegistryId>, Error> { } => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name; let source_name = &request.source_name;
let msgpack_event = &request.event; let event = &request.event;
let with_key = request.with_key; let with_key = request.with_key;
let period = request let period = request
.period .period
@ -188,23 +188,24 @@ impl ControllerService {
))?; ))?;
let (action, action_key) = match (with_key, period) { let (action, action_key) = match (with_key, period) {
(false, None) => source.event(msgpack_event).map(|action| (action, None)), (false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source (false, Some(period)) => source
.periodic_event(period, msgpack_event) .periodic_event(period, event)
.map(|action| (action, None)), .map(|action| (action, None)),
(true, None) => source (true, None) => source
.keyed_event(msgpack_event) .keyed_event(event)
.map(|(action, key)| (action, Some(key))), .map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source (true, Some(period)) => source
.keyed_periodic_event(period, msgpack_event) .keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))), .map(|(action, key)| (action, Some(key))),
} }
.map_err(|_| { .map_err(|e| {
to_error( to_error(
ErrorCode::InvalidMessage, ErrorCode::InvalidMessage,
format!( format!(
"the event could not be deserialized as type '{}'", "the event could not be deserialized as type '{}': {}",
source.event_type_name() source.event_type_name(),
e
), ),
) )
})?; })?;
@ -296,19 +297,20 @@ impl ControllerService {
.. ..
} => move || -> Result<(), Error> { } => move || -> Result<(), Error> {
let source_name = &request.source_name; let source_name = &request.source_name;
let msgpack_event = &request.event; let event = &request.event;
let source = event_source_registry.get_mut(source_name).ok_or(to_error( let source = event_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound, ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(), "no source is registered with the name '{}'".to_string(),
))?; ))?;
let event = source.event(msgpack_event).map_err(|_| { let event = source.event(event).map_err(|e| {
to_error( to_error(
ErrorCode::InvalidMessage, ErrorCode::InvalidMessage,
format!( format!(
"the event could not be deserialized as type '{}'", "the event could not be deserialized as type '{}': {}",
source.event_type_name() source.event_type_name(),
e
), ),
) )
})?; })?;
@ -340,19 +342,20 @@ impl ControllerService {
.. ..
} => move || -> Result<Vec<Vec<u8>>, Error> { } => move || -> Result<Vec<Vec<u8>>, Error> {
let source_name = &request.source_name; let source_name = &request.source_name;
let msgpack_request = &request.request; let request = &request.request;
let source = query_source_registry.get_mut(source_name).ok_or(to_error( let source = query_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound, ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(), "no source is registered with the name '{}'".to_string(),
))?; ))?;
let (query, mut promise) = source.query(msgpack_request).map_err(|_| { let (query, mut promise) = source.query(request).map_err(|e| {
to_error( to_error(
ErrorCode::InvalidMessage, ErrorCode::InvalidMessage,
format!( format!(
"the request could not be deserialized as type '{}'", "the request could not be deserialized as type '{}': {}",
source.request_type_name() source.request_type_name(),
e
), ),
) )
})?; })?;
@ -364,12 +367,13 @@ impl ControllerService {
"a reply to the query was expected but none was available".to_string(), "a reply to the query was expected but none was available".to_string(),
))?; ))?;
replies.map_err(|_| { replies.map_err(|e| {
to_error( to_error(
ErrorCode::InvalidMessage, ErrorCode::InvalidMessage,
format!( format!(
"the reply could not be serialized as type '{}'", "the reply could not be serialized as type '{}': {}",
source.reply_type_name() source.reply_type_name(),
e
), ),
) )
}) })

View File

@ -33,12 +33,13 @@ impl MonitorService {
format!("no sink is registered with the name '{}'", sink_name), format!("no sink is registered with the name '{}'", sink_name),
))?; ))?;
sink.collect().map_err(|_| { sink.collect().map_err(|e| {
to_error( to_error(
ErrorCode::InvalidMessage, ErrorCode::InvalidMessage,
format!( format!(
"the event could not be serialized from type '{}'", "the event could not be serialized from type '{}': {}",
sink.event_type_name() sink.event_type_name(),
e
), ),
) )
}) })