forked from ROMEO/nexosim
Merge pull request #71 from asynchronics/combinator
Add replier adaptor
This commit is contained in:
commit
4623765ca2
276
nexosim-util/examples/replier_adaptor.rs
Normal file
276
nexosim-util/examples/replier_adaptor.rs
Normal file
@ -0,0 +1,276 @@
|
|||||||
|
//! Example: RIU acquiring data from sensor.
|
||||||
|
//!
|
||||||
|
//! This example demonstrates in particular:
|
||||||
|
//!
|
||||||
|
//! * the use of replier port adaptor,
|
||||||
|
//! * periodic model self-scheduling.
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! ┌────────┐ ┌─────────┐ Sensor TC ┌─────┐
|
||||||
|
//! Set temperature ●────►│ │ ◄Sensor TC │ │◄────────────┤ │
|
||||||
|
//! │ Sensor │◄►────────────►◄│ Adaptor │ Sensor TM │ RIU ├────► RIU TM
|
||||||
|
//! Set illuminance ●────►│ │ Sensor TM► │ ├────────────►│ │
|
||||||
|
//! └────────┘ └─────────┘ └─────┘
|
||||||
|
//! ```
|
||||||
|
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use nexosim::model::{Context, InitializedModel, Model};
|
||||||
|
use nexosim::ports::{EventBuffer, Output};
|
||||||
|
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
|
||||||
|
use nexosim::time::MonotonicTime;
|
||||||
|
use nexosim_util::combinators::ReplierAdaptor;
|
||||||
|
|
||||||
|
const DELTA: Duration = Duration::from_millis(2);
|
||||||
|
const PERIOD: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
/// Sensor TC.
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub enum SensorTc {
|
||||||
|
GetTemp,
|
||||||
|
GetIllum,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sensor TM.
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub enum SensorTm {
|
||||||
|
Temp(f64),
|
||||||
|
Illum(f64),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sensor model.
|
||||||
|
pub struct Sensor {
|
||||||
|
/// Temperature [deg C] -- internal state.
|
||||||
|
temp: f64,
|
||||||
|
|
||||||
|
/// Illuminance [lx] -- internal state.
|
||||||
|
illum: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sensor {
|
||||||
|
/// Creates a sensor model.
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
temp: 0.0,
|
||||||
|
illum: 0.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets sensor temperature [deg C].
|
||||||
|
pub async fn set_temp(&mut self, temp: f64) {
|
||||||
|
self.temp = temp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets sensor illuminance [lx].
|
||||||
|
pub async fn set_illum(&mut self, illum: f64) {
|
||||||
|
self.illum = illum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Processes sensor TC -- input port.
|
||||||
|
pub async fn process_tc(&mut self, tc: SensorTc) -> SensorTm {
|
||||||
|
match tc {
|
||||||
|
SensorTc::GetTemp => SensorTm::Temp(self.temp),
|
||||||
|
SensorTc::GetIllum => SensorTm::Illum(self.illum),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Model for Sensor {}
|
||||||
|
|
||||||
|
/// Internal TM field.
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct TmField<T>
|
||||||
|
where
|
||||||
|
T: Clone + Debug + PartialEq,
|
||||||
|
{
|
||||||
|
/// TM value.
|
||||||
|
pub value: T,
|
||||||
|
|
||||||
|
/// TM readiness flag.
|
||||||
|
pub ready: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RIU TM.
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct RiuTm {
|
||||||
|
/// Temperature [deg C].
|
||||||
|
temp: f64,
|
||||||
|
|
||||||
|
/// Iluminance [lx].
|
||||||
|
illum: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RIU model.
|
||||||
|
pub struct Riu {
|
||||||
|
/// Sensor TC -- output port.
|
||||||
|
pub sensor_tc: Output<SensorTc>,
|
||||||
|
|
||||||
|
/// RIU TM -- output port.
|
||||||
|
pub tm: Output<RiuTm>,
|
||||||
|
|
||||||
|
/// Temperature [deg C] -- internal state.
|
||||||
|
temp: TmField<f64>,
|
||||||
|
|
||||||
|
/// Illuminance [lx] -- internal state.
|
||||||
|
illum: TmField<f64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Riu {
|
||||||
|
/// Creates an RIU model.
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
sensor_tc: Output::new(),
|
||||||
|
tm: Output::new(),
|
||||||
|
temp: TmField {
|
||||||
|
value: 0.0,
|
||||||
|
ready: true,
|
||||||
|
},
|
||||||
|
illum: TmField {
|
||||||
|
value: 0.0,
|
||||||
|
ready: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Processes sensor TM -- input port.
|
||||||
|
pub async fn sensor_tm(&mut self, tm: SensorTm) {
|
||||||
|
match tm {
|
||||||
|
SensorTm::Temp(temp) => {
|
||||||
|
self.temp = TmField {
|
||||||
|
value: temp,
|
||||||
|
ready: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SensorTm::Illum(illum) => {
|
||||||
|
self.illum = TmField {
|
||||||
|
value: illum,
|
||||||
|
ready: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.temp.ready && self.illum.ready {
|
||||||
|
self.report().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Starts sensor TM acquisition -- periodic activity.
|
||||||
|
async fn acquire(&mut self) {
|
||||||
|
self.temp.ready = false;
|
||||||
|
self.illum.ready = false;
|
||||||
|
self.sensor_tc.send(SensorTc::GetTemp).await;
|
||||||
|
self.sensor_tc.send(SensorTc::GetIllum).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reports RIU TM.
|
||||||
|
async fn report(&mut self) {
|
||||||
|
self.tm
|
||||||
|
.send(RiuTm {
|
||||||
|
temp: self.temp.value,
|
||||||
|
illum: self.illum.value,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Model for Riu {
|
||||||
|
/// Initializes model.
|
||||||
|
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
|
||||||
|
// Schedule periodic acquisition.
|
||||||
|
cx.schedule_periodic_event(DELTA, PERIOD, Riu::acquire, ())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
self.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<(), SimulationError> {
|
||||||
|
// ---------------
|
||||||
|
// Bench assembly.
|
||||||
|
// ---------------
|
||||||
|
|
||||||
|
// Models.
|
||||||
|
let sensor = Sensor::new();
|
||||||
|
let mut riu = Riu::new();
|
||||||
|
let mut sensor_adaptor = ReplierAdaptor::new();
|
||||||
|
|
||||||
|
// Mailboxes.
|
||||||
|
let sensor_mbox = Mailbox::new();
|
||||||
|
let riu_mbox = Mailbox::new();
|
||||||
|
let sensor_adaptor_mbox = Mailbox::new();
|
||||||
|
|
||||||
|
// Connections.
|
||||||
|
riu.sensor_tc
|
||||||
|
.connect(ReplierAdaptor::input, &sensor_adaptor_mbox);
|
||||||
|
sensor_adaptor.output.connect(Riu::sensor_tm, &riu_mbox);
|
||||||
|
sensor_adaptor
|
||||||
|
.requestor
|
||||||
|
.connect(Sensor::process_tc, &sensor_mbox);
|
||||||
|
|
||||||
|
// Model handles for simulation.
|
||||||
|
let mut tm = EventBuffer::new();
|
||||||
|
let sensor_addr = sensor_mbox.address();
|
||||||
|
|
||||||
|
riu.tm.connect_sink(&tm);
|
||||||
|
|
||||||
|
// Start time (arbitrary since models do not depend on absolute time).
|
||||||
|
let t0 = MonotonicTime::EPOCH;
|
||||||
|
|
||||||
|
// Assembly and initialization.
|
||||||
|
let mut simu = SimInit::new()
|
||||||
|
.add_model(sensor, sensor_mbox, "sensor")
|
||||||
|
.add_model(riu, riu_mbox, "riu")
|
||||||
|
.add_model(sensor_adaptor, sensor_adaptor_mbox, "sensor_adaptor")
|
||||||
|
.init(t0)?
|
||||||
|
.0;
|
||||||
|
|
||||||
|
// ----------
|
||||||
|
// Simulation.
|
||||||
|
// ----------
|
||||||
|
|
||||||
|
// Initial state: no RIU TM.
|
||||||
|
assert_eq!(tm.next(), None);
|
||||||
|
|
||||||
|
simu.step_until(Duration::from_millis(1200))?;
|
||||||
|
|
||||||
|
// RIU TM generated.
|
||||||
|
assert_eq!(
|
||||||
|
tm.next(),
|
||||||
|
Some(RiuTm {
|
||||||
|
temp: 0.0,
|
||||||
|
illum: 0.0
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// Consume all RIU TM generated so far.
|
||||||
|
while tm.next().is_some() {}
|
||||||
|
|
||||||
|
// Set temperature and wait for RIU TM.
|
||||||
|
simu.process_event(Sensor::set_temp, 2.0, &sensor_addr)?;
|
||||||
|
|
||||||
|
simu.step_until(Duration::from_millis(1000))?;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tm.next(),
|
||||||
|
Some(RiuTm {
|
||||||
|
temp: 2.0,
|
||||||
|
illum: 0.0
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set illuminance and wait for RIU TM.
|
||||||
|
simu.process_event(Sensor::set_illum, 3.0, &sensor_addr)?;
|
||||||
|
|
||||||
|
simu.step_until(Duration::from_millis(1000))?;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tm.next(),
|
||||||
|
Some(RiuTm {
|
||||||
|
temp: 2.0,
|
||||||
|
illum: 3.0
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
47
nexosim-util/src/combinators.rs
Normal file
47
nexosim-util/src/combinators.rs
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
//! Connector combinators.
|
||||||
|
//!
|
||||||
|
//! This module contains combinator types useful for simulation bench assembly.
|
||||||
|
//!
|
||||||
|
|
||||||
|
use nexosim::model::Model;
|
||||||
|
use nexosim::ports::{Output, Requestor};
|
||||||
|
|
||||||
|
/// A replier adaptor.
|
||||||
|
///
|
||||||
|
/// `ReplierAdaptor` generic model is aimed to connect pair of input/output
|
||||||
|
/// ports to a replier ports.
|
||||||
|
///
|
||||||
|
/// Model input is propagated to all the connected replier ports and their
|
||||||
|
/// answers are written to the model output.
|
||||||
|
pub struct ReplierAdaptor<T: Clone + Send + 'static, R: Clone + Send + 'static> {
|
||||||
|
/// Requestor port to be connected to replier port.
|
||||||
|
pub requestor: Requestor<T, R>,
|
||||||
|
|
||||||
|
/// Output port to be connected to input port.
|
||||||
|
pub output: Output<R>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone + Send + 'static, R: Clone + Send + 'static> ReplierAdaptor<T, R> {
|
||||||
|
/// Creates a `ReplierAdaptor` model.
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Input port.
|
||||||
|
pub async fn input(&mut self, data: T) {
|
||||||
|
for res in self.requestor.send(data).await {
|
||||||
|
self.output.send(res).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone + Send + 'static, R: Clone + Send + 'static> Model for ReplierAdaptor<T, R> {}
|
||||||
|
|
||||||
|
impl<T: Clone + Send + 'static, R: Clone + Send + 'static> Default for ReplierAdaptor<T, R> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
requestor: Requestor::new(),
|
||||||
|
output: Output::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1 +1,2 @@
|
|||||||
|
pub mod combinators;
|
||||||
pub mod observables;
|
pub mod observables;
|
||||||
|
@ -33,7 +33,7 @@ pub(super) struct MessageBorrow<'a, T: ?Sized> {
|
|||||||
stamp: usize,
|
stamp: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: ?Sized> Deref for MessageBorrow<'a, T> {
|
impl<T: ?Sized> Deref for MessageBorrow<'_, T> {
|
||||||
type Target = T;
|
type Target = T;
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
@ -41,13 +41,13 @@ impl<'a, T: ?Sized> Deref for MessageBorrow<'a, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: ?Sized> DerefMut for MessageBorrow<'a, T> {
|
impl<T: ?Sized> DerefMut for MessageBorrow<'_, T> {
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
&mut self.msg
|
&mut self.msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: ?Sized> Drop for MessageBorrow<'a, T> {
|
impl<T: ?Sized> Drop for MessageBorrow<'_, T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let slot = &self.queue.buffer[self.index];
|
let slot = &self.queue.buffer[self.index];
|
||||||
|
|
||||||
@ -67,7 +67,7 @@ impl<'a, T: ?Sized> Drop for MessageBorrow<'a, T> {
|
|||||||
slot.stamp.store(self.stamp, Ordering::Release);
|
slot.stamp.store(self.stamp, Ordering::Release);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<'a, M> fmt::Debug for MessageBorrow<'a, M> {
|
impl<M> fmt::Debug for MessageBorrow<'_, M> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.debug_struct("MessageBorrow").finish_non_exhaustive()
|
f.debug_struct("MessageBorrow").finish_non_exhaustive()
|
||||||
}
|
}
|
||||||
|
@ -315,7 +315,7 @@ where
|
|||||||
///
|
///
|
||||||
/// An arbitrary tag can be attached to the task, a clone of which will be
|
/// An arbitrary tag can be attached to the task, a clone of which will be
|
||||||
/// passed to the scheduling function each time it is called.
|
/// passed to the scheduling function each time it is called.
|
||||||
|
///
|
||||||
/// The returned `Runnable` must be scheduled by the user.
|
/// The returned `Runnable` must be scheduled by the user.
|
||||||
pub(crate) fn spawn<F, S, T>(
|
pub(crate) fn spawn<F, S, T>(
|
||||||
future: F,
|
future: F,
|
||||||
|
@ -70,7 +70,6 @@ use super::{Model, ProtoModel};
|
|||||||
/// }
|
/// }
|
||||||
/// impl Model for DelayedGreeter {}
|
/// impl Model for DelayedGreeter {}
|
||||||
/// ```
|
/// ```
|
||||||
|
|
||||||
// The self-scheduling caveat seems related to this issue:
|
// The self-scheduling caveat seems related to this issue:
|
||||||
// https://github.com/rust-lang/rust/issues/78649
|
// https://github.com/rust-lang/rust/issues/78649
|
||||||
pub struct Context<M: Model> {
|
pub struct Context<M: Model> {
|
||||||
|
@ -350,7 +350,7 @@ impl<'a, R> BroadcastFuture<'a, R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, R> Drop for BroadcastFuture<'a, R> {
|
impl<R> Drop for BroadcastFuture<'_, R> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Safety: this is safe since `self.futures` is never accessed after it
|
// Safety: this is safe since `self.futures` is never accessed after it
|
||||||
// is moved out.
|
// is moved out.
|
||||||
@ -361,7 +361,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, R> Future for BroadcastFuture<'a, R> {
|
impl<R> Future for BroadcastFuture<'_, R> {
|
||||||
type Output = Result<(), SendError>;
|
type Output = Result<(), SendError>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
@ -736,7 +736,7 @@ impl<'a, T> RecycledFuture<'a, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T> Drop for RecycledFuture<'a, T> {
|
impl<T> Drop for RecycledFuture<'_, T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Return the box to the lender.
|
// Return the box to the lender.
|
||||||
//
|
//
|
||||||
@ -747,7 +747,7 @@ impl<'a, T> Drop for RecycledFuture<'a, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T> Future for RecycledFuture<'a, T> {
|
impl<T> Future for RecycledFuture<'_, T> {
|
||||||
type Output = T;
|
type Output = T;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
@ -42,7 +42,6 @@ const COUNTDOWN_ONE: u64 = 1 << 32;
|
|||||||
/// more than 1 to wake the parent task less frequently. For instance, if
|
/// more than 1 to wake the parent task less frequently. For instance, if
|
||||||
/// `notify_count` is set to the number of pending sub-tasks, the parent task
|
/// `notify_count` is set to the number of pending sub-tasks, the parent task
|
||||||
/// will only be woken once all subtasks have been woken.
|
/// will only be woken once all subtasks have been woken.
|
||||||
|
|
||||||
pub(crate) struct TaskSet {
|
pub(crate) struct TaskSet {
|
||||||
/// Set of all tasks, scheduled or not.
|
/// Set of all tasks, scheduled or not.
|
||||||
///
|
///
|
||||||
@ -355,7 +354,7 @@ pub(crate) struct TaskIterator<'a> {
|
|||||||
next_index: u32,
|
next_index: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Iterator for TaskIterator<'a> {
|
impl Iterator for TaskIterator<'_> {
|
||||||
type Item = usize;
|
type Item = usize;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
@ -380,7 +379,7 @@ impl<'a> Iterator for TaskIterator<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Drop for TaskIterator<'a> {
|
impl Drop for TaskIterator<'_> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Put all remaining scheduled tasks in the sleeping state.
|
// Put all remaining scheduled tasks in the sleeping state.
|
||||||
//
|
//
|
||||||
|
Loading…
x
Reference in New Issue
Block a user