1
0
forked from ROMEO/nexosim

Merge pull request #82 from asynchronics/feature-blocking-event-queue

Add blocking event queue.
This commit is contained in:
Jauhien Piatlicki 2025-01-28 11:22:13 +01:00 committed by GitHub
commit 0804ba714e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 465 additions and 2 deletions

View File

@ -5,3 +5,6 @@ edition = "2021"
[dependencies]
nexosim = { version = "0.3.0", path = "../nexosim" }
[dev-dependencies]
rand = "0.8"

View File

@ -40,8 +40,9 @@ impl Default for Hk {
}
/// Processor mode ID.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ModeId {
#[default]
Off,
Idle,
Processing,

View File

@ -0,0 +1,274 @@
//! Example: a simulation that runs infinitely until stopped. This setup is
//! typical for hardware-in-the-loop use case. The test scenario is driven by
//! simulation events.
//!
//! This example demonstrates in particular:
//!
//! * infinite simulation,
//! * blocking event queue,
//! * simulation halting,
//! * system clock,
//! * periodic scheduling,
//! * observable state.
//!
//! ```text
//! ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
//! ┃ Simulation ┃
//! ┃ ┌──────────┐ ┌──────────┐mode ┃
//! ┃ │ │pulses │ ├──────╂┐BlockingEventQueue
//! ┃ │ Detector ├──────►│ Counter │count ┃├───────────────────►
//! ┃ │ │ │ ├──────╂┘
//! ┃ └──────────┘ └──────────┘ ┃
//! ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
//! ```
use std::future::Future;
use std::thread;
use std::time::Duration;
use rand::Rng;
use nexosim::model::{Context, Model};
use nexosim::ports::{BlockingEventQueue, Output};
use nexosim::simulation::{ActionKey, ExecutionError, Mailbox, SimInit, SimulationError};
use nexosim::time::{AutoSystemClock, MonotonicTime};
use nexosim_util::helper_models::Ticker;
use nexosim_util::observables::ObservableValue;
const SWITCH_ON_DELAY: Duration = Duration::from_secs(1);
const MAX_PULSE_PERIOD: u64 = 100;
const TICK: Duration = Duration::from_millis(100);
const N: u64 = 10;
/// Counter mode.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Mode {
#[default]
Off,
On,
}
/// Simulation event.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Event {
Mode(Mode),
Count(u64),
}
/// The `Counter` Model.
pub struct Counter {
/// Operation mode.
pub mode: Output<Mode>,
/// Pulses count.
pub count: Output<u64>,
/// Internal state.
state: ObservableValue<Mode>,
/// Counter.
acc: ObservableValue<u64>,
}
impl Counter {
/// Creates a new `Counter` model.
fn new() -> Self {
let mode = Output::default();
let count = Output::default();
Self {
mode: mode.clone(),
count: count.clone(),
state: ObservableValue::new(mode),
acc: ObservableValue::new(count),
}
}
/// Power -- input port.
pub async fn power_in(&mut self, on: bool, cx: &mut Context<Self>) {
match *self.state {
Mode::Off if on => cx
.schedule_event(SWITCH_ON_DELAY, Self::switch_on, ())
.unwrap(),
Mode::On if !on => self.switch_off().await,
_ => (),
};
}
/// Pulse -- input port.
pub async fn pulse(&mut self) {
self.acc.modify(|x| *x += 1).await;
}
/// Switches `Counter` on.
async fn switch_on(&mut self) {
self.state.set(Mode::On).await;
}
/// Switches `Counter` off.
async fn switch_off(&mut self) {
self.state.set(Mode::Off).await;
}
}
impl Model for Counter {}
/// Detector model that produces pulses.
pub struct Detector {
/// Output pulse.
pub pulse: Output<()>,
/// `ActionKey` of the next scheduled detection.
next: Option<ActionKey>,
}
impl Detector {
/// Creates a new `Detector` model.
pub fn new() -> Self {
Self {
pulse: Output::default(),
next: None,
}
}
/// Switches `Detector` on -- input port.
pub async fn switch_on(&mut self, _: (), cx: &mut Context<Self>) {
self.schedule_next(cx).await;
}
/// Switches `Detector` off -- input port.
pub async fn switch_off(&mut self) {
self.next = None;
}
/// Generates a pulse.
///
/// Note: self-scheduling async methods must be for now defined with an
/// explicit signature instead of `async fn` due to a rustc issue.
fn pulse<'a>(
&'a mut self,
_: (),
cx: &'a mut Context<Self>,
) -> impl Future<Output = ()> + Send + 'a {
async move {
self.pulse.send(()).await;
self.schedule_next(cx).await;
}
}
/// Schedules the next detection.
async fn schedule_next(&mut self, cx: &mut Context<Self>) {
let next = {
let mut rng = rand::thread_rng();
rng.gen_range(1..MAX_PULSE_PERIOD)
};
self.next = Some(
cx.schedule_keyed_event(Duration::from_millis(next), Self::pulse, ())
.unwrap(),
);
}
}
impl Model for Detector {}
fn main() -> Result<(), SimulationError> {
// ---------------
// Bench assembly.
// ---------------
// Models.
// The detector model that produces pulses.
let mut detector = Detector::new();
// The counter model.
let mut counter = Counter::new();
// The ticker model that keeps simulation alive.
let ticker = Ticker::new(TICK);
// Mailboxes.
let detector_mbox = Mailbox::new();
let counter_mbox = Mailbox::new();
let ticker_mbox = Mailbox::new();
// Connections.
detector.pulse.connect(Counter::pulse, &counter_mbox);
// Model handles for simulation.
let detector_addr = detector_mbox.address();
let counter_addr = counter_mbox.address();
let observer = BlockingEventQueue::new();
counter
.mode
.map_connect_sink(|m| Event::Mode(*m), &observer);
counter
.count
.map_connect_sink(|c| Event::Count(*c), &observer);
let mut observer = observer.into_reader();
// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;
// Assembly and initialization.
let (mut simu, mut scheduler) = SimInit::new()
.add_model(detector, detector_mbox, "detector")
.add_model(counter, counter_mbox, "counter")
.add_model(ticker, ticker_mbox, "ticker")
.set_clock(AutoSystemClock::new())
.init(t0)?;
// Simulation thread.
let simulation_handle = thread::spawn(move || {
// ---------- Simulation. ----------
// Infinitely kept alive by the ticker model until halted.
simu.step_unbounded()
});
// Switch the counter on.
scheduler.schedule_event(
Duration::from_millis(1),
Counter::power_in,
true,
counter_addr,
)?;
// Wait until counter mode is `On`.
loop {
let event = observer.next();
match event {
Some(Event::Mode(Mode::On)) => {
break;
}
None => panic!("Simulation exited unexpectedly"),
_ => (),
}
}
// Switch the detector on.
scheduler.schedule_event(
Duration::from_millis(100),
Detector::switch_on,
(),
detector_addr,
)?;
// Wait until `N` detections.
loop {
let event = observer.next();
match event {
Some(Event::Count(c)) if c >= N => {
break;
}
None => panic!("Simulation exited unexpectedly"),
_ => (),
}
}
// Stop the simulation.
scheduler.halt();
match simulation_handle.join().unwrap() {
Err(ExecutionError::Halted) => Ok(()),
Err(e) => Err(e.into()),
_ => Ok(()),
}
}

