1
0
forked from ROMEO/nexosim

ported romeo patch from v0.2.2

original patch is 93e643a5fde01a9ca0538d9813957687c541de65
This commit is contained in:
Ulrich Mohr 2025-03-13 17:59:52 +01:00
parent 30779d975c
commit d4494cf0d3
2 changed files with 191 additions and 0 deletions

View File

@ -501,6 +501,173 @@ impl Simulation {
}
}
/// Advances simulation time to that of the next scheduled action if its
/// scheduling time does not exceed the specified bound, processing that
/// action as well as all other actions scheduled for the same time.
///
/// If at least one action was found that satisfied the time bound, the
/// corresponding new simulation time is returned.
pub fn step_with_idle_worker<M, F, T, S, G>(
&mut self,
event_handler: F,
handler_address: impl Into<Address<M>> + Clone,
idle_worker: G,
) -> Result<(), ExecutionError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
G: FnOnce(Duration) -> Vec<T>,
{
if self.is_terminated {
return Err(ExecutionError::Terminated);
}
if self.is_halted.load(Ordering::Relaxed) {
self.is_terminated = true;
return Err(ExecutionError::Halted);
}
// Function pulling the next action. If the action is periodic, it is
// immediately re-scheduled.
fn pull_next_action(scheduler_queue: &mut MutexGuard<SchedulerQueue>) -> Action {
let ((time, channel_id), action) = scheduler_queue.pull().unwrap();
if let Some((action_clone, period)) = action.next() {
scheduler_queue.insert((time + period, channel_id), action_clone);
}
action
}
let upper_time_bound = MonotonicTime::MAX;
// Closure returning the next key which time stamp is no older than the
// upper bound, if any. Cancelled actions are pulled and discarded.
let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
loop {
match scheduler_queue.peek() {
Some((&key, action)) if key.0 <= upper_time_bound => {
if !action.is_cancelled() {
break Some(key);
}
// Discard cancelled actions.
scheduler_queue.pull();
}
_ => break None,
}
}
};
// Move to the next scheduled time.
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key_opt = peek_next_key(&mut scheduler_queue);
let next_step = match current_key_opt {
Some((time, _)) => time,
None => upper_time_bound,
};
// so we can alter the queue
drop(scheduler_queue);
// TODO this fails quite bad with clock being NoClock. Can we warn about it?
let block_timeout = next_step.duration_since(self.clock.now());
let sleeper = spin_sleep::SpinSleeper::default();
let accuracy = Duration::new(0, sleeper.native_accuracy_ns());
// only let worker run if it has a chance to return in time
// insert events returned by worker into queue
// TODO if woker returns early without event, it will not be called before next scheduled
// event, which might be a long while -> worker should either return some events or timeout
if block_timeout > accuracy {
let event_data = idle_worker(block_timeout - accuracy);
if !event_data.is_empty() {
// get a local scheduler
let scheduler = Scheduler::new(
self.scheduler_queue.clone(),
self.time.reader(),
self.is_halted.clone(),
);
// schedule event for immediate execution (needs to be > now, hence the nanosecond)
// we do not process the event directely but prefer putting it into the queue
// that way, the code following can be left unmodified
// otherwise, we would need to loop over idle_worker() until the next event in the queue
// is reached.
for element in event_data {
scheduler.schedule_event(
Duration::from_nanos(1),
event_handler.clone(),
element,
handler_address.clone(),
)
.unwrap();
}
}
}
// Start over finding the next event, as queue might have been altered by idle worker
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = match peek_next_key(&mut scheduler_queue) {
Some(key) => key,
None => return Ok(()),
};
self.time.write(current_key.0);
loop {
let action = pull_next_action(&mut scheduler_queue);
let mut next_key = peek_next_key(&mut scheduler_queue);
if next_key != Some(current_key) {
// Since there are no other actions with the same origin and the
// same time, the action is spawned immediately.
action.spawn_and_forget(&self.executor);
} else {
// To ensure that their relative order of execution is
// preserved, all actions with the same origin are executed
// sequentially within a single compound future.
let mut action_sequence = SeqFuture::new();
action_sequence.push(action.into_future());
loop {
let action = pull_next_action(&mut scheduler_queue);
action_sequence.push(action.into_future());
next_key = peek_next_key(&mut scheduler_queue);
if next_key != Some(current_key) {
break;
}
}
// Spawn a compound future that sequentially polls all actions
// targeting the same mailbox.
self.executor.spawn_and_forget(action_sequence);
}
current_key = match next_key {
// If the next action is scheduled at the same time, update the
// key and continue.
Some(k) if k.0 == current_key.0 => k,
// Otherwise wait until all actions have completed and return.
_ => {
drop(scheduler_queue); // make sure the queue's mutex is released.
let current_time = current_key.0;
if let SyncStatus::OutOfSync(lag) = self.clock.synchronize(current_time) {
if let Some(tolerance) = &self.clock_tolerance {
if &lag > tolerance {
self.is_terminated = true;
return Err(ExecutionError::OutOfSync(lag));
}
}
}
self.run()?;
return Ok(());
}
};
}
}
/// Iteratively advances simulation time and processes all actions scheduled
/// up to the specified target time.
///

View File

@ -14,18 +14,29 @@ use crate::time::MonotonicTime;
pub trait Clock: Send {
/// Blocks until the deadline.
fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus;
// Returns current (as of time of the call) sim time
fn now(&self) -> MonotonicTime;
}
impl<C: Clock + ?Sized> Clock for &mut C {
fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus {
(**self).synchronize(deadline)
}
fn now(&self) -> MonotonicTime{
(**self).now()
}
}
impl<C: Clock + ?Sized> Clock for Box<C> {
fn synchronize(&mut self, deadline: MonotonicTime) -> SyncStatus {
(**self).synchronize(deadline)
}
fn now(&self) -> MonotonicTime{
(**self).now()
}
}
/// The current synchronization status of a clock.
@ -58,6 +69,10 @@ impl Clock for NoClock {
fn synchronize(&mut self, _: MonotonicTime) -> SyncStatus {
SyncStatus::Synchronized
}
fn now(&self) -> MonotonicTime{
MonotonicTime::new(0, 0).unwrap()
}
}
/// A real-time [`Clock`] based on the system's monotonic clock.
@ -158,6 +173,10 @@ impl Clock for SystemClock {
SyncStatus::OutOfSync(now.duration_since(deadline))
}
fn now(&self) -> MonotonicTime{
self.0.now()
}
}
/// An automatically initialized real-time [`Clock`] based on the system's
@ -194,6 +213,11 @@ impl Clock for AutoSystemClock {
Some(clock) => clock.synchronize(deadline),
}
}
fn now(&self) -> MonotonicTime {
// TODO: how to avoid panic? self is not mut
self.inner.unwrap().now()
}
}
#[cfg(test)]