1
0
forked from ROMEO/nexosim

Add infinite step and an example.

This commit is contained in:
Jaŭhien Piatlicki 2025-01-15 13:10:37 +01:00
parent 1b08f10e42
commit 27ec1396df
3 changed files with 168 additions and 17 deletions

View File

@ -148,9 +148,9 @@ impl Model for Listener {
impl Drop for Listener { impl Drop for Listener {
/// Wait for UDP Server shutdown. /// Wait for UDP Server shutdown.
fn drop(&mut self) { fn drop(&mut self) {
self.server_handle.take().map(|handle| { if let Some(handle) = self.server_handle.take() {
let _ = handle.join(); let _ = handle.join();
}); };
} }
} }

View File

@ -0,0 +1,127 @@
//! Example: a simulation that runs infinitely, receiving data from
//! outside. This setup is typical for hardware-in-the-loop use case.
//!
//! This example demonstrates in particular:
//!
//! * infinite simulation (useful in hardware-in-the-loop),
//! * simulation halting,
//! * processing of external data (useful in co-simulation),
//! * system clock,
//! * periodic scheduling.
//!
//! ```text
//! ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
//! ┃ Simulation ┃
//!┌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┐ ┃ ┌──────────┐ ┃
//!┆ ┆ message ┃ │ │ message ┃
//!┆ External thread ├╌╌╌╌╌╌╌╌╌╌╌╂╌╌►│ Listener ├─────────╂─►
//!┆ ┆ [channel] ┃ │ │ ┃
//!└╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┘ ┃ └──────────┘ ┃
//! ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
//! ```
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, sleep};
use std::time::Duration;
use nexosim::model::{Context, InitializedModel, Model};
use nexosim::ports::{EventBuffer, Output};
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
use nexosim::time::{AutoSystemClock, MonotonicTime};
const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: usize = 10;
/// The `Listener` Model.
pub struct Listener {
/// Received message.
pub message: Output<String>,
/// Source of external messages.
external: Receiver<String>,
}
impl Listener {
/// Creates new `Listener` model.
fn new(external: Receiver<String>) -> Self {
Self {
message: Output::default(),
external,
}
}
/// Periodically scheduled function that processes external events.
async fn process(&mut self) {
while let Ok(message) = self.external.try_recv() {
self.message.send(message).await;
}
}
}
impl Model for Listener {
/// Initialize model.
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();
self.into()
}
}
fn main() -> Result<(), SimulationError> {
// ---------------
// Bench assembly.
// ---------------
// Channel for communication with simulation from outside.
let (tx, rx) = channel();
// Models.
// The listener model.
let mut listener = Listener::new(rx);
// Mailboxes.
let listener_mbox = Mailbox::new();
// Model handles for simulation.
let mut message = EventBuffer::with_capacity(N + 1);
listener.message.connect_sink(&message);
// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;
// Assembly and initialization.
let (mut simu, mut scheduler) = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0)?;
// Simulation thread.
let simulation_handle = thread::spawn(move || {
// ----------
// Simulation.
// ----------
simu.step_forever()
});
// Send data to simulation from outside.
for i in 0..N {
tx.send(i.to_string()).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i as u32)
}
}
// Check collected external messages.
for i in 0..N {
assert_eq!(message.next().unwrap(), i.to_string());
}
assert_eq!(message.next(), None);
// Stop the simulation.
scheduler.halt();
Ok(simulation_handle.join().unwrap()?)
}

View File

