From 1b08f10e42ecfcc76d6d9867493ce64e6697659f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Mon, 13 Jan 2025 15:23:26 +0100 Subject: [PATCH 1/6] Add possibility to halt simulation --- nexosim/src/grpc/api/simulation.proto | 19 ++++++++-------- nexosim/src/grpc/codegen/simulation.v1.rs | 21 ++++++++++-------- nexosim/src/grpc/services.rs | 1 + nexosim/src/ports/output/broadcaster.rs | 14 +++++++----- nexosim/src/ports/source/broadcaster.rs | 14 +++++++----- nexosim/src/simulation.rs | 27 ++++++++++++++++++++++- nexosim/src/simulation/scheduler.rs | 26 +++++++++++++++++++--- nexosim/src/simulation/sim_init.rs | 16 ++++++++++++-- 8 files changed, 104 insertions(+), 34 deletions(-) diff --git a/nexosim/src/grpc/api/simulation.proto b/nexosim/src/grpc/api/simulation.proto index 1841f63..58afd12 100644 --- a/nexosim/src/grpc/api/simulation.proto +++ b/nexosim/src/grpc/api/simulation.proto @@ -17,15 +17,16 @@ enum ErrorCode { INVALID_KEY = 6; INITIALIZER_PANIC = 10; SIMULATION_NOT_STARTED = 11; - SIMULATION_TERMINATED = 12; - SIMULATION_DEADLOCK = 13; - SIMULATION_MESSAGE_LOSS = 14; - SIMULATION_NO_RECIPIENT = 15; - SIMULATION_PANIC = 16; - SIMULATION_TIMEOUT = 17; - SIMULATION_OUT_OF_SYNC = 18; - SIMULATION_BAD_QUERY = 19; - SIMULATION_TIME_OUT_OF_RANGE = 20; + SIMULATION_HALTED = 12; + SIMULATION_TERMINATED = 13; + SIMULATION_DEADLOCK = 14; + SIMULATION_MESSAGE_LOSS = 15; + SIMULATION_NO_RECIPIENT = 16; + SIMULATION_PANIC = 17; + SIMULATION_TIMEOUT = 18; + SIMULATION_OUT_OF_SYNC = 19; + SIMULATION_BAD_QUERY = 20; + SIMULATION_TIME_OUT_OF_RANGE = 21; SOURCE_NOT_FOUND = 30; SINK_NOT_FOUND = 31; } diff --git a/nexosim/src/grpc/codegen/simulation.v1.rs b/nexosim/src/grpc/codegen/simulation.v1.rs index bbbd6a5..88f9be4 100644 --- a/nexosim/src/grpc/codegen/simulation.v1.rs +++ b/nexosim/src/grpc/codegen/simulation.v1.rs @@ -343,15 +343,16 @@ pub enum ErrorCode { InvalidKey = 6, InitializerPanic = 10, SimulationNotStarted = 11, - SimulationTerminated = 12, - SimulationDeadlock = 13, - SimulationMessageLoss = 14, - SimulationNoRecipient = 15, - SimulationPanic = 16, - SimulationTimeout = 17, - SimulationOutOfSync = 18, - SimulationBadQuery = 19, - SimulationTimeOutOfRange = 20, + SimulationHalted = 12, + SimulationTerminated = 13, + SimulationDeadlock = 14, + SimulationMessageLoss = 15, + SimulationNoRecipient = 16, + SimulationPanic = 17, + SimulationTimeout = 18, + SimulationOutOfSync = 19, + SimulationBadQuery = 20, + SimulationTimeOutOfRange = 21, SourceNotFound = 30, SinkNotFound = 31, } @@ -371,6 +372,7 @@ impl ErrorCode { Self::InvalidKey => "INVALID_KEY", Self::InitializerPanic => "INITIALIZER_PANIC", Self::SimulationNotStarted => "SIMULATION_NOT_STARTED", + Self::SimulationHalted => "SIMULATION_HALTED", Self::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationDeadlock => "SIMULATION_DEADLOCK", Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS", @@ -396,6 +398,7 @@ impl ErrorCode { "INVALID_KEY" => Some(Self::InvalidKey), "INITIALIZER_PANIC" => Some(Self::InitializerPanic), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), + "SIMULATION_HALTED" => Some(Self::SimulationHalted), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss), diff --git a/nexosim/src/grpc/services.rs b/nexosim/src/grpc/services.rs index 05b5e9e..8d752a3 100644 --- a/nexosim/src/grpc/services.rs +++ b/nexosim/src/grpc/services.rs @@ -42,6 +42,7 @@ fn map_execution_error(error: ExecutionError) -> Error { ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, ExecutionError::BadQuery => ErrorCode::SimulationBadQuery, + ExecutionError::Halted => ErrorCode::SimulationHalted, ExecutionError::Terminated => ErrorCode::SimulationTerminated, ExecutionError::InvalidDeadline(_) => ErrorCode::InvalidDeadline, }; diff --git a/nexosim/src/ports/output/broadcaster.rs b/nexosim/src/ports/output/broadcaster.rs index e7d7470..77db015 100644 --- a/nexosim/src/ports/output/broadcaster.rs +++ b/nexosim/src/ports/output/broadcaster.rs @@ -503,7 +503,7 @@ fn recycle_vec(mut v: Vec) -> Vec { #[cfg(all(test, not(nexosim_loom)))] mod tests { - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; @@ -578,9 +578,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); @@ -648,9 +649,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); block_on(async { @@ -708,9 +710,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); @@ -793,9 +796,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); diff --git a/nexosim/src/ports/source/broadcaster.rs b/nexosim/src/ports/source/broadcaster.rs index 1a1bdac..f221e63 100644 --- a/nexosim/src/ports/source/broadcaster.rs +++ b/nexosim/src/ports/source/broadcaster.rs @@ -398,7 +398,7 @@ impl Iterator for ReplyIterator { #[cfg(all(test, not(nexosim_loom)))] mod tests { - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; @@ -473,9 +473,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); @@ -543,9 +544,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); block_on(async { @@ -603,9 +605,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); @@ -688,9 +691,10 @@ mod tests { let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); let mut dummy_cx = Context::new( String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time), + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), Address(dummy_address), ); diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index a80271b..d890fba 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -94,6 +94,7 @@ use std::error::Error; use std::fmt; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::Poll; use std::time::Duration; @@ -160,6 +161,7 @@ pub struct Simulation { timeout: Duration, observers: Vec<(String, Box)>, model_names: Vec, + is_halted: Arc, is_terminated: bool, } @@ -175,6 +177,7 @@ impl Simulation { timeout: Duration, observers: Vec<(String, Box)>, model_names: Vec, + is_halted: Arc, ) -> Self { Self { executor, @@ -185,6 +188,7 @@ impl Simulation { timeout, observers, model_names, + is_halted, is_terminated: false, } } @@ -328,6 +332,12 @@ impl Simulation { /// Runs the executor. fn run(&mut self) -> Result<(), ExecutionError> { + // Defensive programming, shouldn't happen + if self.is_halted.load(Ordering::Relaxed) { + return Err(ExecutionError::Terminated); + } + + // Defensive programming, shouldn't happen if self.is_terminated { return Err(ExecutionError::Terminated); } @@ -386,6 +396,14 @@ impl Simulation { &mut self, upper_time_bound: MonotonicTime, ) -> Result, ExecutionError> { + if self.is_halted.load(Ordering::Relaxed) { + return Err(ExecutionError::Terminated); + } + + if self.is_terminated { + return Err(ExecutionError::Terminated); + } + // Function pulling the next action. If the action is periodic, it is // immediately re-scheduled. fn pull_next_action(scheduler_queue: &mut MutexGuard) -> Action { @@ -506,7 +524,11 @@ impl Simulation { /// Returns a scheduler handle. #[cfg(feature = "grpc")] pub(crate) fn scheduler(&self) -> Scheduler { - Scheduler::new(self.scheduler_queue.clone(), self.time.reader()) + Scheduler::new( + self.scheduler_queue.clone(), + self.time.reader(), + self.is_halted.clone(), + ) } } @@ -533,6 +555,8 @@ pub struct DeadlockInfo { /// An error returned upon simulation execution failure. #[derive(Debug)] pub enum ExecutionError { + /// The simulation has been intentionally stopped. + Halted, /// The simulation has been terminated due to an earlier deadlock, message /// loss, missing recipient, model panic, timeout or synchronization loss. Terminated, @@ -613,6 +637,7 @@ pub enum ExecutionError { impl fmt::Display for ExecutionError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + Self::Halted => f.write_str("the simulation has been intentionally stopped"), Self::Terminated => f.write_str("the simulation has been terminated"), Self::Deadlock(list) => { f.write_str( diff --git a/nexosim/src/simulation/scheduler.rs b/nexosim/src/simulation/scheduler.rs index e938d4c..eaa90a4 100644 --- a/nexosim/src/simulation/scheduler.rs +++ b/nexosim/src/simulation/scheduler.rs @@ -27,8 +27,12 @@ const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; pub struct Scheduler(GlobalScheduler); impl Scheduler { - pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { - Self(GlobalScheduler::new(scheduler_queue, time)) + pub(crate) fn new( + scheduler_queue: Arc>, + time: AtomicTimeReader, + is_halted: Arc, + ) -> Self { + Self(GlobalScheduler::new(scheduler_queue, time, is_halted)) } /// Returns the current simulation time. @@ -175,6 +179,11 @@ impl Scheduler { GLOBAL_SCHEDULER_ORIGIN_ID, ) } + + /// Stops the simulation on the next step. + pub fn halt(&mut self) { + self.0.halt() + } } impl fmt::Debug for Scheduler { @@ -341,13 +350,19 @@ pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, usize), Action>; pub(crate) struct GlobalScheduler { scheduler_queue: Arc>, time: AtomicTimeReader, + is_halted: Arc, } impl GlobalScheduler { - pub(crate) fn new(scheduler_queue: Arc>, time: AtomicTimeReader) -> Self { + pub(crate) fn new( + scheduler_queue: Arc>, + time: AtomicTimeReader, + is_halted: Arc, + ) -> Self { Self { scheduler_queue, time, + is_halted, } } @@ -538,6 +553,11 @@ impl GlobalScheduler { Ok(event_key) } + + /// Stops the simulation on the next step. + pub(crate) fn halt(&mut self) { + self.is_halted.store(true, Ordering::Relaxed); + } } impl fmt::Debug for GlobalScheduler { diff --git a/nexosim/src/simulation/sim_init.rs b/nexosim/src/simulation/sim_init.rs index 273b256..9969dd4 100644 --- a/nexosim/src/simulation/sim_init.rs +++ b/nexosim/src/simulation/sim_init.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -20,6 +21,7 @@ pub struct SimInit { executor: Executor, scheduler_queue: Arc>, time: AtomicTime, + is_halted: Arc, clock: Box, clock_tolerance: Option, timeout: Duration, @@ -64,6 +66,7 @@ impl SimInit { executor, scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), time, + is_halted: Arc::new(AtomicBool::new(false)), clock: Box::new(NoClock::new()), clock_tolerance: None, timeout: Duration::ZERO, @@ -91,7 +94,11 @@ impl SimInit { }; self.observers .push((name.clone(), Box::new(mailbox.0.observer()))); - let scheduler = GlobalScheduler::new(self.scheduler_queue.clone(), self.time.reader()); + let scheduler = GlobalScheduler::new( + self.scheduler_queue.clone(), + self.time.reader(), + self.is_halted.clone(), + ); add_model( model, @@ -157,7 +164,11 @@ impl SimInit { self.time.write(start_time); self.clock.synchronize(start_time); - let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); + let scheduler = Scheduler::new( + self.scheduler_queue.clone(), + self.time.reader(), + self.is_halted.clone(), + ); let mut simulation = Simulation::new( self.executor, self.scheduler_queue, @@ -167,6 +178,7 @@ impl SimInit { self.timeout, self.observers, self.model_names, + self.is_halted, ); simulation.run()?; From 27ec1396df54253823a4b5a25ca1e79f4d1697fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Wed, 15 Jan 2025 13:10:37 +0100 Subject: [PATCH 2/6] Add infinite step and an example. --- nexosim/examples/external_input.rs | 4 +- nexosim/examples/infinite_loop.rs | 127 +++++++++++++++++++++++++++++ nexosim/src/simulation.rs | 54 ++++++++---- 3 files changed, 168 insertions(+), 17 deletions(-) create mode 100644 nexosim/examples/infinite_loop.rs diff --git a/nexosim/examples/external_input.rs b/nexosim/examples/external_input.rs index 1b5406a..f64a012 100644 --- a/nexosim/examples/external_input.rs +++ b/nexosim/examples/external_input.rs @@ -148,9 +148,9 @@ impl Model for Listener { impl Drop for Listener { /// Wait for UDP Server shutdown. fn drop(&mut self) { - self.server_handle.take().map(|handle| { + if let Some(handle) = self.server_handle.take() { let _ = handle.join(); - }); + }; } } diff --git a/nexosim/examples/infinite_loop.rs b/nexosim/examples/infinite_loop.rs new file mode 100644 index 0000000..4af001e --- /dev/null +++ b/nexosim/examples/infinite_loop.rs @@ -0,0 +1,127 @@ +//! Example: a simulation that runs infinitely, receiving data from +//! outside. This setup is typical for hardware-in-the-loop use case. +//! +//! This example demonstrates in particular: +//! +//! * infinite simulation (useful in hardware-in-the-loop), +//! * simulation halting, +//! * processing of external data (useful in co-simulation), +//! * system clock, +//! * periodic scheduling. +//! +//! ```text +//! ┏━━━━━━━━━━━━━━━━━━━━━━━━┓ +//! ┃ Simulation ┃ +//!┌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┐ ┃ ┌──────────┐ ┃ +//!┆ ┆ message ┃ │ │ message ┃ +//!┆ External thread ├╌╌╌╌╌╌╌╌╌╌╌╂╌╌►│ Listener ├─────────╂─► +//!┆ ┆ [channel] ┃ │ │ ┃ +//!└╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┘ ┃ └──────────┘ ┃ +//! ┗━━━━━━━━━━━━━━━━━━━━━━━━┛ +//! ``` + +use std::sync::mpsc::{channel, Receiver}; +use std::thread::{self, sleep}; +use std::time::Duration; + +use nexosim::model::{Context, InitializedModel, Model}; +use nexosim::ports::{EventBuffer, Output}; +use nexosim::simulation::{Mailbox, SimInit, SimulationError}; +use nexosim::time::{AutoSystemClock, MonotonicTime}; + +const DELTA: Duration = Duration::from_millis(2); +const PERIOD: Duration = Duration::from_millis(20); +const N: usize = 10; + +/// The `Listener` Model. +pub struct Listener { + /// Received message. + pub message: Output, + + /// Source of external messages. + external: Receiver, +} + +impl Listener { + /// Creates new `Listener` model. + fn new(external: Receiver) -> Self { + Self { + message: Output::default(), + external, + } + } + + /// Periodically scheduled function that processes external events. + async fn process(&mut self) { + while let Ok(message) = self.external.try_recv() { + self.message.send(message).await; + } + } +} + +impl Model for Listener { + /// Initialize model. + async fn init(self, cx: &mut Context) -> InitializedModel { + // Schedule periodic function that processes external events. + cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) + .unwrap(); + + self.into() + } +} + +fn main() -> Result<(), SimulationError> { + // --------------- + // Bench assembly. + // --------------- + + // Channel for communication with simulation from outside. + let (tx, rx) = channel(); + + // Models. + + // The listener model. + let mut listener = Listener::new(rx); + + // Mailboxes. + let listener_mbox = Mailbox::new(); + + // Model handles for simulation. + let mut message = EventBuffer::with_capacity(N + 1); + listener.message.connect_sink(&message); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let (mut simu, mut scheduler) = SimInit::new() + .add_model(listener, listener_mbox, "listener") + .set_clock(AutoSystemClock::new()) + .init(t0)?; + + // Simulation thread. + let simulation_handle = thread::spawn(move || { + // ---------- + // Simulation. + // ---------- + simu.step_forever() + }); + + // Send data to simulation from outside. + for i in 0..N { + tx.send(i.to_string()).unwrap(); + if i % 3 == 0 { + sleep(PERIOD * i as u32) + } + } + + // Check collected external messages. + for i in 0..N { + assert_eq!(message.next().unwrap(), i.to_string()); + } + assert_eq!(message.next(), None); + + // Stop the simulation. + scheduler.halt(); + Ok(simulation_handle.join().unwrap()?) +} diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index d890fba..1a606a5 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -220,7 +220,7 @@ impl Simulation { /// simulation clock. This method blocks until all newly processed events /// have completed. pub fn step(&mut self) -> Result<(), ExecutionError> { - self.step_to_next_bounded(MonotonicTime::MAX).map(|_| ()) + self.step_to_next(None).map(|_| ()) } /// Iteratively advances the simulation time until the specified deadline, @@ -236,7 +236,19 @@ impl Simulation { if target_time < now { return Err(ExecutionError::InvalidDeadline(target_time)); } - self.step_until_unchecked(target_time) + self.step_until_unchecked(Some(target_time)) + } + + /// Iteratively advances the simulation time, as if by calling + /// [`Simulation::step()`] repeatedly. + /// + /// This method blocks until all events scheduled have completed. If + /// simulation is halted, this method returns without an error. + pub fn step_forever(&mut self) -> Result<(), ExecutionError> { + match self.step_until_unchecked(None) { + Err(ExecutionError::Halted) => Ok(()), + result => result, + } } /// Processes an action immediately, blocking until completion. @@ -333,8 +345,9 @@ impl Simulation { /// Runs the executor. fn run(&mut self) -> Result<(), ExecutionError> { // Defensive programming, shouldn't happen - if self.is_halted.load(Ordering::Relaxed) { - return Err(ExecutionError::Terminated); + if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); } // Defensive programming, shouldn't happen @@ -392,12 +405,13 @@ impl Simulation { /// /// If at least one action was found that satisfied the time bound, the /// corresponding new simulation time is returned. - fn step_to_next_bounded( + fn step_to_next( &mut self, - upper_time_bound: MonotonicTime, + upper_time_bound: Option, ) -> Result, ExecutionError> { - if self.is_halted.load(Ordering::Relaxed) { - return Err(ExecutionError::Terminated); + if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); } if self.is_terminated { @@ -415,12 +429,17 @@ impl Simulation { action } + let (unbounded, upper_time_bound) = match upper_time_bound { + Some(upper_time_bound) => (false, upper_time_bound), + None => (true, MonotonicTime::MAX), + }; + // Closure returning the next key which time stamp is no older than the // upper bound, if any. Cancelled actions are pulled and discarded. let peek_next_key = |scheduler_queue: &mut MutexGuard| { loop { match scheduler_queue.peek() { - Some((&key, action)) if key.0 <= upper_time_bound => { + Some((&key, action)) if unbounded || key.0 <= upper_time_bound => { if !action.is_cancelled() { break Some(key); } @@ -502,16 +521,21 @@ impl Simulation { /// /// This method does not check whether the specified time lies in the future /// of the current simulation time. - fn step_until_unchecked(&mut self, target_time: MonotonicTime) -> Result<(), ExecutionError> { + fn step_until_unchecked( + &mut self, + target_time: Option, + ) -> Result<(), ExecutionError> { loop { - match self.step_to_next_bounded(target_time) { + match self.step_to_next(target_time) { // The target time was reached exactly. - Ok(Some(t)) if t == target_time => return Ok(()), + Ok(reached_time) if reached_time == target_time => return Ok(()), // No actions are scheduled before or at the target time. Ok(None) => { - // Update the simulation time. - self.time.write(target_time); - self.clock.synchronize(target_time); + if let Some(target_time) = target_time { + // Update the simulation time. + self.time.write(target_time); + self.clock.synchronize(target_time); + } return Ok(()); } Err(e) => return Err(e), From 4111d492951d731fcde01f8dddbb2e35fe3a2c56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Wed, 15 Jan 2025 13:29:36 +0100 Subject: [PATCH 3/6] Provide dummy context for testing. --- nexosim/src/model/context.rs | 16 ++++++++ nexosim/src/ports/output/broadcaster.rs | 52 +++---------------------- nexosim/src/ports/source/broadcaster.rs | 52 +++---------------------- nexosim/src/simulation/scheduler.rs | 14 +++++++ 4 files changed, 42 insertions(+), 92 deletions(-) diff --git a/nexosim/src/model/context.rs b/nexosim/src/model/context.rs index 2d593e6..0555d99 100644 --- a/nexosim/src/model/context.rs +++ b/nexosim/src/model/context.rs @@ -8,6 +8,9 @@ use crate::time::{Deadline, MonotonicTime}; use super::{Model, ProtoModel}; +#[cfg(all(test, not(nexosim_loom)))] +use crate::channel::Receiver; + /// A local context for models. /// /// A `Context` is a handle to the global context associated to a model @@ -521,3 +524,16 @@ impl<'a, P: ProtoModel> BuildContext<'a, P> { ); } } + +#[cfg(all(test, not(nexosim_loom)))] +impl Context { + /// Creates a dummy context for testing purposes. + pub(crate) fn new_dummy() -> Self { + let dummy_address = Receiver::new(1).sender(); + Context::new( + String::new(), + GlobalScheduler::new_dummy(), + Address(dummy_address), + ) + } +} diff --git a/nexosim/src/ports/output/broadcaster.rs b/nexosim/src/ports/output/broadcaster.rs index 77db015..4da1d4a 100644 --- a/nexosim/src/ports/output/broadcaster.rs +++ b/nexosim/src/ports/output/broadcaster.rs @@ -503,17 +503,13 @@ fn recycle_vec(mut v: Vec) -> Vec { #[cfg(all(test, not(nexosim_loom)))] mod tests { - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::sync::{Arc, Mutex}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use std::thread; use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, GlobalScheduler}; - use crate::time::{MonotonicTime, TearableAtomicTime}; - use crate::util::priority_queue::PriorityQueue; - use crate::util::sync_cell::SyncCell; use super::super::sender::{ FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender, @@ -574,16 +570,7 @@ mod tests { let mut sum_model = SumModel::new(sum.clone()); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); } }) @@ -645,16 +632,7 @@ mod tests { let mut sum_model = SumModel::new(sum.clone()); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(async { mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); @@ -706,16 +684,7 @@ mod tests { let mut double_model = DoubleModel::new(); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } @@ -792,16 +761,7 @@ mod tests { let mut double_model = DoubleModel::new(); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(async { mailbox diff --git a/nexosim/src/ports/source/broadcaster.rs b/nexosim/src/ports/source/broadcaster.rs index f221e63..69c91ed 100644 --- a/nexosim/src/ports/source/broadcaster.rs +++ b/nexosim/src/ports/source/broadcaster.rs @@ -398,17 +398,13 @@ impl Iterator for ReplyIterator { #[cfg(all(test, not(nexosim_loom)))] mod tests { - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::sync::{Arc, Mutex}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use std::thread; use futures_executor::block_on; use crate::channel::Receiver; - use crate::simulation::{Address, GlobalScheduler}; - use crate::time::{MonotonicTime, TearableAtomicTime}; - use crate::util::priority_queue::PriorityQueue; - use crate::util::sync_cell::SyncCell; use super::super::sender::{ FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender, @@ -469,16 +465,7 @@ mod tests { let mut sum_model = SumModel::new(sum.clone()); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(mailbox.recv(&mut sum_model, &mut dummy_cx)).unwrap(); } }) @@ -540,16 +527,7 @@ mod tests { let mut sum_model = SumModel::new(sum.clone()); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(async { mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); mailbox.recv(&mut sum_model, &mut dummy_cx).await.unwrap(); @@ -601,16 +579,7 @@ mod tests { let mut double_model = DoubleModel::new(); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(mailbox.recv(&mut double_model, &mut dummy_cx)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } @@ -687,16 +656,7 @@ mod tests { let mut double_model = DoubleModel::new(); move || { - let dummy_address = Receiver::new(1).sender(); - let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); - let dummy_time = - SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); - let dummy_halter = Arc::new(AtomicBool::new(false)); - let mut dummy_cx = Context::new( - String::new(), - GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter), - Address(dummy_address), - ); + let mut dummy_cx = Context::new_dummy(); block_on(async { mailbox diff --git a/nexosim/src/simulation/scheduler.rs b/nexosim/src/simulation/scheduler.rs index eaa90a4..e8c2073 100644 --- a/nexosim/src/simulation/scheduler.rs +++ b/nexosim/src/simulation/scheduler.rs @@ -20,6 +20,9 @@ use crate::simulation::Address; use crate::time::{AtomicTimeReader, Deadline, MonotonicTime}; use crate::util::priority_queue::PriorityQueue; +#[cfg(all(test, not(nexosim_loom)))] +use crate::{time::TearableAtomicTime, util::sync_cell::SyncCell}; + const GLOBAL_SCHEDULER_ORIGIN_ID: usize = 0; /// A global scheduler. @@ -834,3 +837,14 @@ pub(crate) async fn send_keyed_event( ) .await; } + +#[cfg(all(test, not(nexosim_loom)))] +impl GlobalScheduler { + /// Creates a dummy scheduler for testing purposes. + pub(crate) fn new_dummy() -> Self { + let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); + let dummy_time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_halter = Arc::new(AtomicBool::new(false)); + GlobalScheduler::new(dummy_priority_queue, dummy_time, dummy_halter) + } +} From beaefe1d9eff4008451b8d54f1da35653e41b771 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Wed, 15 Jan 2025 13:38:27 +0100 Subject: [PATCH 4/6] Changes after review --- nexosim/src/simulation.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index 1a606a5..cf20b02 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -344,17 +344,15 @@ impl Simulation { /// Runs the executor. fn run(&mut self) -> Result<(), ExecutionError> { - // Defensive programming, shouldn't happen - if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) { - self.is_terminated = true; - return Err(ExecutionError::Halted); - } - - // Defensive programming, shouldn't happen if self.is_terminated { return Err(ExecutionError::Terminated); } + if self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); + } + self.executor.run(self.timeout).map_err(|e| { self.is_terminated = true; @@ -409,15 +407,15 @@ impl Simulation { &mut self, upper_time_bound: Option, ) -> Result, ExecutionError> { - if !self.is_terminated && self.is_halted.load(Ordering::Relaxed) { - self.is_terminated = true; - return Err(ExecutionError::Halted); - } - if self.is_terminated { return Err(ExecutionError::Terminated); } + if self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); + } + // Function pulling the next action. If the action is periodic, it is // immediately re-scheduled. fn pull_next_action(scheduler_queue: &mut MutexGuard) -> Action { From 922d93bd01687b9eaab32dd636dd0a021cb4a883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Wed, 15 Jan 2025 14:38:31 +0100 Subject: [PATCH 5/6] Change method name --- nexosim/examples/infinite_loop.rs | 2 +- nexosim/src/simulation.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nexosim/examples/infinite_loop.rs b/nexosim/examples/infinite_loop.rs index 4af001e..32c92b5 100644 --- a/nexosim/examples/infinite_loop.rs +++ b/nexosim/examples/infinite_loop.rs @@ -104,7 +104,7 @@ fn main() -> Result<(), SimulationError> { // ---------- // Simulation. // ---------- - simu.step_forever() + simu.step_unbounded() }); // Send data to simulation from outside. diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index cf20b02..28c3ffd 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -244,7 +244,7 @@ impl Simulation { /// /// This method blocks until all events scheduled have completed. If /// simulation is halted, this method returns without an error. - pub fn step_forever(&mut self) -> Result<(), ExecutionError> { + pub fn step_unbounded(&mut self) -> Result<(), ExecutionError> { match self.step_until_unchecked(None) { Err(ExecutionError::Halted) => Ok(()), result => result, From a223a14cc6716913b70078741ee156161057937d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Wed, 15 Jan 2025 15:59:33 +0100 Subject: [PATCH 6/6] Change after review. --- nexosim/src/simulation.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index 28c3ffd..b93f287 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -427,17 +427,14 @@ impl Simulation { action } - let (unbounded, upper_time_bound) = match upper_time_bound { - Some(upper_time_bound) => (false, upper_time_bound), - None => (true, MonotonicTime::MAX), - }; + let upper_time_bound = upper_time_bound.unwrap_or(MonotonicTime::MAX); // Closure returning the next key which time stamp is no older than the // upper bound, if any. Cancelled actions are pulled and discarded. let peek_next_key = |scheduler_queue: &mut MutexGuard| { loop { match scheduler_queue.peek() { - Some((&key, action)) if unbounded || key.0 <= upper_time_bound => { + Some((&key, action)) if key.0 <= upper_time_bound => { if !action.is_cancelled() { break Some(key); }