From 8a1a6cf35405dfbb56fe262d4223a472f654e5d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Fri, 24 Jan 2025 09:22:20 +0100 Subject: [PATCH 1/2] Add blocking event queue. --- nexosim-util/Cargo.toml | 3 + nexosim-util/examples/observables.rs | 3 +- nexosim-util/examples/simulation_driven.rs | 274 ++++++++++++++++++ nexosim-util/src/helper_models.rs | 34 +++ nexosim-util/src/lib.rs | 1 + nexosim/src/ports.rs | 5 +- nexosim/src/ports/sink.rs | 1 + .../src/ports/sink/blocking_event_queue.rs | 143 +++++++++ 8 files changed, 462 insertions(+), 2 deletions(-) create mode 100644 nexosim-util/examples/simulation_driven.rs create mode 100644 nexosim-util/src/helper_models.rs create mode 100644 nexosim/src/ports/sink/blocking_event_queue.rs diff --git a/nexosim-util/Cargo.toml b/nexosim-util/Cargo.toml index 3c6f541..33b2c2c 100644 --- a/nexosim-util/Cargo.toml +++ b/nexosim-util/Cargo.toml @@ -5,3 +5,6 @@ edition = "2021" [dependencies] nexosim = { version = "0.3.0", path = "../nexosim" } + +[dev-dependencies] +rand = "0.8" diff --git a/nexosim-util/examples/observables.rs b/nexosim-util/examples/observables.rs index c80f2b0..868c9a8 100644 --- a/nexosim-util/examples/observables.rs +++ b/nexosim-util/examples/observables.rs @@ -40,8 +40,9 @@ impl Default for Hk { } /// Processor mode ID. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub enum ModeId { + #[default] Off, Idle, Processing, diff --git a/nexosim-util/examples/simulation_driven.rs b/nexosim-util/examples/simulation_driven.rs new file mode 100644 index 0000000..0b4f4e4 --- /dev/null +++ b/nexosim-util/examples/simulation_driven.rs @@ -0,0 +1,274 @@ +//! Example: a simulation that runs infinitely until stopped. This setup is +//! typical for hardware-in-the-loop use case. The test scenario is driven by +//! simulation events. +//! +//! This example demonstrates in particular: +//! +//! * infinite simulation, +//! * blocking event queue, +//! * simulation halting, +//! * system clock, +//! * periodic scheduling, +//! * observable state. +//! +//! ```text +//! ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +//! ┃ Simulation ┃ +//! ┃ ┌──────────┐ ┌──────────┐mode ┃ +//! ┃ │ │pulses │ ├──────╂┐BlockingEventQueue +//! ┃ │ Detector ├──────►│ Counter │count ┃├───────────────────► +//! ┃ │ │ │ ├──────╂┘ +//! ┃ └──────────┘ └──────────┘ ┃ +//! ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ +//! ``` + +use std::future::Future; +use std::thread; +use std::time::Duration; + +use rand::Rng; + +use nexosim::model::{Context, Model}; +use nexosim::ports::{BlockingEventQueue, Output}; +use nexosim::simulation::{ActionKey, ExecutionError, Mailbox, SimInit, SimulationError}; +use nexosim::time::{AutoSystemClock, MonotonicTime}; +use nexosim_util::helper_models::Ticker; +use nexosim_util::observables::ObservableValue; + +const SWITCH_ON_DELAY: Duration = Duration::from_secs(1); +const MAX_PULSE_PERIOD: u64 = 100; +const TICK: Duration = Duration::from_millis(100); +const N: u64 = 10; + +/// Counter mode. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum Mode { + #[default] + Off, + On, +} + +/// Simulation event. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Event { + Mode(Mode), + Count(u64), +} + +/// The `Counter` Model. +pub struct Counter { + /// Operation mode. + pub mode: Output, + + /// Pulses count. + pub count: Output, + + /// Internal state. + state: ObservableValue, + + /// Counter. + acc: ObservableValue, +} + +impl Counter { + /// Creates new `Counter` model. + fn new() -> Self { + let mode = Output::default(); + let count = Output::default(); + Self { + mode: mode.clone(), + count: count.clone(), + state: ObservableValue::new(mode), + acc: ObservableValue::new(count), + } + } + + /// Power -- input port. + pub async fn power_in(&mut self, on: bool, cx: &mut Context) { + match *self.state { + Mode::Off if on => cx + .schedule_event(SWITCH_ON_DELAY, Self::switch_on, ()) + .unwrap(), + Mode::On if !on => self.switch_off().await, + _ => (), + }; + } + + /// Pulse -- input port. + pub async fn pulse(&mut self) { + self.acc.modify(|x| *x += 1).await; + } + + /// Switches `Counter` on. + async fn switch_on(&mut self) { + self.state.set(Mode::On).await; + } + + /// Switches `Counter` off. + async fn switch_off(&mut self) { + self.state.set(Mode::Off).await; + } +} + +impl Model for Counter {} + +/// Detector model that produces pulses. +pub struct Detector { + /// Output pulse. + pub pulse: Output<()>, + + /// `ActionKey` of the next scheduled detection. + next: Option, +} + +impl Detector { + /// Creates new `Detector` model. + pub fn new() -> Self { + Self { + pulse: Output::default(), + next: None, + } + } + + /// Switches `Detector` on -- input port. + pub async fn switch_on(&mut self, _: (), cx: &mut Context) { + self.schedule_next(cx).await; + } + + /// Switches `Detector` off -- input port. + pub async fn switch_off(&mut self) { + self.next = None; + } + + /// Generates pulse. + /// + /// Note: self-scheduling async methods must be for now defined with an + /// explicit signature instead of `async fn` due to a rustc issue. + fn pulse<'a>( + &'a mut self, + _: (), + cx: &'a mut Context, + ) -> impl Future + Send + 'a { + async move { + self.pulse.send(()).await; + self.schedule_next(cx).await; + } + } + + /// Schedules next detection. + async fn schedule_next(&mut self, cx: &mut Context) { + let next = { + let mut rng = rand::thread_rng(); + rng.gen_range(1..MAX_PULSE_PERIOD) + }; + self.next = Some( + cx.schedule_keyed_event(Duration::from_millis(next), Self::pulse, ()) + .unwrap(), + ); + } +} + +impl Model for Detector {} + +fn main() -> Result<(), SimulationError> { + // --------------- + // Bench assembly. + // --------------- + + // Models. + + // The detector model that produces pulses. + let mut detector = Detector::new(); + + // The counter model. + let mut counter = Counter::new(); + + // The ticker model that keeps simulation alive. + let ticker = Ticker::new(TICK); + + // Mailboxes. + let detector_mbox = Mailbox::new(); + let counter_mbox = Mailbox::new(); + let ticker_mbox = Mailbox::new(); + + // Connections. + detector.pulse.connect(Counter::pulse, &counter_mbox); + + // Model handles for simulation. + let detector_addr = detector_mbox.address(); + let counter_addr = counter_mbox.address(); + let observer = BlockingEventQueue::new(); + counter + .mode + .map_connect_sink(|m| Event::Mode(*m), &observer); + counter + .count + .map_connect_sink(|c| Event::Count(*c), &observer); + let mut observer = observer.reader(); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let (mut simu, mut scheduler) = SimInit::new() + .add_model(detector, detector_mbox, "detector") + .add_model(counter, counter_mbox, "counter") + .add_model(ticker, ticker_mbox, "ticker") + .set_clock(AutoSystemClock::new()) + .init(t0)?; + + // Simulation thread. + let simulation_handle = thread::spawn(move || { + // ---------- Simulation. ---------- + // Infinitely kept alive by the ticker model until halted. + simu.step_unbounded() + }); + + // Switch the counter on. + scheduler.schedule_event( + Duration::from_millis(1), + Counter::power_in, + true, + counter_addr, + )?; + + // Wait until counter mode is `On`. + loop { + let event = observer.next(); + match event { + Some(Event::Mode(Mode::On)) => { + break; + } + None => panic!("Simulation exited unexpectedly"), + _ => (), + } + } + + // Switch the detector on. + scheduler.schedule_event( + Duration::from_millis(100), + Detector::switch_on, + (), + detector_addr, + )?; + + // Wait until `N` detections. + loop { + let event = observer.next(); + match event { + Some(Event::Count(c)) if c >= N => { + break; + } + None => panic!("Simulation exited unexpectedly"), + _ => (), + } + } + + // Stop the simulation. + scheduler.halt(); + match simulation_handle.join().unwrap() { + Err(ExecutionError::Halted) => Ok(()), + Err(e) => Err(e.into()), + _ => Ok(()), + } +} diff --git a/nexosim-util/src/helper_models.rs b/nexosim-util/src/helper_models.rs new file mode 100644 index 0000000..c9a8d80 --- /dev/null +++ b/nexosim-util/src/helper_models.rs @@ -0,0 +1,34 @@ +//! Helper models. +//! +//! This module contains helper models useful for simulation bench assembly. +//! + +use std::time::Duration; + +use nexosim::model::{Context, InitializedModel, Model}; + +/// A ticker model. +/// +/// This model self-schedules with a provided period keeping simulation alive. +pub struct Ticker { + /// Tick period in milliseconds. + tick: Duration, +} + +impl Ticker { + /// Creates new `Ticker` with provided tick period in milliseconds. + pub fn new(tick: Duration) -> Self { + Self { tick } + } + + /// Self-scheduled function. + pub async fn tick(&mut self) {} +} + +impl Model for Ticker { + async fn init(self, cx: &mut Context) -> InitializedModel { + cx.schedule_periodic_event(self.tick, self.tick, Self::tick, ()) + .unwrap(); + self.into() + } +} diff --git a/nexosim-util/src/lib.rs b/nexosim-util/src/lib.rs index 40bd6f4..3c34f42 100644 --- a/nexosim-util/src/lib.rs +++ b/nexosim-util/src/lib.rs @@ -1,2 +1,3 @@ pub mod combinators; +pub mod helper_models; pub mod observables; diff --git a/nexosim/src/ports.rs b/nexosim/src/ports.rs index c7b55f7..7a3cb95 100644 --- a/nexosim/src/ports.rs +++ b/nexosim/src/ports.rs @@ -273,6 +273,9 @@ pub use input::markers; pub use input::{InputFn, ReplierFn}; pub use output::{Output, Requestor, UniRequestor}; pub use sink::{ - event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter, + blocking_event_queue::{BlockingEventQueue, BlockingEventQueueReader}, + event_buffer::EventBuffer, + event_slot::EventSlot, + EventSink, EventSinkStream, EventSinkWriter, }; pub use source::{EventSource, QuerySource, ReplyReceiver}; diff --git a/nexosim/src/ports/sink.rs b/nexosim/src/ports/sink.rs index b0b6743..17660ff 100644 --- a/nexosim/src/ports/sink.rs +++ b/nexosim/src/ports/sink.rs @@ -1,3 +1,4 @@ +pub(crate) mod blocking_event_queue; pub(crate) mod event_buffer; pub(crate) mod event_slot; diff --git a/nexosim/src/ports/sink/blocking_event_queue.rs b/nexosim/src/ports/sink/blocking_event_queue.rs new file mode 100644 index 0000000..a712ab9 --- /dev/null +++ b/nexosim/src/ports/sink/blocking_event_queue.rs @@ -0,0 +1,143 @@ +use std::fmt; +use std::iter::FusedIterator; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Arc; + +use super::{EventSink, EventSinkStream, EventSinkWriter}; + +/// A blocking event queue with an unbounded size. +/// +/// Implements [`EventSink`], while [`EventSinkStream`] is implemented by +/// [`BlockingEventQueueReader`] available through the +/// [`BlockingEventQueue::reader`] method. +pub struct BlockingEventQueue { + is_open: Arc, + sender: Sender, + receiver: Receiver, +} + +impl BlockingEventQueue { + /// Creates an open `BlockingEventQueue`. + pub fn new() -> Self { + Self::new_with_open_state(true) + } + + /// Creates a closed `BlockingEventQueue`. + pub fn new_closed() -> Self { + Self::new_with_open_state(false) + } + + /// Returns consumer handle. + pub fn reader(self) -> BlockingEventQueueReader { + BlockingEventQueueReader { + is_open: self.is_open, + receiver: self.receiver, + } + } + + /// Creates new `BlockingEventQueue` in a given state. + fn new_with_open_state(is_open: bool) -> Self { + let (sender, receiver) = channel(); + Self { + is_open: Arc::new(AtomicBool::new(is_open)), + sender, + receiver, + } + } +} + +impl EventSink for BlockingEventQueue { + type Writer = BlockingEventQueueWriter; + + fn writer(&self) -> Self::Writer { + BlockingEventQueueWriter { + is_open: self.is_open.clone(), + sender: self.sender.clone(), + } + } +} + +impl Default for BlockingEventQueue { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for BlockingEventQueue { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BlockingEventQueue").finish_non_exhaustive() + } +} + +/// A consumer handle to blocking event queue. +/// +/// Implements [`EventSinkStream`]. Call to iterator's `next` function is +/// blocking. `None` is returned when simulation is closed. +pub struct BlockingEventQueueReader { + is_open: Arc, + receiver: Receiver, +} + +impl Iterator for BlockingEventQueueReader { + type Item = T; + + fn next(&mut self) -> Option { + match self.receiver.recv() { + Ok(event) => Some(event), + Err(_) => None, + } + } +} + +impl FusedIterator for BlockingEventQueueReader {} + +impl EventSinkStream for BlockingEventQueueReader { + fn open(&mut self) { + self.is_open.store(true, Ordering::Relaxed); + } + + fn close(&mut self) { + self.is_open.store(false, Ordering::Relaxed); + } +} + +impl fmt::Debug for BlockingEventQueueReader { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BlockingEventQueueReader") + .finish_non_exhaustive() + } +} + +/// A producer handle of a `BlockingEventQueue`. +pub struct BlockingEventQueueWriter { + is_open: Arc, + sender: Sender, +} + +impl EventSinkWriter for BlockingEventQueueWriter { + /// Pushes an event onto the queue. + fn write(&self, event: T) { + if !self.is_open.load(Ordering::Relaxed) { + return; + } + // Ignore sending failure. + let _ = self.sender.send(event); + } +} + +impl Clone for BlockingEventQueueWriter { + fn clone(&self) -> Self { + Self { + is_open: self.is_open.clone(), + sender: self.sender.clone(), + } + } +} + +impl fmt::Debug for BlockingEventQueueWriter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BlockingEventQueueWriter") + .finish_non_exhaustive() + } +} From 7ff6f4c6c3b8a9fd5538ccbe48da19e075e9fd67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Tue, 28 Jan 2025 11:08:17 +0100 Subject: [PATCH 2/2] Changes after review. --- nexosim-util/examples/simulation_driven.rs | 10 +++---- nexosim-util/src/helper_models.rs | 9 ++++--- .../src/ports/sink/blocking_event_queue.rs | 26 ++++++++++--------- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/nexosim-util/examples/simulation_driven.rs b/nexosim-util/examples/simulation_driven.rs index 0b4f4e4..6a67378 100644 --- a/nexosim-util/examples/simulation_driven.rs +++ b/nexosim-util/examples/simulation_driven.rs @@ -71,7 +71,7 @@ pub struct Counter { } impl Counter { - /// Creates new `Counter` model. + /// Creates a new `Counter` model. fn new() -> Self { let mode = Output::default(); let count = Output::default(); @@ -122,7 +122,7 @@ pub struct Detector { } impl Detector { - /// Creates new `Detector` model. + /// Creates a new `Detector` model. pub fn new() -> Self { Self { pulse: Output::default(), @@ -140,7 +140,7 @@ impl Detector { self.next = None; } - /// Generates pulse. + /// Generates a pulse. /// /// Note: self-scheduling async methods must be for now defined with an /// explicit signature instead of `async fn` due to a rustc issue. @@ -155,7 +155,7 @@ impl Detector { } } - /// Schedules next detection. + /// Schedules the next detection. async fn schedule_next(&mut self, cx: &mut Context) { let next = { let mut rng = rand::thread_rng(); @@ -204,7 +204,7 @@ fn main() -> Result<(), SimulationError> { counter .count .map_connect_sink(|c| Event::Count(*c), &observer); - let mut observer = observer.reader(); + let mut observer = observer.into_reader(); // Start time (arbitrary since models do not depend on absolute time). let t0 = MonotonicTime::EPOCH; diff --git a/nexosim-util/src/helper_models.rs b/nexosim-util/src/helper_models.rs index c9a8d80..bc58367 100644 --- a/nexosim-util/src/helper_models.rs +++ b/nexosim-util/src/helper_models.rs @@ -9,20 +9,21 @@ use nexosim::model::{Context, InitializedModel, Model}; /// A ticker model. /// -/// This model self-schedules with a provided period keeping simulation alive. +/// This model self-schedules at the specified period, which can be used to keep +/// the simulation alive. pub struct Ticker { - /// Tick period in milliseconds. + /// Tick period. tick: Duration, } impl Ticker { - /// Creates new `Ticker` with provided tick period in milliseconds. + /// Creates a new `Ticker` with the specified self-scheduling period. pub fn new(tick: Duration) -> Self { Self { tick } } /// Self-scheduled function. - pub async fn tick(&mut self) {} + async fn tick(&mut self) {} } impl Model for Ticker { diff --git a/nexosim/src/ports/sink/blocking_event_queue.rs b/nexosim/src/ports/sink/blocking_event_queue.rs index a712ab9..91a719e 100644 --- a/nexosim/src/ports/sink/blocking_event_queue.rs +++ b/nexosim/src/ports/sink/blocking_event_queue.rs @@ -8,9 +8,11 @@ use super::{EventSink, EventSinkStream, EventSinkWriter}; /// A blocking event queue with an unbounded size. /// -/// Implements [`EventSink`], while [`EventSinkStream`] is implemented by -/// [`BlockingEventQueueReader`] available through the -/// [`BlockingEventQueue::reader`] method. +/// Implements [`EventSink`]. +/// +/// Note that [`EventSinkStream`] is implemented by +/// [`BlockingEventQueueReader`], created with the +/// [`BlockingEventQueue::into_reader`] method. pub struct BlockingEventQueue { is_open: Arc, sender: Sender, @@ -20,24 +22,24 @@ pub struct BlockingEventQueue { impl BlockingEventQueue { /// Creates an open `BlockingEventQueue`. pub fn new() -> Self { - Self::new_with_open_state(true) + Self::new_with_state(true) } /// Creates a closed `BlockingEventQueue`. pub fn new_closed() -> Self { - Self::new_with_open_state(false) + Self::new_with_state(false) } - /// Returns consumer handle. - pub fn reader(self) -> BlockingEventQueueReader { + /// Returns a consumer handle. + pub fn into_reader(self) -> BlockingEventQueueReader { BlockingEventQueueReader { is_open: self.is_open, receiver: self.receiver, } } - /// Creates new `BlockingEventQueue` in a given state. - fn new_with_open_state(is_open: bool) -> Self { + /// Creates a new `BlockingEventQueue` in the specified state. + fn new_with_state(is_open: bool) -> Self { let (sender, receiver) = channel(); Self { is_open: Arc::new(AtomicBool::new(is_open)), @@ -70,10 +72,10 @@ impl fmt::Debug for BlockingEventQueue { } } -/// A consumer handle to blocking event queue. +/// A consumer handle of a `BlockingEventQueue`. /// -/// Implements [`EventSinkStream`]. Call to iterator's `next` function is -/// blocking. `None` is returned when simulation is closed. +/// Implements [`EventSinkStream`]. Calls to the iterator's `next` method are +/// blocking. `None` is returned when all writer handles have been dropped. pub struct BlockingEventQueueReader { is_open: Arc, receiver: Receiver,