From 49e713262bac68cacece27743f2e524bb215d62b Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Mon, 11 Nov 2024 18:50:29 +0100 Subject: [PATCH] Check clock sync with configurable tolerance --- asynchronix/src/grpc/api/simulation.proto | 1 + asynchronix/src/grpc/codegen/simulation.rs | 3 + asynchronix/src/grpc/services.rs | 3 +- asynchronix/src/simulation.rs | 31 ++++- asynchronix/src/simulation/sim_init.rs | 14 +++ asynchronix/src/time/clock.rs | 3 +- asynchronix/tests/integration/main.rs | 2 + .../integration/simulation_clock_sync.rs | 118 ++++++++++++++++++ .../tests/integration/simulation_timeout.rs | 3 +- 9 files changed, 168 insertions(+), 10 deletions(-) create mode 100644 asynchronix/tests/integration/simulation_clock_sync.rs diff --git a/asynchronix/src/grpc/api/simulation.proto b/asynchronix/src/grpc/api/simulation.proto index 4a0884f..8a9150a 100644 --- a/asynchronix/src/grpc/api/simulation.proto +++ b/asynchronix/src/grpc/api/simulation.proto @@ -17,6 +17,7 @@ enum ErrorCode { SIMULATION_PANIC = 6; SIMULATION_BAD_QUERY = 7; SIMULATION_TIME_OUT_OF_RANGE = 8; + SIMULATION_OUT_OF_SYNC = 9; MISSING_ARGUMENT = 20; INVALID_TIME = 30; INVALID_DURATION = 31; diff --git a/asynchronix/src/grpc/codegen/simulation.rs b/asynchronix/src/grpc/codegen/simulation.rs index 8338f24..ea18da6 100644 --- a/asynchronix/src/grpc/codegen/simulation.rs +++ b/asynchronix/src/grpc/codegen/simulation.rs @@ -345,6 +345,7 @@ pub enum ErrorCode { SimulationPanic = 6, SimulationBadQuery = 7, SimulationTimeOutOfRange = 8, + SimulationOutOfSync = 9, MissingArgument = 20, InvalidTime = 30, InvalidDuration = 31, @@ -370,6 +371,7 @@ impl ErrorCode { ErrorCode::SimulationPanic => "SIMULATION_PANIC", ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY", ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", + ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", ErrorCode::MissingArgument => "MISSING_ARGUMENT", ErrorCode::InvalidTime => "INVALID_TIME", ErrorCode::InvalidDuration => "INVALID_DURATION", @@ -392,6 +394,7 @@ impl ErrorCode { "SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_BAD_QUERY" => Some(Self::SimulationBadQuery), "SIMULATION_TIME_OUT_OF_RANGE" => Some(Self::SimulationTimeOutOfRange), + "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "MISSING_ARGUMENT" => Some(Self::MissingArgument), "INVALID_TIME" => Some(Self::InvalidTime), "INVALID_DURATION" => Some(Self::InvalidDuration), diff --git a/asynchronix/src/grpc/services.rs b/asynchronix/src/grpc/services.rs index c9a7de7..65bfb65 100644 --- a/asynchronix/src/grpc/services.rs +++ b/asynchronix/src/grpc/services.rs @@ -36,9 +36,10 @@ fn map_execution_error(error: ExecutionError) -> Error { ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, ExecutionError::ModelError { .. } => ErrorCode::SimulationModelError, ExecutionError::Panic(_) => ErrorCode::SimulationPanic, + ExecutionError::Timeout => ErrorCode::SimulationTimeout, + ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, ExecutionError::BadQuery => ErrorCode::SimulationBadQuery, ExecutionError::Terminated => ErrorCode::SimulationTerminated, - ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::InvalidTargetTime(_) => ErrorCode::InvalidTime, }; diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index d5f466d..6478fbe 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -148,7 +148,7 @@ use crate::channel::ChannelObserver; use crate::executor::{Executor, ExecutorError, Signal}; use crate::model::{BuildContext, Context, Model, ProtoModel}; use crate::ports::{InputFn, ReplierFn}; -use crate::time::{AtomicTime, Clock, MonotonicTime}; +use crate::time::{AtomicTime, Clock, MonotonicTime, SyncStatus}; use crate::util::seq_futures::SeqFuture; use crate::util::slot; @@ -195,6 +195,7 @@ pub struct Simulation { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + clock_tolerance: Option, timeout: Duration, observers: Vec<(String, Box)>, is_terminated: bool, @@ -207,6 +208,7 @@ impl Simulation { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + clock_tolerance: Option, timeout: Duration, observers: Vec<(String, Box)>, ) -> Self { @@ -215,6 +217,7 @@ impl Simulation { scheduler_queue, time, clock, + clock_tolerance, timeout, observers, is_terminated: false, @@ -483,9 +486,15 @@ impl Simulation { // Otherwise wait until all actions have completed and return. _ => { drop(scheduler_queue); // make sure the queue's mutex is released. + let current_time = current_key.0; - // TODO: check synchronization status? - self.clock.synchronize(current_time); + if let SyncStatus::OutOfSync(lag) = self.clock.synchronize(current_time) { + if let Some(tolerance) = &self.clock_tolerance { + if &lag > tolerance { + return Err(ExecutionError::OutOfSync(lag)); + } + } + } self.run()?; return Ok(Some(current_time)); @@ -560,6 +569,11 @@ pub enum ExecutionError { /// A panic was caught during execution with the message contained in the /// payload. Panic(String), + /// The simulation step has failed to complete within the allocated time. + Timeout, + /// The simulation has lost synchronization with the clock and lags behind + /// by the duration given in the payload. + OutOfSync(Duration), /// The specified target simulation time is in the past of the current /// simulation time. InvalidTargetTime(MonotonicTime), @@ -568,8 +582,6 @@ pub enum ExecutionError { /// The simulation has been terminated due to an earlier deadlock, model /// error, model panic or timeout. Terminated, - /// The simulation step has failed to complete within the allocated time. - Timeout, } impl fmt::Display for ExecutionError { @@ -608,6 +620,14 @@ impl fmt::Display for ExecutionError { f.write_str("a panic has been caught during simulation:\n")?; f.write_str(msg) } + Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"), + Self::OutOfSync(lag) => { + write!( + f, + "the simulation has lost synchronization and lags behind the clock by '{:?}'", + lag + ) + } Self::InvalidTargetTime(time) => { write!( f, @@ -617,7 +637,6 @@ impl fmt::Display for ExecutionError { } Self::BadQuery => f.write_str("the query did not return any response; maybe the target model was not added to the simulation?"), Self::Terminated => f.write_str("the simulation has been terminated"), - Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"), } } } diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index 7ac88c6..b5aa31c 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -18,6 +18,7 @@ pub struct SimInit { scheduler_queue: Arc>, time: AtomicTime, clock: Box, + clock_tolerance: Option, timeout: Duration, observers: Vec<(String, Box)>, abort_signal: Signal, @@ -60,6 +61,7 @@ impl SimInit { scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())), time, clock: Box::new(NoClock::new()), + clock_tolerance: None, timeout: Duration::ZERO, observers: Vec::new(), abort_signal, @@ -104,6 +106,17 @@ impl SimInit { self } + /// Specifies a tolerance for clock synchronization. + /// + /// When a clock synchronization tolerance is set, then any report of + /// synchronization loss by `Clock::synchronize` that exceeds the specified + /// tolerance will trigger an `ExecutionError::OutOfSync` error. + pub fn set_clock_tolerance(mut self, tolerance: Duration) -> Self { + self.clock_tolerance = Some(tolerance); + + self + } + /// Sets a timeout for the call to [`SimInit::init`] and for any subsequent /// simulation step. /// @@ -133,6 +146,7 @@ impl SimInit { self.scheduler_queue, self.time, self.clock, + self.clock_tolerance, self.timeout, self.observers, ); diff --git a/asynchronix/src/time/clock.rs b/asynchronix/src/time/clock.rs index c9027e1..64d299e 100644 --- a/asynchronix/src/time/clock.rs +++ b/asynchronix/src/time/clock.rs @@ -21,7 +21,8 @@ pub trait Clock: Send { pub enum SyncStatus { /// The clock is synchronized. Synchronized, - /// The clock is lagging behind by the specified offset. + /// The deadline has already elapsed and lags behind the current clock time + /// by the duration given in the payload. OutOfSync(Duration), } diff --git a/asynchronix/tests/integration/main.rs b/asynchronix/tests/integration/main.rs index cd29ae2..7a93ac1 100644 --- a/asynchronix/tests/integration/main.rs +++ b/asynchronix/tests/integration/main.rs @@ -2,6 +2,8 @@ // https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html mod model_scheduling; +#[cfg(not(miri))] +mod simulation_clock_sync; mod simulation_deadlock; mod simulation_scheduling; #[cfg(not(miri))] diff --git a/asynchronix/tests/integration/simulation_clock_sync.rs b/asynchronix/tests/integration/simulation_clock_sync.rs new file mode 100644 index 0000000..0bbd7c7 --- /dev/null +++ b/asynchronix/tests/integration/simulation_clock_sync.rs @@ -0,0 +1,118 @@ +//! Loss of synchronization during simulation step execution. + +use std::thread; +use std::time::Duration; + +use asynchronix::model::Model; +use asynchronix::simulation::{ExecutionError, Mailbox, SimInit}; +use asynchronix::time::{AutoSystemClock, MonotonicTime}; + +const MT_NUM_THREADS: usize = 4; + +#[derive(Default)] +struct TestModel {} +impl TestModel { + fn block_for(&mut self, duration: Duration) { + thread::sleep(duration); + } +} +impl Model for TestModel {} + +// Schedule `TestModel::block_for` at the required ticks, blocking each time for +// the specified period, and then run the simulation with the specified +// synchronization tolerance. +// +// Returns the last simulation tick at completion or when the error occurred, and +// the result of `Simulation::step_by`. +fn clock_sync( + num_threads: usize, + block_time_ms: u64, + clock_tolerance_ms: u64, + ticks_ms: &[u64], +) -> (Duration, Result<(), ExecutionError>) { + let block_time = Duration::from_millis(block_time_ms); + let clock_tolerance = Duration::from_millis(clock_tolerance_ms); + + let model = TestModel::default(); + let clock = AutoSystemClock::new(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::with_num_threads(num_threads) + .add_model(model, mbox, "test") + .set_clock(clock) + .set_clock_tolerance(clock_tolerance) + .init(t0) + .unwrap(); + + let scheduler = simu.scheduler(); + let mut delta = Duration::ZERO; + for tick_ms in ticks_ms { + let tick = Duration::from_millis(*tick_ms); + if tick > delta { + delta = tick; + } + scheduler + .schedule_event(tick, TestModel::block_for, block_time, &addr) + .unwrap(); + } + + let res = simu.step_by(delta); + let last_tick = simu.time().duration_since(t0); + + (last_tick, res) +} + +fn clock_sync_zero_tolerance(num_threads: usize) { + // The fourth tick should fail for being ~50ms too late. + const BLOCKING_MS: u64 = 100; + const CLOCK_TOLERANCE_MS: u64 = 0; + const TICKS_MS: &[u64] = &[100, 250, 400, 450, 650]; + + let (last_tick, res) = clock_sync(num_threads, BLOCKING_MS, CLOCK_TOLERANCE_MS, TICKS_MS); + + if let Err(ExecutionError::OutOfSync(_lag)) = res { + assert_eq!(last_tick, Duration::from_millis(TICKS_MS[3])); + } else { + panic!("loss of synchronization not observed"); + } +} + +fn clock_sync_with_tolerance(num_threads: usize) { + // The third tick is ~50ms too late but should pass thanks to the tolerance. + // The fifth tick should fail for being ~150ms too late, which is beyond the + // 100ms tolerance. + const BLOCKING_MS: u64 = 200; + const CLOCK_TOLERANCE_MS: u64 = 100; + const TICKS_MS: &[u64] = &[100, 350, 500, 800, 850, 1250]; + + let (last_tick, res) = clock_sync(num_threads, BLOCKING_MS, CLOCK_TOLERANCE_MS, TICKS_MS); + + if let Err(ExecutionError::OutOfSync(lag)) = res { + assert_eq!(last_tick, Duration::from_millis(TICKS_MS[4])); + assert!(lag > Duration::from_millis(CLOCK_TOLERANCE_MS)); + } else { + panic!("loss of synchronization not observed"); + } +} + +#[test] +fn clock_sync_zero_tolerance_st() { + clock_sync_zero_tolerance(1); +} + +#[test] +fn clock_sync_zero_tolerance_mt() { + clock_sync_zero_tolerance(MT_NUM_THREADS); +} + +#[test] +fn clock_sync_with_tolerance_st() { + clock_sync_with_tolerance(1); +} + +#[test] +fn clock_sync_with_tolerance_mt() { + clock_sync_with_tolerance(MT_NUM_THREADS); +} diff --git a/asynchronix/tests/integration/simulation_timeout.rs b/asynchronix/tests/integration/simulation_timeout.rs index 2c3d63a..9558681 100644 --- a/asynchronix/tests/integration/simulation_timeout.rs +++ b/asynchronix/tests/integration/simulation_timeout.rs @@ -1,4 +1,4 @@ -//! Timeout during step execution. +//! Timeout during simulation step execution. use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -12,7 +12,6 @@ use asynchronix::time::MonotonicTime; const MT_NUM_THREADS: usize = 4; -#[derive(Default)] struct TestModel { output: Output<()>, // A liveliness flag that is cleared when the model is dropped.