1
0
forked from ROMEO/nexosim

Merge pull request #74 from asynchronics/feature-simulation-halt

Add possibility to halt simulation
This commit is contained in:
Serge Barral
2025-01-15 16:03:36 +01:00
committed by GitHub
11 changed files with 284 additions and 118 deletions

View File

@ -148,9 +148,9 @@ impl Model for Listener {
impl Drop for Listener { impl Drop for Listener {
/// Wait for UDP Server shutdown. /// Wait for UDP Server shutdown.
fn drop(&mut self) { fn drop(&mut self) {
self.server_handle.take().map(|handle| { if let Some(handle) = self.server_handle.take() {
let _ = handle.join(); let _ = handle.join();
}); };
} }
} }

View File

@ -0,0 +1,127 @@
//! Example: a simulation that runs infinitely, receiving data from
//! outside. This setup is typical for hardware-in-the-loop use case.
//!
//! This example demonstrates in particular:
//!
//! * infinite simulation (useful in hardware-in-the-loop),
//! * simulation halting,
//! * processing of external data (useful in co-simulation),
//! * system clock,
//! * periodic scheduling.
//!
//! ```text
//! ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
//! ┃ Simulation ┃
//!┌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┐ ┃ ┌──────────┐ ┃
//!┆ ┆ message ┃ │ │ message ┃
//!┆ External thread ├╌╌╌╌╌╌╌╌╌╌╌╂╌╌►│ Listener ├─────────╂─►
//!┆ ┆ [channel] ┃ │ │ ┃
//!└╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┘ ┃ └──────────┘ ┃
//! ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
//! ```
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, sleep};
use std::time::Duration;
use nexosim::model::{Context, InitializedModel, Model};
use nexosim::ports::{EventBuffer, Output};
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
use nexosim::time::{AutoSystemClock, MonotonicTime};
const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: usize = 10;
/// The `Listener` Model.
pub struct Listener {
/// Received message.
pub message: Output<String>,
/// Source of external messages.
external: Receiver<String>,
}
impl Listener {
/// Creates new `Listener` model.
fn new(external: Receiver<String>) -> Self {
Self {
message: Output::default(),
external,
}
}
/// Periodically scheduled function that processes external events.
async fn process(&mut self) {
while let Ok(message) = self.external.try_recv() {
self.message.send(message).await;
}
}
}
impl Model for Listener {
/// Initialize model.
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();
self.into()
}
}
fn main() -> Result<(), SimulationError> {
// ---------------
// Bench assembly.
// ---------------
// Channel for communication with simulation from outside.
let (tx, rx) = channel();
// Models.
// The listener model.
let mut listener = Listener::new(rx);
// Mailboxes.
let listener_mbox = Mailbox::new();
// Model handles for simulation.
let mut message = EventBuffer::with_capacity(N + 1);
listener.message.connect_sink(&message);
// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;
// Assembly and initialization.
let (mut simu, mut scheduler) = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0)?;
// Simulation thread.
let simulation_handle = thread::spawn(move || {
// ----------
// Simulation.
// ----------
simu.step_unbounded()
});
// Send data to simulation from outside.
for i in 0..N {
tx.send(i.to_string()).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i as u32)
}
}
// Check collected external messages.
for i in 0..N {
assert_eq!(message.next().unwrap(), i.to_string());
}
assert_eq!(message.next(), None);
// Stop the simulation.
scheduler.halt();
Ok(simulation_handle.join().unwrap()?)
}

View File

@ -17,15 +17,16 @@ enum ErrorCode {
INVALID_KEY = 6; INVALID_KEY = 6;
INITIALIZER_PANIC = 10; INITIALIZER_PANIC = 10;
SIMULATION_NOT_STARTED = 11; SIMULATION_NOT_STARTED = 11;
SIMULATION_TERMINATED = 12; SIMULATION_HALTED = 12;
SIMULATION_DEADLOCK = 13; SIMULATION_TERMINATED = 13;
SIMULATION_MESSAGE_LOSS = 14; SIMULATION_DEADLOCK = 14;
SIMULATION_NO_RECIPIENT = 15; SIMULATION_MESSAGE_LOSS = 15;
SIMULATION_PANIC = 16; SIMULATION_NO_RECIPIENT = 16;
SIMULATION_TIMEOUT = 17; SIMULATION_PANIC = 17;
SIMULATION_OUT_OF_SYNC = 18; SIMULATION_TIMEOUT = 18;
SIMULATION_BAD_QUERY = 19; SIMULATION_OUT_OF_SYNC = 19;
SIMULATION_TIME_OUT_OF_RANGE = 20; SIMULATION_BAD_QUERY = 20;
SIMULATION_TIME_OUT_OF_RANGE = 21;
SOURCE_NOT_FOUND = 30; SOURCE_NOT_FOUND = 30;
SINK_NOT_FOUND = 31; SINK_NOT_FOUND = 31;
} }