@ -220,7 +220,7 @@ impl Simulation {
/// simulation clock. This method blocks until all newly processed events /// simulation clock. This method blocks until all newly processed events
/// have completed. /// have completed.
pub fn step(&mut self) -> Result<(), ExecutionError> { pub fn step(&mut self) -> Result<(), ExecutionError> {
self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ()) self.step_to_next(None).map(|_| ())
} }
/// Iteratively advances the simulation time until the specified deadline, /// Iteratively advances the simulation time until the specified deadline,
@ -236,7 +236,19 @@ impl Simulation {
if target_time < now { if target_time < now {
return Err(ExecutionError::InvalidDeadline(target_time)); return Err(ExecutionError::InvalidDeadline(target_time));
} }
self.step_until_unchecked(target_time) self.step_until_unchecked(Some(target_time))
}
/// Iteratively advances the simulation time, as if by calling
/// [`Simulation::step()`] repeatedly.
///
/// This method blocks until all events scheduled have completed. If
/// simulation is halted, this method returns without an error.
pub fn step_forever(&mut self) -> Result<(), ExecutionError> {
match self.step_until_unchecked(None) {
Err(ExecutionError::Halted) => Ok(()),
result => result,
}
} }
/// Processes an action immediately, blocking until completion. /// Processes an action immediately, blocking until completion.
@ -333,8 +345,9 @@ impl Simulation {
/// Runs the executor. /// Runs the executor.
fn run(&mut self) -> Result<(), ExecutionError> { fn run(&mut self) -> Result<(), ExecutionError> {
// Defensive programming, shouldn't happen // Defensive programming, shouldn't happen
if self.is_halted.load(Ordering::Relaxed) { if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) {
return Err(ExecutionError::Terminated); self.is_terminated = true;
return Err(ExecutionError::Halted);
} }
// Defensive programming, shouldn't happen // Defensive programming, shouldn't happen
@ -392,12 +405,13 @@ impl Simulation {
/// ///
/// If at least one action was found that satisfied the time bound, the /// If at least one action was found that satisfied the time bound, the
/// corresponding new simulation time is returned. /// corresponding new simulation time is returned.
fn step_to_next_bounded( fn step_to_next(
&mut self, &mut self,
upper_time_bound: MonotonicTime, upper_time_bound: Option<MonotonicTime>,
) -> Result<Option<MonotonicTime>, ExecutionError> { ) -> Result<Option<MonotonicTime>, ExecutionError> {
if self.is_halted.load(Ordering::Relaxed) { if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) {
return Err(ExecutionError::Terminated); self.is_terminated = true;
return Err(ExecutionError::Halted);
} }
if self.is_terminated { if self.is_terminated {
@ -415,12 +429,17 @@ impl Simulation {
action action
} }
let (unbounded, upper_time_bound) = match upper_time_bound {
Some(upper_time_bound) => (false, upper_time_bound),
None => (true, MonotonicTime::MAX),
};
// Closure returning the next key which time stamp is no older than the // Closure returning the next key which time stamp is no older than the
// upper bound, if any. Cancelled actions are pulled and discarded. // upper bound, if any. Cancelled actions are pulled and discarded.
let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| { let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
loop { loop {
match scheduler_queue.peek() { match scheduler_queue.peek() {
Some((&key, action)) if key.0 <= upper_time_bound => { Some((&key, action)) if unbounded || key.0 <= upper_time_bound => {
if !action.is_cancelled() { if !action.is_cancelled() {
break Some(key); break Some(key);
} }
@ -502,16 +521,21 @@ impl Simulation {
/// ///
/// This method does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { fn step_until_unchecked(
&mut self,
target_time: Option<MonotonicTime>,
) -> Result<(), ExecutionError> {
loop { loop {
match self.step_to_next_bounded(target_time) { match self.step_to_next(target_time) {
// The target time was reached exactly. // The target time was reached exactly.
Ok(Some(t)) if t == target_time => return Ok(()), Ok(reached_time) if reached_time == target_time => return Ok(()),
// No actions are scheduled before or at the target time. // No actions are scheduled before or at the target time.
Ok(None) => { Ok(None) => {
if let Some(target_time) = target_time {
// Update the simulation time. // Update the simulation time.
self.time.write(target_time); self.time.write(target_time);
self.clock.synchronize(target_time); self.clock.synchronize(target_time);
}
return Ok(()); return Ok(());
} }
Err(e) => return Err(e), Err(e) => return Err(e),