View File

@ -0,0 +1,35 @@
//! Helper models.
//!
//! This module contains helper models useful for simulation bench assembly.
//!
use std::time::Duration;
use nexosim::model::{Context, InitializedModel, Model};
/// A ticker model.
///
/// This model self-schedules at the specified period, which can be used to keep
/// the simulation alive.
pub struct Ticker {
/// Tick period.
tick: Duration,
}
impl Ticker {
/// Creates a new `Ticker` with the specified self-scheduling period.
pub fn new(tick: Duration) -> Self {
Self { tick }
}
/// Self-scheduled function.
async fn tick(&mut self) {}
}
impl Model for Ticker {
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
cx.schedule_periodic_event(self.tick, self.tick, Self::tick, ())
.unwrap();
self.into()
}
}

View File

@ -1,2 +1,3 @@
pub mod combinators;
pub mod helper_models;
pub mod observables;

View File

@ -273,6 +273,9 @@ pub use input::markers;
pub use input::{InputFn, ReplierFn};
pub use output::{Output, Requestor, UniRequestor};
pub use sink::{
event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter,
blocking_event_queue::{BlockingEventQueue, BlockingEventQueueReader},
event_buffer::EventBuffer,
event_slot::EventSlot,
EventSink, EventSinkStream, EventSinkWriter,
};
pub use source::{EventSource, QuerySource, ReplyReceiver};