View File

@ -343,15 +343,16 @@ pub enum ErrorCode {
InvalidKey = 6, InvalidKey = 6,
InitializerPanic = 10, InitializerPanic = 10,
SimulationNotStarted = 11, SimulationNotStarted = 11,
SimulationTerminated = 12, SimulationHalted = 12,
SimulationDeadlock = 13, SimulationTerminated = 13,
SimulationMessageLoss = 14, SimulationDeadlock = 14,
SimulationNoRecipient = 15, SimulationMessageLoss = 15,
SimulationPanic = 16, SimulationNoRecipient = 16,
SimulationTimeout = 17, SimulationPanic = 17,
SimulationOutOfSync = 18, SimulationTimeout = 18,
SimulationBadQuery = 19, SimulationOutOfSync = 19,
SimulationTimeOutOfRange = 20, SimulationBadQuery = 20,
SimulationTimeOutOfRange = 21,
SourceNotFound = 30, SourceNotFound = 30,
SinkNotFound = 31, SinkNotFound = 31,
} }
@ -371,6 +372,7 @@ impl ErrorCode {
Self::InvalidKey => "INVALID_KEY", Self::InvalidKey => "INVALID_KEY",
Self::InitializerPanic => "INITIALIZER_PANIC", Self::InitializerPanic => "INITIALIZER_PANIC",
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED", Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
Self::SimulationHalted => "SIMULATION_HALTED",
Self::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK", Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS", Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS",
@ -396,6 +398,7 @@ impl ErrorCode {
"INVALID_KEY" => Some(Self::InvalidKey), "INVALID_KEY" => Some(Self::InvalidKey),
"INITIALIZER_PANIC" => Some(Self::InitializerPanic), "INITIALIZER_PANIC" => Some(Self::InitializerPanic),
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_HALTED" => Some(Self::SimulationHalted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss), "SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss),

View File

@ -42,6 +42,7 @@ fn map_execution_error(error: ExecutionError) -> Error {
ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,
ExecutionError::BadQuery => ErrorCode::SimulationBadQuery, ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
ExecutionError::Halted => ErrorCode::SimulationHalted,
ExecutionError::Terminated => ErrorCode::SimulationTerminated, ExecutionError::Terminated => ErrorCode::SimulationTerminated,
ExecutionError::InvalidDeadline(_) => ErrorCode::InvalidDeadline, ExecutionError::InvalidDeadline(_) => ErrorCode::InvalidDeadline,
}; };

View File

@ -8,6 +8,9 @@ use crate::time::{Deadline, MonotonicTime};
use super::{Model, ProtoModel}; use super::{Model, ProtoModel};
#[cfg(all(test, not(nexosim_loom)))]
use crate::channel::Receiver;
/// A local context for models. /// A local context for models.
/// ///
/// A `Context` is a handle to the global context associated to a model /// A `Context` is a handle to the global context associated to a model
@ -521,3 +524,16 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> {
); );
} }
} }
#[cfg(all(test, not(nexosim_loom)))]
impl<M: Model> Context<M> {
/// Creates a dummy context for testing purposes.
pub(crate) fn new_dummy() -> Self {
let dummy_address = Receiver::new(1).sender();
Context::new(
String::new(),
GlobalScheduler::new_dummy(),
Address(dummy_address),
)
}
}

View File

