forked from ROMEO/nexosim
Add possibility to halt simulation
This commit is contained in:
parent
2e0653e1e3
commit
1b08f10e42
@ -17,15 +17,16 @@ enum ErrorCode {
|
||||
INVALID_KEY = 6;
|
||||
INITIALIZER_PANIC = 10;
|
||||
SIMULATION_NOT_STARTED = 11;
|
||||
SIMULATION_TERMINATED = 12;
|
||||
SIMULATION_DEADLOCK = 13;
|
||||
SIMULATION_MESSAGE_LOSS = 14;
|
||||
SIMULATION_NO_RECIPIENT = 15;
|
||||
SIMULATION_PANIC = 16;
|
||||
SIMULATION_TIMEOUT = 17;
|
||||
SIMULATION_OUT_OF_SYNC = 18;
|
||||
SIMULATION_BAD_QUERY = 19;
|
||||
SIMULATION_TIME_OUT_OF_RANGE = 20;
|
||||
SIMULATION_HALTED = 12;
|
||||
SIMULATION_TERMINATED = 13;
|
||||
SIMULATION_DEADLOCK = 14;
|
||||
SIMULATION_MESSAGE_LOSS = 15;
|
||||
SIMULATION_NO_RECIPIENT = 16;
|
||||
SIMULATION_PANIC = 17;
|
||||
SIMULATION_TIMEOUT = 18;
|
||||
SIMULATION_OUT_OF_SYNC = 19;
|
||||
SIMULATION_BAD_QUERY = 20;
|
||||
SIMULATION_TIME_OUT_OF_RANGE = 21;
|
||||
SOURCE_NOT_FOUND = 30;
|
||||
SINK_NOT_FOUND = 31;
|
||||
}
|
||||
|
@ -343,15 +343,16 @@ pub enum ErrorCode {
|
||||
InvalidKey = 6,
|
||||
InitializerPanic = 10,
|
||||
SimulationNotStarted = 11,
|
||||
SimulationTerminated = 12,
|
||||
SimulationDeadlock = 13,
|
||||
SimulationMessageLoss = 14,
|
||||
SimulationNoRecipient = 15,
|
||||
SimulationPanic = 16,
|
||||
SimulationTimeout = 17,
|
||||
SimulationOutOfSync = 18,
|
||||
SimulationBadQuery = 19,
|
||||
SimulationTimeOutOfRange = 20,
|
||||
SimulationHalted = 12,
|
||||
SimulationTerminated = 13,
|
||||
SimulationDeadlock = 14,
|
||||
SimulationMessageLoss = 15,
|
||||
SimulationNoRecipient = 16,
|
||||
SimulationPanic = 17,
|
||||
SimulationTimeout = 18,
|
||||
SimulationOutOfSync = 19,
|
||||
SimulationBadQuery = 20,
|
||||
SimulationTimeOutOfRange = 21,
|
||||
SourceNotFound = 30,
|
||||
SinkNotFound = 31,
|
||||
}
|
||||
@ -371,6 +372,7 @@ impl ErrorCode {
|
||||
Self::InvalidKey => "INVALID_KEY",
|
||||
Self::InitializerPanic => "INITIALIZER_PANIC",
|
||||
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
|
||||
Self::SimulationHalted => "SIMULATION_HALTED",
|
||||
Self::SimulationTerminated => "SIMULATION_TERMINATED",
|
||||
Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
|
||||
Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS",
|
||||
@ -396,6 +398,7 @@ impl ErrorCode {
|
||||
"INVALID_KEY" => Some(Self::InvalidKey),
|
||||
"INITIALIZER_PANIC" => Some(Self::InitializerPanic),
|
||||
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
|
||||
"SIMULATION_HALTED" => Some(Self::SimulationHalted),
|
||||
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
|
||||
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
|
||||
"SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss),
|
||||
|
@ -42,6 +42,7 @@ fn map_execution_error(error: ExecutionError) -> Error {
|
||||
ExecutionError::Timeout => ErrorCode::SimulationTimeout,
|
||||
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,
|
||||
ExecutionError::BadQuery => ErrorCode::SimulationBadQuery,
|
||||
ExecutionError::Halted => ErrorCode::SimulationHalted,
|
||||
ExecutionError::Terminated => ErrorCode::SimulationTerminated,
|
||||
ExecutionError::InvalidDeadline(_) => ErrorCode::InvalidDeadline,
|
||||
};
|
||||
|
@ -503,7 +503,7 @@ fn recycle_vec<T, U>(mut v: Vec<T>) -> Vec<U> {
|
||||
|
||||
#[cfg(all(test, not(nexosim_loom)))]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
@ -578,9 +578,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap();
|
||||
@ -648,9 +649,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
block_on(async {
|
||||
@ -708,9 +710,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap();
|
||||
@ -793,9 +796,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
|
||||
|
@ -398,7 +398,7 @@ impl<R> Iterator for ReplyIterator<R> {
|
||||
|
||||
#[cfg(all(test, not(nexosim_loom)))]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
@ -473,9 +473,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap();
|
||||
@ -543,9 +544,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
block_on(async {
|
||||
@ -603,9 +605,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap();
|
||||
@ -688,9 +691,10 @@ mod tests {
|
||||
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
|
||||
let dummy_time =
|
||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||
let dummy_halter = Arc::new(AtomicBool::new(false));
|
||||
let mut dummy_cx = Context::new(
|
||||
String::new(),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time),
|
||||
GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter),
|
||||
Address(dummy_address),
|
||||
);
|
||||
|
||||
|
@ -94,6 +94,7 @@ use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
@ -160,6 +161,7 @@ pub struct Simulation {
|
||||
timeout: Duration,
|
||||
observers: Vec<(String, Box<dyn ChannelObserver>)>,
|
||||
model_names: Vec<String>,
|
||||
is_halted: Arc<AtomicBool>,
|
||||
is_terminated: bool,
|
||||
}
|
||||
|
||||
@ -175,6 +177,7 @@ impl Simulation {
|
||||
timeout: Duration,
|
||||
observers: Vec<(String, Box<dyn ChannelObserver>)>,
|
||||
model_names: Vec<String>,
|
||||
is_halted: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
executor,
|
||||
@ -185,6 +188,7 @@ impl Simulation {
|
||||
timeout,
|
||||
observers,
|
||||
model_names,
|
||||
is_halted,
|
||||
is_terminated: false,
|
||||
}
|
||||
}
|
||||
@ -328,6 +332,12 @@ impl Simulation {
|
||||
|
||||
/// Runs the executor.
|
||||
fn run(&mut self) -> Result<(), ExecutionError> {
|
||||
// Defensive programming, shouldn't happen
|
||||
if self.is_halted.load(Ordering::Relaxed) {
|
||||
return Err(ExecutionError::Terminated);
|
||||
}
|
||||
|
||||
// Defensive programming, shouldn't happen
|
||||
if self.is_terminated {
|
||||
return Err(ExecutionError::Terminated);
|
||||
}
|
||||
@ -386,6 +396,14 @@ impl Simulation {
|
||||
&mut self,
|
||||
upper_time_bound: MonotonicTime,
|
||||
) -> Result<Option<MonotonicTime>, ExecutionError> {
|
||||
if self.is_halted.load(Ordering::Relaxed) {
|
||||
return Err(ExecutionError::Terminated);
|
||||
}
|
||||
|
||||
if self.is_terminated {
|
||||
return Err(ExecutionError::Terminated);
|
||||
}
|
||||
|
||||
// Function pulling the next action. If the action is periodic, it is
|
||||
// immediately re-scheduled.
|
||||
fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action {
|
||||
@ -506,7 +524,11 @@ impl Simulation {
|
||||
/// Returns a scheduler handle.
|
||||
#[cfg(feature = "grpc")]
|
||||
pub(crate) fn scheduler(&self) -> Scheduler {
|
||||
Scheduler::new(self.scheduler_queue.clone(), self.time.reader())
|
||||
Scheduler::new(
|
||||
self.scheduler_queue.clone(),
|
||||
self.time.reader(),
|
||||
self.is_halted.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -533,6 +555,8 @@ pub struct DeadlockInfo {
|
||||
/// An error returned upon simulation execution failure.
|
||||
#[derive(Debug)]
|
||||
pub enum ExecutionError {
|
||||
/// The simulation has been intentionally stopped.
|
||||
Halted,
|
||||
/// The simulation has been terminated due to an earlier deadlock, message
|
||||
/// loss, missing recipient, model panic, timeout or synchronization loss.
|
||||
Terminated,
|
||||
@ -613,6 +637,7 @@ pub enum ExecutionError {
|
||||
impl fmt::Display for ExecutionError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Self::Halted => f.write_str("the simulation has been intentionally stopped"),
|
||||
Self::Terminated => f.write_str("the simulation has been terminated"),
|
||||
Self::Deadlock(list) => {
|
||||
f.write_str(
|
||||
|
@ -27,8 +27,12 @@ const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0;
|
||||
pub struct Scheduler(GlobalScheduler);
|
||||
|
||||
impl Scheduler {
|
||||
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
|
||||
Self(GlobalScheduler::new(scheduler_queue, time))
|
||||
pub(crate) fn new(
|
||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||
time: AtomicTimeReader,
|
||||
is_halted: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
Self(GlobalScheduler::new(scheduler_queue, time, is_halted))
|
||||
}
|
||||
|
||||
/// Returns the current simulation time.
|
||||
@ -175,6 +179,11 @@ impl Scheduler {
|
||||
GLOBAL_SCHEDULER_ORIGIN_ID,
|
||||
)
|
||||
}
|
||||
|
||||
/// Stops the simulation on the next step.
|
||||
pub fn halt(&mut self) {
|
||||
self.0.halt()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Scheduler {
|
||||
@ -341,13 +350,19 @@ pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>;
|
||||
pub(crate) struct GlobalScheduler {
|
||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||
time: AtomicTimeReader,
|
||||
is_halted: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl GlobalScheduler {
|
||||
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
|
||||
pub(crate) fn new(
|
||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||
time: AtomicTimeReader,
|
||||
is_halted: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
scheduler_queue,
|
||||
time,
|
||||
is_halted,
|
||||
}
|
||||
}
|
||||
|
||||
@ -538,6 +553,11 @@ impl GlobalScheduler {
|
||||
|
||||
Ok(event_key)
|
||||
}
|
||||
|
||||
/// Stops the simulation on the next step.
|
||||
pub(crate) fn halt(&mut self) {
|
||||
self.is_halted.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for GlobalScheduler {
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::fmt;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
@ -20,6 +21,7 @@ pub struct SimInit {
|
||||
executor: Executor,
|
||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||
time: AtomicTime,
|
||||
is_halted: Arc<AtomicBool>,
|
||||
clock: Box<dyn Clock + 'static>,
|
||||
clock_tolerance: Option<Duration>,
|
||||
timeout: Duration,
|
||||
@ -64,6 +66,7 @@ impl SimInit {
|
||||
executor,
|
||||
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
|
||||
time,
|
||||
is_halted: Arc::new(AtomicBool::new(false)),
|
||||
clock: Box::new(NoClock::new()),
|
||||
clock_tolerance: None,
|
||||
timeout: Duration::ZERO,
|
||||
@ -91,7 +94,11 @@ impl SimInit {
|
||||
};
|
||||
self.observers
|
||||
.push((name.clone(), Box::new(mailbox.0.observer())));
|
||||
let scheduler = GlobalScheduler::new(self.scheduler_queue.clone(), self.time.reader());
|
||||
let scheduler = GlobalScheduler::new(
|
||||
self.scheduler_queue.clone(),
|
||||
self.time.reader(),
|
||||
self.is_halted.clone(),
|
||||
);
|
||||
|
||||
add_model(
|
||||
model,
|
||||
@ -157,7 +164,11 @@ impl SimInit {
|
||||
self.time.write(start_time);
|
||||
self.clock.synchronize(start_time);
|
||||
|
||||
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
|
||||
let scheduler = Scheduler::new(
|
||||
self.scheduler_queue.clone(),
|
||||
self.time.reader(),
|
||||
self.is_halted.clone(),
|
||||
);
|
||||
let mut simulation = Simulation::new(
|
||||
self.executor,
|
||||
self.scheduler_queue,
|
||||
@ -167,6 +178,7 @@ impl SimInit {
|
||||
self.timeout,
|
||||
self.observers,
|
||||
self.model_names,
|
||||
self.is_halted,
|
||||
);
|
||||
simulation.run()?;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user