From 93e643a5fde01a9ca0538d9813957687c541de65 Mon Sep 17 00:00:00 2001 From: Ulrich Mohr Date: Wed, 31 Jul 2024 01:07:18 +0200 Subject: [PATCH] patches for romeo --- asynchronix/src/simulation.rs | 166 +++++++++++++++++++++++++++++++++- asynchronix/src/time/clock.rs | 57 ++++++++++++ 2 files changed, 220 insertions(+), 3 deletions(-) diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 1fba49f..34547b7 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -131,7 +131,7 @@ use std::error::Error; use std::fmt; use std::future::Future; use std::sync::{Arc, Mutex, MutexGuard}; -use std::time::Duration; +use std::time::{Duration, Instant}; use recycle_box::{coerce_box, RecycleBox}; @@ -238,6 +238,155 @@ impl Simulation { self.step_to_next_bounded(MonotonicTime::MAX); } + /// Advances simulation until the idle worker returns or times out + /// + /// If the idle worker returns event data, the data is schduled to be immediately + /// handled by the event handler method using the handler address. After handling + /// all of these events, this method returns with the simulation time set to the + /// time of the idle worker's return. + /// + /// If the idle worker returns no event data it is assumed to be timed out and + /// simulation time is advanced to that of the next scheduled event, processing + /// that event as well as all other event scheduled for the same time. + /// + /// The idle worker method MUST return after the timeout passed to it (plus or + /// minus OS inaccuracies, which are handled by this method) + pub fn step_with_idle_worker( + &mut self, + event_handler: F, + handler_address: impl Into> + Clone, + idle_worker: G, + ) where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + G: FnOnce(Duration) -> Vec, + { + let upper_time_bound = MonotonicTime::MAX; + + // Function pulling the next event. If the event is periodic, it is + // immediately re-scheduled. + fn pull_next_event( + scheduler_queue: &mut MutexGuard, + ) -> Box { + let ((time, channel_id), event) = scheduler_queue.pull().unwrap(); + if let Some((event_clone, period)) = event.next() { + scheduler_queue.insert((time + period, channel_id), event_clone); + } + + event + } + + // Closure returning the next key which time stamp is no older than the + // upper bound, if any. Cancelled events are pulled and discarded. + let peek_next_key = |scheduler_queue: &mut MutexGuard| { + loop { + match scheduler_queue.peek() { + Some((&k, t)) if k.0 <= upper_time_bound => { + if !t.is_cancelled() { + break Some(k); + } + // Discard cancelled events. + scheduler_queue.pull(); + } + _ => break None, + } + } + }; + + // Move to the next scheduled time. + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let current_key_opt = peek_next_key(&mut scheduler_queue); + let next_step = match current_key_opt { + Some((time, _)) => time, + None => upper_time_bound, + }; + // so we can alter the queue + drop(scheduler_queue); + + let block_timeout = self.clock.duration_until(next_step); + + let sleeper = spin_sleep::SpinSleeper::default(); + let accuracy = Duration::new(0, sleeper.native_accuracy_ns()); + + // only let worker run if it has a chance to return in time + // insert events returned by worker into queue + if block_timeout > accuracy { + let event_data = idle_worker(block_timeout - accuracy); + + if !event_data.is_empty() { + let sim_now = self.clock.current_sim_time(); + for element in event_data { + self.schedule_event( + sim_now, + event_handler.clone(), + element, + handler_address.clone(), + ) + .unwrap(); + } + } + } + + // Start over finding the next event, as queue might have been altered by idle worker + + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let current_key_opt = peek_next_key(&mut scheduler_queue); + + let mut current_key = match current_key_opt { + Some(key) => key, + // TODO I think, here we should adjust sim time first + None => return (), + }; + + self.time.write(current_key.0); + + loop { + let event = pull_next_event(&mut scheduler_queue); + let mut next_key = peek_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { + // Since there are no other events targeting the same mailbox + // and the same time, the event is spawned immediately. + event.spawn_and_forget(&self.executor); + } else { + // To ensure that their relative order of execution is + // preserved, all event targeting the same mailbox are executed + // sequentially within a single compound future. + let mut event_sequence = SeqFuture::new(); + event_sequence.push(event.into_future()); + loop { + let event = pull_next_event(&mut scheduler_queue); + event_sequence.push(event.into_future()); + next_key = peek_next_key(&mut scheduler_queue); + if next_key != Some(current_key) { + break; + } + } + + // Spawn a compound future that sequentially polls all events + // targeting the same mailbox. + self.executor.spawn_and_forget(event_sequence); + } + + current_key = match next_key { + // If the next event is scheduled at the same time, update the + // key and continue. + Some(k) if k.0 == current_key.0 => k, + // Otherwise wait until all events have completed and return. + _ => { + drop(scheduler_queue); // make sure the queue's mutex is released. + let current_time = current_key.0; + // TODO: check synchronization status? + self.clock.synchronize(current_time); + self.executor.run(); + + return (); + } + }; + } + } + /// Iteratively advances the simulation time by the specified duration, as /// if by calling [`Simulation::step()`] repeatedly. /// @@ -536,7 +685,18 @@ impl Simulation { // Move to the next scheduled time. let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - let mut current_key = peek_next_key(&mut scheduler_queue)?; + let current_key_opt = peek_next_key(&mut scheduler_queue); + let next_step = match current_key_opt { + Some((time, _)) => time, + None => upper_time_bound, + }; + self.clock.synchronize(next_step); + + let mut current_key = match current_key_opt { + Some(key) => key, + None => return Some(upper_time_bound), + }; + self.time.write(current_key.0); loop { @@ -575,7 +735,7 @@ impl Simulation { drop(scheduler_queue); // make sure the queue's mutex is released. let current_time = current_key.0; // TODO: check synchronization status? - self.clock.synchronize(current_time); + //self.clock.synchronize(current_time); self.executor.run(); return Some(current_time); diff --git a/asynchronix/src/time/clock.rs b/asynchronix/src/time/clock.rs index 54a7f95..ee64dd4 100644 --- a/asynchronix/src/time/clock.rs +++ b/asynchronix/src/time/clock.rs @@ -12,6 +12,13 @@ use crate::time::MonotonicTime; pub trait Clock: Send { /// Blocks until the deadline. fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus; + + /// returns duration until specified sim time + fn duration_until(&mut self, sim_time: MonotonicTime) -> Duration; + + /// returns sim time corresponding to current real time, adjusted + /// to the reference time of the clock + fn current_sim_time(&mut self) -> MonotonicTime; } /// The current synchronization status of a clock. @@ -42,6 +49,15 @@ impl Clock for NoClock { fn synchronize(&mut self, _: MonotonicTime) -> SyncStatus { SyncStatus::Synchronized } + + fn duration_until(&mut self, _deadline: MonotonicTime) -> Duration { + Duration::new(0, 0) + } + + fn current_sim_time(&mut self) -> MonotonicTime { + //TODO broken + panic!("you should not do this") + } } /// A real-time [`Clock`] based on the system's monotonic clock. @@ -196,6 +212,26 @@ impl Clock for SystemClock { None => SyncStatus::OutOfSync(now.duration_since(target_time)), } } + + fn duration_until(&mut self, deadline: MonotonicTime) -> Duration { + let target_time = if deadline >= self.simulation_ref { + self.wall_clock_ref + deadline.duration_since(self.simulation_ref) + } else { + self.wall_clock_ref - self.simulation_ref.duration_since(deadline) + }; + + let now = Instant::now(); + + match target_time.checked_duration_since(now) { + Some(sleep_duration) => sleep_duration, + None => panic!("invalid times"), + } + } + + fn current_sim_time(&mut self) -> MonotonicTime { + // TODO handle now() < wall_clock_ref? + self.simulation_ref + Instant::now().duration_since(self.wall_clock_ref) + } } /// An automatically initialized real-time [`Clock`] based on the system's @@ -232,4 +268,25 @@ impl Clock for AutoSystemClock { Some(clock) => clock.synchronize(deadline), } } + + fn duration_until(&mut self, deadline: MonotonicTime) -> Duration { + match self.inner { + None => { + let now = Instant::now(); + self.inner = Some(SystemClock::from_instant(deadline, now)); + Duration::new(0, 0) + } + Some(mut clock) => clock.duration_until(deadline), + } + } + + fn current_sim_time(&mut self) -> MonotonicTime { + //TODO This is broken + match self.inner { + None => { + panic!("I do not know what to do now") + } + Some(mut clock) => clock.current_sim_time(), + } + } }