@ -504,16 +504,12 @@ fn recycle_vec<T, U>(mut v: Vec<T>) -> Vec<U> {
#[cfg(all(test, not(nexosim_loom)))] #[cfg(all(test, not(nexosim_loom)))]
mod tests { mod tests {
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::thread; use std::thread;
use futures_executor::block_on; use futures_executor::block_on;
use crate::channel::Receiver; use crate::channel::Receiver;
use crate::simulation::{Address, GlobalScheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::super::sender::{ use super::super::sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender, FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender,
@ -574,15 +570,7 @@ mod tests {
let mut sum_model = SumModel::new(sum.clone()); let mut sum_model = SumModel::new(sum.clone());
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap();
} }
}) })
@ -644,15 +632,7 @@ mod tests {
let mut sum_model = SumModel::new(sum.clone()); let mut sum_model = SumModel::new(sum.clone());
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async { block_on(async {
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
@ -704,15 +684,7 @@ mod tests {
let mut double_model = DoubleModel::new(); let mut double_model = DoubleModel::new();
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap();
thread::sleep(std::time::Duration::from_millis(100)); thread::sleep(std::time::Duration::from_millis(100));
} }
@ -789,15 +761,7 @@ mod tests {
let mut double_model = DoubleModel::new(); let mut double_model = DoubleModel::new();
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async { block_on(async {
mailbox mailbox

View File

@ -399,16 +399,12 @@ impl<R> Iterator for ReplyIterator<R> {
#[cfg(all(test, not(nexosim_loom)))] #[cfg(all(test, not(nexosim_loom)))]
mod tests { mod tests {
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::thread; use std::thread;
use futures_executor::block_on; use futures_executor::block_on;
use crate::channel::Receiver; use crate::channel::Receiver;
use crate::simulation::{Address, GlobalScheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::super::sender::{ use super::super::sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender, FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender,
@ -469,15 +465,7 @@ mod tests {
let mut sum_model = SumModel::new(sum.clone()); let mut sum_model = SumModel::new(sum.clone());
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap();
} }
}) })
@ -539,15 +527,7 @@ mod tests {
let mut sum_model = SumModel::new(sum.clone()); let mut sum_model = SumModel::new(sum.clone());
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async { block_on(async {
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap();
@ -599,15 +579,7 @@ mod tests {
let mut double_model = DoubleModel::new(); let mut double_model = DoubleModel::new();
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap();
thread::sleep(std::time::Duration::from_millis(100)); thread::sleep(std::time::Duration::from_millis(100));
} }
@ -684,15 +656,7 @@ mod tests {
let mut double_model = DoubleModel::new(); let mut double_model = DoubleModel::new();
move || { move || {
let dummy_address = Receiver::new(1).sender(); let mut dummy_cx = Context::new_dummy();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let mut dummy_cx = Context::new(
String::new(),
GlobalScheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
);
block_on(async { block_on(async {
mailbox mailbox

View File

@ -94,6 +94,7 @@ use std::error::Error;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll; use std::task::Poll;
use std::time::Duration; use std::time::Duration;
@ -160,6 +161,7 @@ pub struct Simulation {
timeout: Duration, timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>, observers: Vec<(String, Box<dyn ChannelObserver>)>,
model_names: Vec<String>, model_names: Vec<String>,
is_halted: Arc<AtomicBool>,
is_terminated: bool, is_terminated: bool,
} }
@ -175,6 +177,7 @@ impl Simulation {
timeout: Duration, timeout: Duration,
observers: Vec<(String, Box<dyn ChannelObserver>)>, observers: Vec<(String, Box<dyn ChannelObserver>)>,
model_names: Vec<String>, model_names: Vec<String>,
is_halted: Arc<AtomicBool>,
) -> Self { ) -> Self {
Self { Self {
executor, executor,
@ -185,6 +188,7 @@ impl Simulation {
timeout, timeout,
observers, observers,
model_names, model_names,
is_halted,
is_terminated: false, is_terminated: false,
} }
} }
@ -216,7 +220,7 @@ impl Simulation {
/// simulation clock. This method blocks until all newly processed events /// simulation clock. This method blocks until all newly processed events
/// have completed. /// have completed.
pub fn step(&mut self) -> Result<(), ExecutionError> { pub fn step(&mut self) -> Result<(), ExecutionError> {
self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ()) self.step_to_next(None).map(|_| ())
} }
/// Iteratively advances the simulation time until the specified deadline, /// Iteratively advances the simulation time until the specified deadline,
@ -232,7 +236,19 @@ impl Simulation {
if target_time < now { if target_time < now {
return Err(ExecutionError::InvalidDeadline(target_time)); return Err(ExecutionError::InvalidDeadline(target_time));
} }
self.step_until_unchecked(target_time) self.step_until_unchecked(Some(target_time))
}
/// Iteratively advances the simulation time, as if by calling
/// [`Simulation::step()`] repeatedly.
///
/// This method blocks until all events scheduled have completed. If
/// simulation is halted, this method returns without an error.
pub fn step_unbounded(&mut self) -> Result<(), ExecutionError> {
match self.step_until_unchecked(None) {
Err(ExecutionError::Halted) => Ok(()),
result => result,
}
} }
/// Processes an action immediately, blocking until completion. /// Processes an action immediately, blocking until completion.
@ -332,6 +348,11 @@ impl Simulation {
return Err(ExecutionError::Terminated); return Err(ExecutionError::Terminated);
} }
if self.is_halted.load(Ordering::Relaxed) {
self.is_terminated = true;
return Err(ExecutionError::Halted);
}
self.executor.run(self.timeout).map_err(|e| { self.executor.run(self.timeout).map_err(|e| {
self.is_terminated = true; self.is_terminated = true;
@ -382,10 +403,19 @@ impl Simulation {
/// ///
/// If at least one action was found that satisfied the time bound, the /// If at least one action was found that satisfied the time bound, the
/// corresponding new simulation time is returned. /// corresponding new simulation time is returned.
fn step_to_next_bounded( fn step_to_next(
&mut self, &mut self,
upper_time_bound: MonotonicTime, upper_time_bound: Option<MonotonicTime>,
) -> Result<Option<MonotonicTime>, ExecutionError> { ) -> Result<Option<MonotonicTime>, ExecutionError> {
if self.is_terminated {
return Err(ExecutionError::Terminated);
}
if self.is_halted.load(Ordering::Relaxed) {
self.is_terminated = true;
return Err(ExecutionError::Halted);
}
// Function pulling the next action. If the action is periodic, it is // Function pulling the next action. If the action is periodic, it is
// immediately re-scheduled. // immediately re-scheduled.
fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action { fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action {
@ -397,6 +427,8 @@ impl Simulation {
action action
} }
let upper_time_bound = upper_time_bound.unwrap_or(MonotonicTime::MAX);
// Closure returning the next key which time stamp is no older than the // Closure returning the next key which time stamp is no older than the
// upper bound, if any. Cancelled actions are pulled and discarded. // upper bound, if any. Cancelled actions are pulled and discarded.
let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| { let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
@ -484,16 +516,21 @@ impl Simulation {
/// ///
/// This method does not check whether the specified time lies in the future /// This method does not check whether the specified time lies in the future
/// of the current simulation time. /// of the current simulation time.
fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { fn step_until_unchecked(
&mut self,
target_time: Option<MonotonicTime>,
) -> Result<(), ExecutionError> {
loop { loop {
match self.step_to_next_bounded(target_time) { match self.step_to_next(target_time) {
// The target time was reached exactly. // The target time was reached exactly.
Ok(Some(t)) if t == target_time => return Ok(()), Ok(reached_time) if reached_time == target_time => return Ok(()),
// No actions are scheduled before or at the target time. // No actions are scheduled before or at the target time.
Ok(None) => { Ok(None) => {
// Update the simulation time. if let Some(target_time) = target_time {
self.time.write(target_time); // Update the simulation time.
self.clock.synchronize(target_time); self.time.write(target_time);
self.clock.synchronize(target_time);
}
return Ok(()); return Ok(());
} }
Err(e) => return Err(e), Err(e) => return Err(e),
@ -506,7 +543,11 @@ impl Simulation {
/// Returns a scheduler handle. /// Returns a scheduler handle.
#[cfg(feature = "grpc")] #[cfg(feature = "grpc")]
pub(crate) fn scheduler(&self) -> Scheduler { pub(crate) fn scheduler(&self) -> Scheduler {
Scheduler::new(self.scheduler_queue.clone(), self.time.reader()) Scheduler::new(
self.scheduler_queue.clone(),
self.time.reader(),
self.is_halted.clone(),
)
} }
} }
@ -533,6 +574,8 @@ pub struct DeadlockInfo {
/// An error returned upon simulation execution failure. /// An error returned upon simulation execution failure.
#[derive(Debug)] #[derive(Debug)]
pub enum ExecutionError { pub enum ExecutionError {
/// The simulation has been intentionally stopped.
Halted,
/// The simulation has been terminated due to an earlier deadlock, message /// The simulation has been terminated due to an earlier deadlock, message
/// loss, missing recipient, model panic, timeout or synchronization loss. /// loss, missing recipient, model panic, timeout or synchronization loss.
Terminated, Terminated,
@ -613,6 +656,7 @@ pub enum ExecutionError {
impl fmt::Display for ExecutionError { impl fmt::Display for ExecutionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
Self::Halted => f.write_str("the simulation has been intentionally stopped"),
Self::Terminated => f.write_str("the simulation has been terminated"), Self::Terminated => f.write_str("the simulation has been terminated"),
Self::Deadlock(list) => { Self::Deadlock(list) => {
f.write_str( f.write_str(

View File

@ -20,6 +20,9 @@ use crate::simulation::Address;
use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; use crate::time::{AtomicTimeReader, Deadline, MonotonicTime};
use crate::util::priority_queue::PriorityQueue; use crate::util::priority_queue::PriorityQueue;
#[cfg(all(test, not(nexosim_loom)))]
use crate::{time::TearableAtomicTime, util::sync_cell::SyncCell};
const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0;
/// A global scheduler. /// A global scheduler.
@ -27,8 +30,12 @@ const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0;
pub struct Scheduler(GlobalScheduler); pub struct Scheduler(GlobalScheduler);
impl Scheduler { impl Scheduler {
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self { pub(crate) fn new(
Self(GlobalScheduler::new(scheduler_queue, time)) scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTimeReader,
is_halted: Arc<AtomicBool>,
) -> Self {
Self(GlobalScheduler::new(scheduler_queue, time, is_halted))
} }
/// Returns the current simulation time. /// Returns the current simulation time.
@ -175,6 +182,11 @@ impl Scheduler {
GLOBAL_SCHEDULER_ORIGIN_ID, GLOBAL_SCHEDULER_ORIGIN_ID,
) )
} }
/// Stops the simulation on the next step.
pub fn halt(&mut self) {
self.0.halt()
}
} }
impl fmt::Debug for Scheduler { impl fmt::Debug for Scheduler {
@ -341,13 +353,19 @@ pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>;
pub(crate) struct GlobalScheduler { pub(crate) struct GlobalScheduler {
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTimeReader, time: AtomicTimeReader,
is_halted: Arc<AtomicBool>,
} }
impl GlobalScheduler { impl GlobalScheduler {
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self { pub(crate) fn new(
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTimeReader,
is_halted: Arc<AtomicBool>,
) -> Self {
Self { Self {
scheduler_queue, scheduler_queue,
time, time,
is_halted,
} }
} }
@ -538,6 +556,11 @@ impl GlobalScheduler {
Ok(event_key) Ok(event_key)
} }
/// Stops the simulation on the next step.
pub(crate) fn halt(&mut self) {
self.is_halted.store(true, Ordering::Relaxed);
}
} }
impl fmt::Debug for GlobalScheduler { impl fmt::Debug for GlobalScheduler {
@ -814,3 +837,14 @@ pub(crate) async fn send_keyed_event<M, F, T, S>(
) )
.await; .await;
} }
#[cfg(all(test, not(nexosim_loom)))]
impl GlobalScheduler {
/// Creates a dummy scheduler for testing purposes.
pub(crate) fn new_dummy() -> Self {
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_halter = Arc::new(AtomicBool::new(false));
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter)
}
}

View File

@ -1,4 +1,5 @@
use std::fmt; use std::fmt;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
@ -20,6 +21,7 @@ pub struct SimInit {
executor: Executor, executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>, scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: AtomicTime, time: AtomicTime,
is_halted: Arc<AtomicBool>,
clock: Box<dyn Clock + 'static>, clock: Box<dyn Clock + 'static>,
clock_tolerance: Option<Duration>, clock_tolerance: Option<Duration>,
timeout: Duration, timeout: Duration,
@ -64,6 +66,7 @@ impl SimInit {
executor, executor,
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
time, time,
is_halted: Arc::new(AtomicBool::new(false)),
clock: Box::new(NoClock::new()), clock: Box::new(NoClock::new()),
clock_tolerance: None, clock_tolerance: None,
timeout: Duration::ZERO, timeout: Duration::ZERO,
@ -91,7 +94,11 @@ impl SimInit {
}; };
self.observers self.observers
.push((name.clone(), Box::new(mailbox.0.observer()))); .push((name.clone(), Box::new(mailbox.0.observer())));
let scheduler = GlobalScheduler::new(self.scheduler_queue.clone(), self.time.reader()); let scheduler = GlobalScheduler::new(
self.scheduler_queue.clone(),
self.time.reader(),
self.is_halted.clone(),
);
add_model( add_model(
model, model,
@ -157,7 +164,11 @@ impl SimInit {
self.time.write(start_time); self.time.write(start_time);
self.clock.synchronize(start_time); self.clock.synchronize(start_time);
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); let scheduler = Scheduler::new(
self.scheduler_queue.clone(),
self.time.reader(),
self.is_halted.clone(),
);
let mut simulation = Simulation::new( let mut simulation = Simulation::new(
self.executor, self.executor,
self.scheduler_queue, self.scheduler_queue,
@ -167,6 +178,7 @@ impl SimInit {
self.timeout, self.timeout,
self.observers, self.observers,
self.model_names, self.model_names,
self.is_halted,
); );
simulation.run()?; simulation.run()?;