1
0
forked from ROMEO/nexosim

Replace MessagePack by CBOR

CBOR looks very similar but seems more future-proof as it was
standardized by the IETF in RFC 8949.
This commit is contained in:
Serge Barral
2024-06-19 12:00:59 +02:00
parent 4039d96127
commit 8ec5cd9e9b
6 changed files with 84 additions and 66 deletions

View File

@ -2,11 +2,13 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use rmp_serde::encode::Error as RmpEncodeError;
use ciborium;
use serde::Serialize;
use crate::ports::EventSinkStream;
type SerializationError = ciborium::ser::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
@ -58,7 +60,7 @@ pub(crate) trait EventSinkStreamAny: Send + 'static {
fn close(&mut self);
/// Encode and collect all events in a vector.
fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError>;
fn collect(&mut self) -> Result<Vec<Vec<u8>>, SerializationError>;
}
impl<E> EventSinkStreamAny for E
@ -78,10 +80,11 @@ where
self.close();
}
fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError> {
fn collect(&mut self) -> Result<Vec<Vec<u8>>, SerializationError> {
self.__try_fold(Vec::new(), |mut encoded_events, event| {
rmp_serde::to_vec_named(&event).map(|encoded_event| {
encoded_events.push(encoded_event);
let mut buffer = Vec::new();
ciborium::into_writer(&event, &mut buffer).map(|_| {
encoded_events.push(buffer);
encoded_events
})

View File

@ -3,12 +3,14 @@ use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use rmp_serde::decode::Error as RmpDecodeError;
use ciborium;
use serde::de::DeserializeOwned;
use crate::ports::EventSource;
use crate::simulation::{Action, ActionKey};
type DeserializationError = ciborium::de::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
@ -50,41 +52,43 @@ impl fmt::Debug for EventSourceRegistry {
}
}
/// A type-erased `EventSource` that operates on MessagePack-encoded serialized
/// events.
/// A type-erased `EventSource` that operates on CBOR-encoded serialized events.
pub(crate) trait EventSourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts an event to all
/// connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError>;
/// The argument is expected to conform to the serde CBOR encoding.
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>;
/// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>;
/// The argument is expected to conform to the serde CBOR encoding.
fn keyed_event(
&mut self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;
/// Returns a periodically recurring action which, when processed,
/// broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
/// The argument is expected to conform to the serde CBOR encoding.
fn periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<Action, RmpDecodeError>;
serialized_arg: &[u8],
) -> Result<Action, DeserializationError>;
/// Returns a cancellable, periodically recurring action and a cancellation
/// key; when processed, the action broadcasts an event to all connected
/// input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
/// The argument is expected to conform to the serde CBOR encoding.
fn keyed_periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError>;
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;
/// Human-readable name of the event type, as returned by
/// `any::type_name()`.
@ -95,25 +99,28 @@ impl<T> EventSourceAny for EventSource<T>
where
T: DeserializeOwned + Clone + Send + 'static,
{
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg))
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.event(arg))
}
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg))
fn keyed_event(
&mut self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg))
}
fn periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<Action, RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg))
serialized_arg: &[u8],
) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg))
}
fn keyed_periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg))
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_periodic_event(period, arg))
}
fn event_type_name(&self) -> &'static str {
std::any::type_name::<T>()

View File

@ -2,14 +2,16 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use rmp_serde::decode::Error as RmpDecodeError;
use rmp_serde::encode::Error as RmpEncodeError;
use ciborium;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::ports::{QuerySource, ReplyReceiver};
use crate::simulation::Action;
type DeserializationError = ciborium::de::Error<std::io::Error>;
type SerializationError = ciborium::ser::Error<std::io::Error>;
/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
@ -52,18 +54,18 @@ impl fmt::Debug for QuerySourceRegistry {
}
}
/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized
/// queries and returns MessagePack-encoded replies.
/// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries
/// and returns CBOR-encoded replies.
pub(crate) trait QuerySourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports.
///
///
/// The argument is expected to conform to the serde MessagePack encoding.
/// The argument is expected to conform to the serde CBOR encoding.
fn query(
&mut self,
msgpack_arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError>;
arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError>;
/// Human-readable name of the request type, as returned by
/// `any::type_name()`.
@ -81,9 +83,9 @@ where
{
fn query(
&mut self,
msgpack_arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| {
arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError> {
ciborium::from_reader(arg).map(|arg| {
let (action, reply_recv) = self.query(arg);
let reply_recv: Box<dyn ReplyReceiverAny> = Box::new(reply_recv);
@ -100,20 +102,21 @@ where
}
}
/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies..
/// A type-erased `ReplyReceiver` that returns CBOR-encoded replies.
pub(crate) trait ReplyReceiverAny {
/// Take the replies, if any, encode them and collect them in a vector.
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>>;
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, SerializationError>>;
}
impl<R: Serialize + 'static> ReplyReceiverAny for ReplyReceiver<R> {
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>> {
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, SerializationError>> {
let replies = self.take()?;
let encoded_replies = (move || {
let mut encoded_replies = Vec::new();
for reply in replies {
let encoded_reply = rmp_serde::to_vec_named(&reply)?;
let mut encoded_reply = Vec::new();
ciborium::into_writer(&reply, &mut encoded_reply)?;
encoded_replies.push(encoded_reply);
}