From 8ec5cd9e9b1fd3d6e96966232137b371499a69bc Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 19 Jun 2024 12:00:59 +0200 Subject: [PATCH] Replace MessagePack by CBOR CBOR looks very similar but seems more future-proof as it was standardized by the IETF in RFC 8949. --- asynchronix/Cargo.toml | 4 +- .../src/registry/event_sink_registry.rs | 13 +++-- .../src/registry/event_source_registry.rs | 53 +++++++++++-------- .../src/registry/query_source_registry.rs | 31 ++++++----- .../src/rpc/services/controller_service.rs | 42 ++++++++------- .../src/rpc/services/monitor_service.rs | 7 +-- 6 files changed, 84 insertions(+), 66 deletions(-) diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 851ca50..5886394 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -23,7 +23,7 @@ autotests = false [features] # Remote procedure call API. -rpc = ["dep:rmp-serde", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"] +rpc = ["dep:ciborium", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"] # This feature forces protobuf/gRPC code (re-)generation. rpc-codegen = ["dep:tonic-build"] # gRPC service. @@ -57,7 +57,7 @@ tai-time = "0.3" bytes = { version = "1", default-features = false, optional = true } prost = { version = "0.12", optional = true } prost-types = { version = "0.12", optional = true } -rmp-serde = { version = "1.1", optional = true } +ciborium = { version = "0.2.2", optional = true } serde = { version = "1", optional = true } # gRPC service dependencies. diff --git a/asynchronix/src/registry/event_sink_registry.rs b/asynchronix/src/registry/event_sink_registry.rs index bac0ea8..a848f48 100644 --- a/asynchronix/src/registry/event_sink_registry.rs +++ b/asynchronix/src/registry/event_sink_registry.rs @@ -2,11 +2,13 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; -use rmp_serde::encode::Error as RmpEncodeError; +use ciborium; use serde::Serialize; use crate::ports::EventSinkStream; +type SerializationError = ciborium::ser::Error; + /// A registry that holds all sources and sinks meant to be accessed through /// remote procedure calls. #[derive(Default)] @@ -58,7 +60,7 @@ pub(crate) trait EventSinkStreamAny: Send + 'static { fn close(&mut self); /// Encode and collect all events in a vector. - fn collect(&mut self) -> Result>, RmpEncodeError>; + fn collect(&mut self) -> Result>, SerializationError>; } impl EventSinkStreamAny for E @@ -78,10 +80,11 @@ where self.close(); } - fn collect(&mut self) -> Result>, RmpEncodeError> { + fn collect(&mut self) -> Result>, SerializationError> { self.__try_fold(Vec::new(), |mut encoded_events, event| { - rmp_serde::to_vec_named(&event).map(|encoded_event| { - encoded_events.push(encoded_event); + let mut buffer = Vec::new(); + ciborium::into_writer(&event, &mut buffer).map(|_| { + encoded_events.push(buffer); encoded_events }) diff --git a/asynchronix/src/registry/event_source_registry.rs b/asynchronix/src/registry/event_source_registry.rs index fa63e27..f22c044 100644 --- a/asynchronix/src/registry/event_source_registry.rs +++ b/asynchronix/src/registry/event_source_registry.rs @@ -3,12 +3,14 @@ use std::collections::HashMap; use std::fmt; use std::time::Duration; -use rmp_serde::decode::Error as RmpDecodeError; +use ciborium; use serde::de::DeserializeOwned; use crate::ports::EventSource; use crate::simulation::{Action, ActionKey}; +type DeserializationError = ciborium::de::Error; + /// A registry that holds all sources and sinks meant to be accessed through /// remote procedure calls. #[derive(Default)] @@ -50,41 +52,43 @@ impl fmt::Debug for EventSourceRegistry { } } -/// A type-erased `EventSource` that operates on MessagePack-encoded serialized -/// events. +/// A type-erased `EventSource` that operates on CBOR-encoded serialized events. pub(crate) trait EventSourceAny: Send + 'static { /// Returns an action which, when processed, broadcasts an event to all /// connected input ports. /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn event(&mut self, msgpack_arg: &[u8]) -> Result; + /// The argument is expected to conform to the serde CBOR encoding. + fn event(&mut self, serialized_arg: &[u8]) -> Result; /// Returns a cancellable action and a cancellation key; when processed, the /// action broadcasts an event to all connected input ports. /// - /// The argument is expected to conform to the serde MessagePack encoding. - fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>; + /// The argument is expected to conform to the serde CBOR encoding. + fn keyed_event( + &mut self, + serialized_arg: &[u8], + ) -> Result<(Action, ActionKey), DeserializationError>; /// Returns a periodically recurring action which, when processed, /// broadcasts an event to all connected input ports. /// - /// The argument is expected to conform to the serde MessagePack encoding. + /// The argument is expected to conform to the serde CBOR encoding. fn periodic_event( &mut self, period: Duration, - msgpack_arg: &[u8], - ) -> Result; + serialized_arg: &[u8], + ) -> Result; /// Returns a cancellable, periodically recurring action and a cancellation /// key; when processed, the action broadcasts an event to all connected /// input ports. /// - /// The argument is expected to conform to the serde MessagePack encoding. + /// The argument is expected to conform to the serde CBOR encoding. fn keyed_periodic_event( &mut self, period: Duration, - msgpack_arg: &[u8], - ) -> Result<(Action, ActionKey), RmpDecodeError>; + serialized_arg: &[u8], + ) -> Result<(Action, ActionKey), DeserializationError>; /// Human-readable name of the event type, as returned by /// `any::type_name()`. @@ -95,25 +99,28 @@ impl EventSourceAny for EventSource where T: DeserializeOwned + Clone + Send + 'static, { - fn event(&mut self, msgpack_arg: &[u8]) -> Result { - rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg)) + fn event(&mut self, serialized_arg: &[u8]) -> Result { + ciborium::from_reader(serialized_arg).map(|arg| self.event(arg)) } - fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg)) + fn keyed_event( + &mut self, + serialized_arg: &[u8], + ) -> Result<(Action, ActionKey), DeserializationError> { + ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg)) } fn periodic_event( &mut self, period: Duration, - msgpack_arg: &[u8], - ) -> Result { - rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg)) + serialized_arg: &[u8], + ) -> Result { + ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg)) } fn keyed_periodic_event( &mut self, period: Duration, - msgpack_arg: &[u8], - ) -> Result<(Action, ActionKey), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg)) + serialized_arg: &[u8], + ) -> Result<(Action, ActionKey), DeserializationError> { + ciborium::from_reader(serialized_arg).map(|arg| self.keyed_periodic_event(period, arg)) } fn event_type_name(&self) -> &'static str { std::any::type_name::() diff --git a/asynchronix/src/registry/query_source_registry.rs b/asynchronix/src/registry/query_source_registry.rs index fb18233..b12cacb 100644 --- a/asynchronix/src/registry/query_source_registry.rs +++ b/asynchronix/src/registry/query_source_registry.rs @@ -2,14 +2,16 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; -use rmp_serde::decode::Error as RmpDecodeError; -use rmp_serde::encode::Error as RmpEncodeError; +use ciborium; use serde::de::DeserializeOwned; use serde::Serialize; use crate::ports::{QuerySource, ReplyReceiver}; use crate::simulation::Action; +type DeserializationError = ciborium::de::Error; +type SerializationError = ciborium::ser::Error; + /// A registry that holds all sources and sinks meant to be accessed through /// remote procedure calls. #[derive(Default)] @@ -52,18 +54,18 @@ impl fmt::Debug for QuerySourceRegistry { } } -/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized -/// queries and returns MessagePack-encoded replies. +/// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries +/// and returns CBOR-encoded replies. pub(crate) trait QuerySourceAny: Send + 'static { /// Returns an action which, when processed, broadcasts a query to all /// connected replier ports. /// /// - /// The argument is expected to conform to the serde MessagePack encoding. + /// The argument is expected to conform to the serde CBOR encoding. fn query( &mut self, - msgpack_arg: &[u8], - ) -> Result<(Action, Box), RmpDecodeError>; + arg: &[u8], + ) -> Result<(Action, Box), DeserializationError>; /// Human-readable name of the request type, as returned by /// `any::type_name()`. @@ -81,9 +83,9 @@ where { fn query( &mut self, - msgpack_arg: &[u8], - ) -> Result<(Action, Box), RmpDecodeError> { - rmp_serde::from_read(msgpack_arg).map(|arg| { + arg: &[u8], + ) -> Result<(Action, Box), DeserializationError> { + ciborium::from_reader(arg).map(|arg| { let (action, reply_recv) = self.query(arg); let reply_recv: Box = Box::new(reply_recv); @@ -100,20 +102,21 @@ where } } -/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies.. +/// A type-erased `ReplyReceiver` that returns CBOR-encoded replies. pub(crate) trait ReplyReceiverAny { /// Take the replies, if any, encode them and collect them in a vector. - fn take_collect(&mut self) -> Option>, RmpEncodeError>>; + fn take_collect(&mut self) -> Option>, SerializationError>>; } impl ReplyReceiverAny for ReplyReceiver { - fn take_collect(&mut self) -> Option>, RmpEncodeError>> { + fn take_collect(&mut self) -> Option>, SerializationError>> { let replies = self.take()?; let encoded_replies = (move || { let mut encoded_replies = Vec::new(); for reply in replies { - let encoded_reply = rmp_serde::to_vec_named(&reply)?; + let mut encoded_reply = Vec::new(); + ciborium::into_writer(&reply, &mut encoded_reply)?; encoded_replies.push(encoded_reply); } diff --git a/asynchronix/src/rpc/services/controller_service.rs b/asynchronix/src/rpc/services/controller_service.rs index 76d6a79..037946b 100644 --- a/asynchronix/src/rpc/services/controller_service.rs +++ b/asynchronix/src/rpc/services/controller_service.rs @@ -149,7 +149,7 @@ impl ControllerService { .. } => move || -> Result, Error> { let source_name = &request.source_name; - let msgpack_event = &request.event; + let event = &request.event; let with_key = request.with_key; let period = request .period @@ -188,23 +188,24 @@ impl ControllerService { ))?; let (action, action_key) = match (with_key, period) { - (false, None) => source.event(msgpack_event).map(|action| (action, None)), + (false, None) => source.event(event).map(|action| (action, None)), (false, Some(period)) => source - .periodic_event(period, msgpack_event) + .periodic_event(period, event) .map(|action| (action, None)), (true, None) => source - .keyed_event(msgpack_event) + .keyed_event(event) .map(|(action, key)| (action, Some(key))), (true, Some(period)) => source - .keyed_periodic_event(period, msgpack_event) + .keyed_periodic_event(period, event) .map(|(action, key)| (action, Some(key))), } - .map_err(|_| { + .map_err(|e| { to_error( ErrorCode::InvalidMessage, format!( - "the event could not be deserialized as type '{}'", - source.event_type_name() + "the event could not be deserialized as type '{}': {}", + source.event_type_name(), + e ), ) })?; @@ -296,19 +297,20 @@ impl ControllerService { .. } => move || -> Result<(), Error> { let source_name = &request.source_name; - let msgpack_event = &request.event; + let event = &request.event; let source = event_source_registry.get_mut(source_name).ok_or(to_error( ErrorCode::SourceNotFound, "no source is registered with the name '{}'".to_string(), ))?; - let event = source.event(msgpack_event).map_err(|_| { + let event = source.event(event).map_err(|e| { to_error( ErrorCode::InvalidMessage, format!( - "the event could not be deserialized as type '{}'", - source.event_type_name() + "the event could not be deserialized as type '{}': {}", + source.event_type_name(), + e ), ) })?; @@ -340,19 +342,20 @@ impl ControllerService { .. } => move || -> Result>, Error> { let source_name = &request.source_name; - let msgpack_request = &request.request; + let request = &request.request; let source = query_source_registry.get_mut(source_name).ok_or(to_error( ErrorCode::SourceNotFound, "no source is registered with the name '{}'".to_string(), ))?; - let (query, mut promise) = source.query(msgpack_request).map_err(|_| { + let (query, mut promise) = source.query(request).map_err(|e| { to_error( ErrorCode::InvalidMessage, format!( - "the request could not be deserialized as type '{}'", - source.request_type_name() + "the request could not be deserialized as type '{}': {}", + source.request_type_name(), + e ), ) })?; @@ -364,12 +367,13 @@ impl ControllerService { "a reply to the query was expected but none was available".to_string(), ))?; - replies.map_err(|_| { + replies.map_err(|e| { to_error( ErrorCode::InvalidMessage, format!( - "the reply could not be serialized as type '{}'", - source.reply_type_name() + "the reply could not be serialized as type '{}': {}", + source.reply_type_name(), + e ), ) }) diff --git a/asynchronix/src/rpc/services/monitor_service.rs b/asynchronix/src/rpc/services/monitor_service.rs index b1e18f5..b6f1fc1 100644 --- a/asynchronix/src/rpc/services/monitor_service.rs +++ b/asynchronix/src/rpc/services/monitor_service.rs @@ -33,12 +33,13 @@ impl MonitorService { format!("no sink is registered with the name '{}'", sink_name), ))?; - sink.collect().map_err(|_| { + sink.collect().map_err(|e| { to_error( ErrorCode::InvalidMessage, format!( - "the event could not be serialized from type '{}'", - sink.event_type_name() + "the event could not be serialized from type '{}': {}", + sink.event_type_name(), + e ), ) })