From d4494cf0d34902a8ffaead45ad9269b8fcf6f7ae Mon Sep 17 00:00:00 2001 From: Ulrich Mohr Date: Thu, 13 Mar 2025 17:59:52 +0100 Subject: [PATCH] ported romeo patch from v0.2.2 original patch is 93e643a5fde01a9ca0538d9813957687c541de65 --- nexosim/src/simulation.rs | 167 ++++++++++++++++++++++++++++++++++++++ nexosim/src/time/clock.rs | 24 ++++++ 2 files changed, 191 insertions(+) diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index c4e9193..9287378 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -501,6 +501,173 @@ impl Simulation { } } + /// Advances simulation time to that of the next scheduled action if its + /// scheduling time does not exceed the specified bound, processing that + /// action as well as all other actions scheduled for the same time. + /// + /// If at least one action was found that satisfied the time bound, the + /// corresponding new simulation time is returned. + pub fn step_with_idle_worker( + &mut self, + event_handler: F, + handler_address: impl Into> + Clone, + idle_worker: G, + ) -> Result<(), ExecutionError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + G: FnOnce(Duration) -> Vec, + { + if self.is_terminated { + return Err(ExecutionError::Terminated); + } + + if self.is_halted.load(Ordering::Relaxed) { + self.is_terminated = true; + return Err(ExecutionError::Halted); + } + + // Function pulling the next action. If the action is periodic, it is + // immediately re-scheduled. + fn pull_next_action(scheduler_queue: &mut MutexGuard) -> Action { + let ((time, channel_id), action) = scheduler_queue.pull().unwrap(); + if let Some((action_clone, period)) = action.next() { + scheduler_queue.insert((time + period, channel_id), action_clone); + } + + action + } + + let upper_time_bound = MonotonicTime::MAX; + + // Closure returning the next key which time stamp is no older than the + // upper bound, if any. Cancelled actions are pulled and discarded. + let peek_next_key = |scheduler_queue: &mut MutexGuard| { + loop { + match scheduler_queue.peek() { + Some((&key, action)) if key.0 <= upper_time_bound => { + if !action.is_cancelled() { + break Some(key); + } + // Discard cancelled actions. + scheduler_queue.pull(); + } + _ => break None, + } + } + }; + + // Move to the next scheduled time. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let mut current_key_opt = peek_next_key(&mut scheduler_queue); + let next_step = match current_key_opt { + Some((time, _)) => time, + None => upper_time_bound, + }; + + // so we can alter the queue + drop(scheduler_queue); + + // TODO this fails quite bad with clock being NoClock. Can we warn about it? + let block_timeout = next_step.duration_since(self.clock.now()); + + let sleeper = spin_sleep::SpinSleeper::default(); + let accuracy = Duration::new(0, sleeper.native_accuracy_ns()); + + // only let worker run if it has a chance to return in time + // insert events returned by worker into queue + // TODO if woker returns early without event, it will not be called before next scheduled + // event, which might be a long while -> worker should either return some events or timeout + if block_timeout > accuracy { + let event_data = idle_worker(block_timeout - accuracy); + + if !event_data.is_empty() { + // get a local scheduler + let scheduler = Scheduler::new( + self.scheduler_queue.clone(), + self.time.reader(), + self.is_halted.clone(), + ); + // schedule event for immediate execution (needs to be > now, hence the nanosecond) + // we do not process the event directely but prefer putting it into the queue + // that way, the code following can be left unmodified + // otherwise, we would need to loop over idle_worker() until the next event in the queue + // is reached. + for element in event_data { + scheduler.schedule_event( + Duration::from_nanos(1), + event_handler.clone(), + element, + handler_address.clone(), + ) + .unwrap(); + } + } + } + + // Start over finding the next event, as queue might have been altered by idle worker + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let mut current_key = match peek_next_key(&mut scheduler_queue) { + Some(key) => key, + None => return Ok(()), + }; + self.time.write(current_key.0); + + loop { + let action = pull_next_action(&mut scheduler_queue); + let mut next_key = peek_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { + // Since there are no other actions with the same origin and the + // same time, the action is spawned immediately. + action.spawn_and_forget(&self.executor); + } else { + // To ensure that their relative order of execution is + // preserved, all actions with the same origin are executed + // sequentially within a single compound future. + let mut action_sequence = SeqFuture::new(); + action_sequence.push(action.into_future()); + loop { + let action = pull_next_action(&mut scheduler_queue); + action_sequence.push(action.into_future()); + next_key = peek_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { + break; + } + } + + // Spawn a compound future that sequentially polls all actions + // targeting the same mailbox. + self.executor.spawn_and_forget(action_sequence); + } + + current_key = match next_key { + // If the next action is scheduled at the same time, update the + // key and continue. + Some(k) if k.0 == current_key.0 => k, + // Otherwise wait until all actions have completed and return. + _ => { + drop(scheduler_queue); // make sure the queue's mutex is released. + + let current_time = current_key.0; + if let SyncStatus::OutOfSync(lag) = self.clock.synchronize(current_time) { + if let Some(tolerance) = &self.clock_tolerance { + if &lag > tolerance { + self.is_terminated = true; + + return Err(ExecutionError::OutOfSync(lag)); + } + } + } + self.run()?; + + return Ok(()); + } + }; + } + } + /// Iteratively advances simulation time and processes all actions scheduled /// up to the specified target time. /// diff --git a/nexosim/src/time/clock.rs b/nexosim/src/time/clock.rs index 7a86b07..7d42bf3 100644 --- a/nexosim/src/time/clock.rs +++ b/nexosim/src/time/clock.rs @@ -14,18 +14,29 @@ use crate::time::MonotonicTime; pub trait Clock: Send { /// Blocks until the deadline. fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus; + + // Returns current (as of time of the call) sim time + fn now(&self) -> MonotonicTime; } impl Clock for &mut C { fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus { (**self).synchronize(deadline) } + + fn now(&self) -> MonotonicTime{ + (**self).now() + } } impl Clock for Box { fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus { (**self).synchronize(deadline) } + + fn now(&self) -> MonotonicTime{ + (**self).now() + } } /// The current synchronization status of a clock. @@ -58,6 +69,10 @@ impl Clock for NoClock { fn synchronize(&mut self, _: MonotonicTime) -> SyncStatus { SyncStatus::Synchronized } + + fn now(&self) -> MonotonicTime{ + MonotonicTime::new(0, 0).unwrap() + } } /// A real-time [`Clock`] based on the system's monotonic clock. @@ -158,6 +173,10 @@ impl Clock for SystemClock { SyncStatus::OutOfSync(now.duration_since(deadline)) } + + fn now(&self) -> MonotonicTime{ + self.0.now() + } } /// An automatically initialized real-time [`Clock`] based on the system's @@ -194,6 +213,11 @@ impl Clock for AutoSystemClock { Some(clock) => clock.synchronize(deadline), } } + + fn now(&self) -> MonotonicTime { + // TODO: how to avoid panic? self is not mut + self.inner.unwrap().now() + } } #[cfg(test)]