diff --git a/nexosim/examples/external_input.rs b/nexosim/examples/external_input.rs index 1b5406a..f64a012 100644 --- a/nexosim/examples/external_input.rs +++ b/nexosim/examples/external_input.rs @@ -148,9 +148,9 @@ impl Model for Listener { impl Drop for Listener { /// Wait for UDP Server shutdown. fn drop(&mut self) { - self.server_handle.take().map(|handle| { + if let Some(handle) = self.server_handle.take() { let _ = handle.join(); - }); + }; } } diff --git a/nexosim/examples/infinite_loop.rs b/nexosim/examples/infinite_loop.rs new file mode 100644 index 0000000..4af001e --- /dev/null +++ b/nexosim/examples/infinite_loop.rs @@ -0,0 +1,127 @@ +//! Example: a simulation that runs infinitely, receiving data from +//! outside. This setup is typical for hardware-in-the-loop use case. +//! +//! This example demonstrates in particular: +//! +//! * infinite simulation (useful in hardware-in-the-loop), +//! * simulation halting, +//! * processing of external data (useful in co-simulation), +//! * system clock, +//! * periodic scheduling. +//! +//! ```text +//! ┏━━━━━━━━━━━━━━━━━━━━━━━━┓ +//! ┃ Simulation ┃ +//!┌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┐ ┃ ┌──────────┐ ┃ +//!┆ ┆ message ┃ │ │ message ┃ +//!┆ External thread ├╌╌╌╌╌╌╌╌╌╌╌╂╌╌►│ Listener ├─────────╂─► +//!┆ ┆ [channel] ┃ │ │ ┃ +//!└╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┘ ┃ └──────────┘ ┃ +//! ┗━━━━━━━━━━━━━━━━━━━━━━━━┛ +//! ``` + +use std::sync::mpsc::{channel, Receiver}; +use std::thread::{self, sleep}; +use std::time::Duration; + +use nexosim::model::{Context, InitializedModel, Model}; +use nexosim::ports::{EventBuffer, Output}; +use nexosim::simulation::{Mailbox, SimInit, SimulationError}; +use nexosim::time::{AutoSystemClock, MonotonicTime}; + +const DELTA: Duration = Duration::from_millis(2); +const PERIOD: Duration = Duration::from_millis(20); +const N: usize = 10; + +/// The `Listener` Model. +pub struct Listener { + /// Received message. + pub message: Output, + + /// Source of external messages. + external: Receiver, +} + +impl Listener { + /// Creates new `Listener` model. + fn new(external: Receiver) -> Self { + Self { + message: Output::default(), + external, + } + } + + /// Periodically scheduled function that processes external events. + async fn process(&mut self) { + while let Ok(message) = self.external.try_recv() { + self.message.send(message).await; + } + } +} + +impl Model for Listener { + /// Initialize model. + async fn init(self, cx: &mut Context) -> InitializedModel { + // Schedule periodic function that processes external events. + cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) + .unwrap(); + + self.into() + } +} + +fn main() -> Result<(), SimulationError> { + // --------------- + // Bench assembly. + // --------------- + + // Channel for communication with simulation from outside. + let (tx, rx) = channel(); + + // Models. + + // The listener model. + let mut listener = Listener::new(rx); + + // Mailboxes. + let listener_mbox = Mailbox::new(); + + // Model handles for simulation. + let mut message = EventBuffer::with_capacity(N + 1); + listener.message.connect_sink(&message); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let (mut simu, mut scheduler) = SimInit::new() + .add_model(listener, listener_mbox, "listener") + .set_clock(AutoSystemClock::new()) + .init(t0)?; + + // Simulation thread. + let simulation_handle = thread::spawn(move || { + // ---------- + // Simulation. + // ---------- + simu.step_forever() + }); + + // Send data to simulation from outside. + for i in 0..N { + tx.send(i.to_string()).unwrap(); + if i % 3 == 0 { + sleep(PERIOD * i as u32) + } + } + + // Check collected external messages. + for i in 0..N { + assert_eq!(message.next().unwrap(), i.to_string()); + } + assert_eq!(message.next(), None); + + // Stop the simulation. + scheduler.halt(); + Ok(simulation_handle.join().unwrap()?) +} diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index d890fba..1a606a5 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -220,7 +220,7 @@ impl Simulation { /// simulation clock. This method blocks until all newly processed events /// have completed. pub fn step(&mut self) -> Result<(), ExecutionError> { - self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ()) + self.step_to_next(None).map(|_| ()) } /// Iteratively advances the simulation time until the specified deadline, @@ -236,7 +236,19 @@ impl Simulation { if target_time < now { return Err(ExecutionError::InvalidDeadline(target_time)); } - self.step_until_unchecked(target_time) + self.step_until_unchecked(Some(target_time)) + } + + /// Iteratively advances the simulation time, as if by calling + /// [`Simulation::step()`] repeatedly. + /// + /// This method blocks until all events scheduled have completed. If + /// simulation is halted, this method returns without an error. + pub fn step_forever(&mut self) -> Result<(), ExecutionError> { + match self.step_until_unchecked(None) { + Err(ExecutionError::Halted) => Ok(()), + result => result, + } } /// Processes an action immediately, blocking until completion. @@ -333,8 +345,9 @@ impl Simulation { /// Runs the executor. fn run(&mut self) -> Result<(), ExecutionError> { // Defensive programming, shouldn't happen - if self.is_halted.load(Ordering::Relaxed) { - return Err(ExecutionError::Terminated); + if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); } // Defensive programming, shouldn't happen @@ -392,12 +405,13 @@ impl Simulation { /// /// If at least one action was found that satisfied the time bound, the /// corresponding new simulation time is returned. - fn step_to_next_bounded( + fn step_to_next( &mut self, - upper_time_bound: MonotonicTime, + upper_time_bound: Option, ) -> Result, ExecutionError> { - if self.is_halted.load(Ordering::Relaxed) { - return Err(ExecutionError::Terminated); + if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); } if self.is_terminated { @@ -415,12 +429,17 @@ impl Simulation { action } + let (unbounded, upper_time_bound) = match upper_time_bound { + Some(upper_time_bound) => (false, upper_time_bound), + None => (true, MonotonicTime::MAX), + }; + // Closure returning the next key which time stamp is no older than the // upper bound, if any. Cancelled actions are pulled and discarded. let peek_next_key = |scheduler_queue: &mut MutexGuard| { loop { match scheduler_queue.peek() { - Some((&key, action)) if key.0 <= upper_time_bound => { + Some((&key, action)) if unbounded || key.0 <= upper_time_bound => { if !action.is_cancelled() { break Some(key); } @@ -502,16 +521,21 @@ impl Simulation { /// /// This method does not check whether the specified time lies in the future /// of the current simulation time. - fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { + fn step_until_unchecked( + &mut self, + target_time: Option, + ) -> Result<(), ExecutionError> { loop { - match self.step_to_next_bounded(target_time) { + match self.step_to_next(target_time) { // The target time was reached exactly. - Ok(Some(t)) if t == target_time => return Ok(()), + Ok(reached_time) if reached_time == target_time => return Ok(()), // No actions are scheduled before or at the target time. Ok(None) => { - // Update the simulation time. - self.time.write(target_time); - self.clock.synchronize(target_time); + if let Some(target_time) = target_time { + // Update the simulation time. + self.time.write(target_time); + self.clock.synchronize(target_time); + } return Ok(()); } Err(e) => return Err(e),