View File

@ -1,3 +1,4 @@
pub(crate) mod blocking_event_queue;
pub(crate) mod event_buffer;
pub(crate) mod event_slot;

View File

@ -0,0 +1,145 @@
use std::fmt;
use std::iter::FusedIterator;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use super::{EventSink, EventSinkStream, EventSinkWriter};
/// A blocking event queue with an unbounded size.
///
/// Implements [`EventSink`].
///
/// Note that [`EventSinkStream`] is implemented by
/// [`BlockingEventQueueReader`], created with the
/// [`BlockingEventQueue::into_reader`] method.
pub struct BlockingEventQueue<T> {
is_open: Arc<AtomicBool>,
sender: Sender<T>,
receiver: Receiver<T>,
}
impl<T> BlockingEventQueue<T> {
/// Creates an open `BlockingEventQueue`.
pub fn new() -> Self {
Self::new_with_state(true)
}
/// Creates a closed `BlockingEventQueue`.
pub fn new_closed() -> Self {
Self::new_with_state(false)
}
/// Returns a consumer handle.
pub fn into_reader(self) -> BlockingEventQueueReader<T> {
BlockingEventQueueReader {
is_open: self.is_open,
receiver: self.receiver,
}
}
/// Creates a new `BlockingEventQueue` in the specified state.
fn new_with_state(is_open: bool) -> Self {
let (sender, receiver) = channel();
Self {
is_open: Arc::new(AtomicBool::new(is_open)),
sender,
receiver,
}
}
}
impl<T: Send + 'static> EventSink<T> for BlockingEventQueue<T> {
type Writer = BlockingEventQueueWriter<T>;
fn writer(&self) -> Self::Writer {
BlockingEventQueueWriter {
is_open: self.is_open.clone(),
sender: self.sender.clone(),
}
}
}
impl<T> Default for BlockingEventQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> fmt::Debug for BlockingEventQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockingEventQueue").finish_non_exhaustive()
}
}
/// A consumer handle of a `BlockingEventQueue`.
///
/// Implements [`EventSinkStream`]. Calls to the iterator's `next` method are
/// blocking. `None` is returned when all writer handles have been dropped.
pub struct BlockingEventQueueReader<T> {
is_open: Arc<AtomicBool>,
receiver: Receiver<T>,
}
impl<T> Iterator for BlockingEventQueueReader<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self.receiver.recv() {
Ok(event) => Some(event),
Err(_) => None,
}
}
}
impl<T> FusedIterator for BlockingEventQueueReader<T> {}
impl<T: Send + 'static> EventSinkStream for BlockingEventQueueReader<T> {
fn open(&mut self) {
self.is_open.store(true, Ordering::Relaxed);
}
fn close(&mut self) {
self.is_open.store(false, Ordering::Relaxed);
}
}
impl<T> fmt::Debug for BlockingEventQueueReader<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockingEventQueueReader")
.finish_non_exhaustive()
}
}
/// A producer handle of a `BlockingEventQueue`.
pub struct BlockingEventQueueWriter<T> {
is_open: Arc<AtomicBool>,
sender: Sender<T>,
}
impl<T: Send + 'static> EventSinkWriter<T> for BlockingEventQueueWriter<T> {
/// Pushes an event onto the queue.
fn write(&self, event: T) {
if !self.is_open.load(Ordering::Relaxed) {
return;
}
// Ignore sending failure.
let _ = self.sender.send(event);
}
}
impl<T> Clone for BlockingEventQueueWriter<T> {
fn clone(&self) -> Self {
Self {
is_open: self.is_open.clone(),
sender: self.sender.clone(),
}
}
}
impl<T> fmt::Debug for BlockingEventQueueWriter<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockingEventQueueWriter")
.finish_non_exhaustive()
}
}