From 0274e62eb0fb89c6d8a0501906c73197d0e007d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Fri, 6 Dec 2024 15:29:18 +0100 Subject: [PATCH 1/3] Add replier adaptor --- nexosim-util/examples/replier_adaptor.rs | 276 +++++++++++++++++++++++ nexosim-util/src/combinators.rs | 54 +++++ nexosim-util/src/lib.rs | 1 + 3 files changed, 331 insertions(+) create mode 100644 nexosim-util/examples/replier_adaptor.rs create mode 100644 nexosim-util/src/combinators.rs diff --git a/nexosim-util/examples/replier_adaptor.rs b/nexosim-util/examples/replier_adaptor.rs new file mode 100644 index 0000000..d0fca58 --- /dev/null +++ b/nexosim-util/examples/replier_adaptor.rs @@ -0,0 +1,276 @@ +//! Example: RIU acquiring data from sensor. +//! +//! This example demonstrates in particular: +//! +//! * the use of replier port adaptor, +//! * periodic model self-scheduling. +//! +//! ```text +//! ┌────────┐ ┌─────────┐ Sensor TC ┌─────┐ +//! Set temperature ●────►│ │ ◄Sensor TC │ │◄────────────┤ │ +//! │ Sensor │◄►────────────►◄│ Adaptor │ Sensor TM │ RIU ├────► RIU TM +//! Set illuminance ●────►│ │ Sensor TM► │ ├────────────►│ │ +//! └────────┘ └─────────┘ └─────┘ +//! ``` + +use std::fmt::Debug; +use std::time::Duration; + +use nexosim::model::{Context, InitializedModel, Model}; +use nexosim::ports::{EventBuffer, Output}; +use nexosim::simulation::{Mailbox, SimInit, SimulationError}; +use nexosim::time::MonotonicTime; +use nexosim_util::combinators::ReplierAdaptor; + +const DELTA: Duration = Duration::from_millis(2); +const PERIOD: Duration = Duration::from_secs(1); + +/// Sensor TC. +#[derive(Clone, Debug, PartialEq)] +pub enum SensorTc { + GetTemp, + GetIllum, +} + +/// Sensor TM. +#[derive(Clone, Debug, PartialEq)] +pub enum SensorTm { + Temp(f64), + Illum(f64), +} + +/// Sensor model. +pub struct Sensor { + /// Temperature [deg C] -- internal state. + temp: f64, + + /// Illuminance [lx] -- internal state. + illum: f64, +} + +impl Sensor { + /// Creates a sensor model. + pub fn new() -> Self { + Self { + temp: 0.0, + illum: 0.0, + } + } + + /// Sets sensor temperature [deg C]. + pub async fn set_temp(&mut self, temp: f64) { + self.temp = temp; + } + + /// Sets sensor illuminance [lx]. + pub async fn set_illum(&mut self, illum: f64) { + self.illum = illum; + } + + /// Processes sensor TC -- input port. + pub async fn process_tc(&mut self, tc: SensorTc) -> SensorTm { + match tc { + SensorTc::GetTemp => SensorTm::Temp(self.temp), + SensorTc::GetIllum => SensorTm::Illum(self.illum), + } + } +} + +impl Model for Sensor {} + +/// Internal TM field. +#[derive(Clone, Debug, PartialEq)] +pub struct TmField +where + T: Clone + Debug + PartialEq, +{ + /// TM value. + pub value: T, + + /// TM readiness flag. + pub ready: bool, +} + +/// RIU TM. +#[derive(Clone, Debug, PartialEq)] +pub struct RiuTm { + /// Temperature [deg C]. + temp: f64, + + /// Iluminance [lx]. + illum: f64, +} + +/// RIU model. +pub struct Riu { + /// Sensor TC -- output port. + pub sensor_tc: Output, + + /// RIU TM -- output port. + pub tm: Output, + + /// Temperature [deg C] -- internal state. + temp: TmField, + + /// Illuminance [lx] -- internal state. + illum: TmField, +} + +impl Riu { + /// Creates an RIU model. + pub fn new() -> Self { + Self { + sensor_tc: Output::new(), + tm: Output::new(), + temp: TmField { + value: 0.0, + ready: true, + }, + illum: TmField { + value: 0.0, + ready: true, + }, + } + } + + /// Processes sensor TM -- input port. + pub async fn sensor_tm(&mut self, tm: SensorTm) { + match tm { + SensorTm::Temp(temp) => { + self.temp = TmField { + value: temp, + ready: true, + } + } + SensorTm::Illum(illum) => { + self.illum = TmField { + value: illum, + ready: true, + } + } + } + + if self.temp.ready && self.illum.ready { + self.report().await + } + } + + /// Starts sensor TM acquisition -- periodic activity. + async fn acquire(&mut self) { + self.temp.ready = false; + self.illum.ready = false; + self.sensor_tc.send(SensorTc::GetTemp).await; + self.sensor_tc.send(SensorTc::GetIllum).await + } + + /// Reports RIU TM. + async fn report(&mut self) { + self.tm + .send(RiuTm { + temp: self.temp.value, + illum: self.illum.value, + }) + .await + } +} + +impl Model for Riu { + /// Initializes model. + async fn init(self, cx: &mut Context) -> InitializedModel { + // Schedule periodic acquisition. + cx.schedule_periodic_event(DELTA, PERIOD, Riu::acquire, ()) + .unwrap(); + + self.into() + } +} + +fn main() -> Result<(), SimulationError> { + // --------------- + // Bench assembly. + // --------------- + + // Models. + let sensor = Sensor::new(); + let mut riu = Riu::new(); + let mut sensor_adaptor = ReplierAdaptor::new(); + + // Mailboxes. + let sensor_mbox = Mailbox::new(); + let riu_mbox = Mailbox::new(); + let sensor_adaptor_mbox = Mailbox::new(); + + // Connections. + riu.sensor_tc + .connect(ReplierAdaptor::input, &sensor_adaptor_mbox); + sensor_adaptor.output.connect(Riu::sensor_tm, &riu_mbox); + sensor_adaptor + .requestor + .connect(Sensor::process_tc, &sensor_mbox); + + // Model handles for simulation. + let mut tm = EventBuffer::new(); + let sensor_addr = sensor_mbox.address(); + + riu.tm.connect_sink(&tm); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let mut simu = SimInit::new() + .add_model(sensor, sensor_mbox, "sensor") + .add_model(riu, riu_mbox, "riu") + .add_model(sensor_adaptor, sensor_adaptor_mbox, "sensor_adaptor") + .init(t0)? + .0; + + // ---------- + // Simulation. + // ---------- + + // Initial state: no RIU TM. + assert_eq!(tm.next(), None); + + simu.step_until(Duration::from_millis(1200))?; + + // RIU TM generated. + assert_eq!( + tm.next(), + Some(RiuTm { + temp: 0.0, + illum: 0.0 + }) + ); + + // Consume all RIU TM generated so far. + while tm.next().is_some() {} + + // Set temperature and wait for RIU TM. + simu.process_event(Sensor::set_temp, 2.0, &sensor_addr)?; + + simu.step_until(Duration::from_millis(1000))?; + + assert_eq!( + tm.next(), + Some(RiuTm { + temp: 2.0, + illum: 0.0 + }) + ); + + // Set illuminance and wait for RIU TM. + simu.process_event(Sensor::set_illum, 3.0, &sensor_addr)?; + + simu.step_until(Duration::from_millis(1000))?; + + assert_eq!( + tm.next(), + Some(RiuTm { + temp: 2.0, + illum: 3.0 + }) + ); + + Ok(()) +} diff --git a/nexosim-util/src/combinators.rs b/nexosim-util/src/combinators.rs new file mode 100644 index 0000000..b8d6e19 --- /dev/null +++ b/nexosim-util/src/combinators.rs @@ -0,0 +1,54 @@ +//! Connector combinators. +//! +//! This module contains combinator types useful for simulation bench assembly. +//! + +use nexosim::model::Model; +use nexosim::ports::{Output, Requestor}; + +/// A replier adaptor. +/// +/// `ReplierAdaptor` generic model is aimed to connect pair of input/output +/// ports to a replier ports. +/// +/// Model input is propagated to all the connected replier ports and their +/// answers are written to the model output. +pub struct ReplierAdaptor +where + T: Clone + Send + 'static, + R: Clone + Send + 'static, +{ + /// Requestor port to be connected to replier port. + pub requestor: Requestor, + + /// Output port to be connected to input port. + pub output: Output, +} + +impl ReplierAdaptor +where + T: Clone + Send + 'static, + R: Clone + Send + 'static, +{ + /// Creates a `ReplierAdaptor` model. + pub fn new() -> Self { + Self { + requestor: Requestor::new(), + output: Output::new(), + } + } + + /// Input port. + pub async fn input(&mut self, data: T) { + for res in self.requestor.send(data).await { + self.output.send(res).await; + } + } +} + +impl Model for ReplierAdaptor +where + T: Clone + Send + 'static, + R: Clone + Send + 'static, +{ +} diff --git a/nexosim-util/src/lib.rs b/nexosim-util/src/lib.rs index 4010ef7..40bd6f4 100644 --- a/nexosim-util/src/lib.rs +++ b/nexosim-util/src/lib.rs @@ -1 +1,2 @@ +pub mod combinators; pub mod observables; From 97c855293df3c7755e8c8417ec64fc56c9ca4e9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Fri, 6 Dec 2024 15:50:11 +0100 Subject: [PATCH 2/3] Implement Default for ReplierAdaptor --- nexosim-util/src/combinators.rs | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/nexosim-util/src/combinators.rs b/nexosim-util/src/combinators.rs index b8d6e19..89278e1 100644 --- a/nexosim-util/src/combinators.rs +++ b/nexosim-util/src/combinators.rs @@ -13,11 +13,7 @@ use nexosim::ports::{Output, Requestor}; /// /// Model input is propagated to all the connected replier ports and their /// answers are written to the model output. -pub struct ReplierAdaptor -where - T: Clone + Send + 'static, - R: Clone + Send + 'static, -{ +pub struct ReplierAdaptor { /// Requestor port to be connected to replier port. pub requestor: Requestor, @@ -25,17 +21,10 @@ where pub output: Output, } -impl ReplierAdaptor -where - T: Clone + Send + 'static, - R: Clone + Send + 'static, -{ +impl ReplierAdaptor { /// Creates a `ReplierAdaptor` model. pub fn new() -> Self { - Self { - requestor: Requestor::new(), - output: Output::new(), - } + Self::default() } /// Input port. @@ -46,9 +35,13 @@ where } } -impl Model for ReplierAdaptor -where - T: Clone + Send + 'static, - R: Clone + Send + 'static, -{ +impl Model for ReplierAdaptor {} + +impl Default for ReplierAdaptor { + fn default() -> Self { + Self { + requestor: Requestor::new(), + output: Output::new(), + } + } } From d63bcdf4f08ec933649531dc7352ad1461c3d30d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Mon, 9 Dec 2024 15:01:21 +0100 Subject: [PATCH 3/3] Fix clippy warnings after version update --- nexosim/src/channel/queue.rs | 8 ++++---- nexosim/src/executor/task.rs | 2 +- nexosim/src/model/context.rs | 1 - nexosim/src/ports/output/broadcaster.rs | 4 ++-- nexosim/src/ports/output/sender.rs | 4 ++-- nexosim/src/util/task_set.rs | 5 ++--- 6 files changed, 11 insertions(+), 13 deletions(-) diff --git a/nexosim/src/channel/queue.rs b/nexosim/src/channel/queue.rs index 7876c3e..57747de 100644 --- a/nexosim/src/channel/queue.rs +++ b/nexosim/src/channel/queue.rs @@ -33,7 +33,7 @@ pub(super) struct MessageBorrow<'a, T: ?Sized> { stamp: usize, } -impl<'a, T: ?Sized> Deref for MessageBorrow<'a, T> { +impl Deref for MessageBorrow<'_, T> { type Target = T; fn deref(&self) -> &Self::Target { @@ -41,13 +41,13 @@ impl<'a, T: ?Sized> Deref for MessageBorrow<'a, T> { } } -impl<'a, T: ?Sized> DerefMut for MessageBorrow<'a, T> { +impl DerefMut for MessageBorrow<'_, T> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.msg } } -impl<'a, T: ?Sized> Drop for MessageBorrow<'a, T> { +impl Drop for MessageBorrow<'_, T> { fn drop(&mut self) { let slot = &self.queue.buffer[self.index]; @@ -67,7 +67,7 @@ impl<'a, T: ?Sized> Drop for MessageBorrow<'a, T> { slot.stamp.store(self.stamp, Ordering::Release); } } -impl<'a, M> fmt::Debug for MessageBorrow<'a, M> { +impl fmt::Debug for MessageBorrow<'_, M> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("MessageBorrow").finish_non_exhaustive() } diff --git a/nexosim/src/executor/task.rs b/nexosim/src/executor/task.rs index 3b3bd5f..cbec651 100644 --- a/nexosim/src/executor/task.rs +++ b/nexosim/src/executor/task.rs @@ -315,7 +315,7 @@ where /// /// An arbitrary tag can be attached to the task, a clone of which will be /// passed to the scheduling function each time it is called. - +/// /// The returned `Runnable` must be scheduled by the user. pub(crate) fn spawn( future: F, diff --git a/nexosim/src/model/context.rs b/nexosim/src/model/context.rs index e71b7a4..2d593e6 100644 --- a/nexosim/src/model/context.rs +++ b/nexosim/src/model/context.rs @@ -70,7 +70,6 @@ use super::{Model, ProtoModel}; /// } /// impl Model for DelayedGreeter {} /// ``` - // The self-scheduling caveat seems related to this issue: // https://github.com/rust-lang/rust/issues/78649 pub struct Context { diff --git a/nexosim/src/ports/output/broadcaster.rs b/nexosim/src/ports/output/broadcaster.rs index ee09f54..e7d7470 100644 --- a/nexosim/src/ports/output/broadcaster.rs +++ b/nexosim/src/ports/output/broadcaster.rs @@ -350,7 +350,7 @@ impl<'a, R> BroadcastFuture<'a, R> { } } -impl<'a, R> Drop for BroadcastFuture<'a, R> { +impl Drop for BroadcastFuture<'_, R> { fn drop(&mut self) { // Safety: this is safe since `self.futures` is never accessed after it // is moved out. @@ -361,7 +361,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> { } } -impl<'a, R> Future for BroadcastFuture<'a, R> { +impl Future for BroadcastFuture<'_, R> { type Output = Result<(), SendError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/nexosim/src/ports/output/sender.rs b/nexosim/src/ports/output/sender.rs index ce85587..933cbf3 100644 --- a/nexosim/src/ports/output/sender.rs +++ b/nexosim/src/ports/output/sender.rs @@ -736,7 +736,7 @@ impl<'a, T> RecycledFuture<'a, T> { } } -impl<'a, T> Drop for RecycledFuture<'a, T> { +impl Drop for RecycledFuture<'_, T> { fn drop(&mut self) { // Return the box to the lender. // @@ -747,7 +747,7 @@ impl<'a, T> Drop for RecycledFuture<'a, T> { } } -impl<'a, T> Future for RecycledFuture<'a, T> { +impl Future for RecycledFuture<'_, T> { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/nexosim/src/util/task_set.rs b/nexosim/src/util/task_set.rs index 3ad9b4f..c8f0da4 100644 --- a/nexosim/src/util/task_set.rs +++ b/nexosim/src/util/task_set.rs @@ -42,7 +42,6 @@ const COUNTDOWN_ONE: u64 = 1 << 32; /// more than 1 to wake the parent task less frequently. For instance, if /// `notify_count` is set to the number of pending sub-tasks, the parent task /// will only be woken once all subtasks have been woken. - pub(crate) struct TaskSet { /// Set of all tasks, scheduled or not. /// @@ -355,7 +354,7 @@ pub(crate) struct TaskIterator<'a> { next_index: u32, } -impl<'a> Iterator for TaskIterator<'a> { +impl Iterator for TaskIterator<'_> { type Item = usize; fn next(&mut self) -> Option { @@ -380,7 +379,7 @@ impl<'a> Iterator for TaskIterator<'a> { } } -impl<'a> Drop for TaskIterator<'a> { +impl Drop for TaskIterator<'_> { fn drop(&mut self) { // Put all remaining scheduled tasks in the sleeping state. //