forked from ROMEO/nexosim
Add tracing support for simulation timestamps
This commit is contained in:
parent
e376f17c7c
commit
7487a264ab
@ -24,7 +24,7 @@ autotests = false
|
|||||||
[features]
|
[features]
|
||||||
# gRPC service.
|
# gRPC service.
|
||||||
grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"]
|
grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"]
|
||||||
tracing = ["dep:tracing"]
|
tracing = ["dep:tracing", "dep:tracing-subscriber"]
|
||||||
|
|
||||||
# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking.
|
# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking.
|
||||||
dev-hooks = []
|
dev-hooks = []
|
||||||
@ -55,14 +55,14 @@ serde = { version = "1", optional = true }
|
|||||||
tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true }
|
tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true }
|
||||||
tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true }
|
tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true }
|
||||||
tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true }
|
tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true }
|
||||||
|
tracing-subscriber = { version= "0.3.18", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
atomic-wait = "1.1"
|
atomic-wait = "1.1"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
futures-executor = "0.3"
|
futures-executor = "0.3"
|
||||||
mio = { version = "1.0", features = ["os-poll", "net"] }
|
mio = { version = "1.0", features = ["os-poll", "net"] }
|
||||||
tracing = "0.1"
|
tracing-subscriber = { version= "0.3.18", features=["env-filter"] }
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
|
||||||
|
|
||||||
[target.'cfg(asynchronix_loom)'.dev-dependencies]
|
[target.'cfg(asynchronix_loom)'.dev-dependencies]
|
||||||
loom = "0.5"
|
loom = "0.5"
|
||||||
|
@ -15,7 +15,17 @@ impl Executor {
|
|||||||
///
|
///
|
||||||
/// The maximum number of threads is set with the `pool_size` parameter.
|
/// The maximum number of threads is set with the `pool_size` parameter.
|
||||||
pub fn new(pool_size: usize) -> Self {
|
pub fn new(pool_size: usize) -> Self {
|
||||||
Self(executor::Executor::new_multi_threaded(pool_size))
|
let dummy_context = crate::executor::SimulationContext {
|
||||||
|
#[cfg(feature = "tracing")]
|
||||||
|
time_reader: crate::util::sync_cell::SyncCell::new(
|
||||||
|
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
|
||||||
|
)
|
||||||
|
.reader(),
|
||||||
|
};
|
||||||
|
Self(executor::Executor::new_multi_threaded(
|
||||||
|
pool_size,
|
||||||
|
dummy_context,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a task which output will never be retrieved.
|
/// Spawns a task which output will never be retrieved.
|
||||||
|
@ -7,11 +7,23 @@ mod task;
|
|||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
|
|
||||||
|
use crate::macros::scoped_thread_local::scoped_thread_local;
|
||||||
|
#[cfg(feature = "tracing")]
|
||||||
|
use crate::time::AtomicTimeReader;
|
||||||
use task::Promise;
|
use task::Promise;
|
||||||
|
|
||||||
/// Unique identifier for executor instances.
|
/// Unique identifier for executor instances.
|
||||||
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
|
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
/// Context common to all executor types.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct SimulationContext {
|
||||||
|
#[cfg(feature = "tracing")]
|
||||||
|
pub(crate) time_reader: AtomicTimeReader,
|
||||||
|
}
|
||||||
|
|
||||||
|
scoped_thread_local!(pub(crate) static SIMULATION_CONTEXT: SimulationContext);
|
||||||
|
|
||||||
/// A single-threaded or multi-threaded `async` executor.
|
/// A single-threaded or multi-threaded `async` executor.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum Executor {
|
pub(crate) enum Executor {
|
||||||
@ -21,8 +33,8 @@ pub(crate) enum Executor {
|
|||||||
|
|
||||||
impl Executor {
|
impl Executor {
|
||||||
/// Creates an executor that runs futures on the current thread.
|
/// Creates an executor that runs futures on the current thread.
|
||||||
pub(crate) fn new_single_threaded() -> Self {
|
pub(crate) fn new_single_threaded(simulation_context: SimulationContext) -> Self {
|
||||||
Self::StExecutor(st_executor::Executor::new())
|
Self::StExecutor(st_executor::Executor::new(simulation_context))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates an executor that runs futures on a thread pool.
|
/// Creates an executor that runs futures on a thread pool.
|
||||||
@ -33,8 +45,11 @@ impl Executor {
|
|||||||
///
|
///
|
||||||
/// This will panic if the specified number of threads is zero or is more
|
/// This will panic if the specified number of threads is zero or is more
|
||||||
/// than `usize::BITS`.
|
/// than `usize::BITS`.
|
||||||
pub(crate) fn new_multi_threaded(num_threads: usize) -> Self {
|
pub(crate) fn new_multi_threaded(
|
||||||
Self::MtExecutor(mt_executor::Executor::new(num_threads))
|
num_threads: usize,
|
||||||
|
simulation_context: SimulationContext,
|
||||||
|
) -> Self {
|
||||||
|
Self::MtExecutor(mt_executor::Executor::new(num_threads, simulation_context))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a task which output will never be retrieved.
|
/// Spawns a task which output will never be retrieved.
|
||||||
@ -88,6 +103,16 @@ mod tests {
|
|||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
fn dummy_simulation_context() -> SimulationContext {
|
||||||
|
SimulationContext {
|
||||||
|
#[cfg(feature = "tracing")]
|
||||||
|
time_reader: crate::util::sync_cell::SyncCell::new(
|
||||||
|
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
|
||||||
|
)
|
||||||
|
.reader(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// An object that runs an arbitrary closure when dropped.
|
/// An object that runs an arbitrary closure when dropped.
|
||||||
struct RunOnDrop<F: FnOnce()> {
|
struct RunOnDrop<F: FnOnce()> {
|
||||||
drop_fn: Option<F>,
|
drop_fn: Option<F>,
|
||||||
@ -208,25 +233,25 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn executor_deadlock_st() {
|
fn executor_deadlock_st() {
|
||||||
executor_deadlock(Executor::new_single_threaded());
|
executor_deadlock(Executor::new_single_threaded(dummy_simulation_context()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn executor_deadlock_mt() {
|
fn executor_deadlock_mt() {
|
||||||
executor_deadlock(Executor::new_multi_threaded(3));
|
executor_deadlock(Executor::new_multi_threaded(3, dummy_simulation_context()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn executor_deadlock_mt_one_worker() {
|
fn executor_deadlock_mt_one_worker() {
|
||||||
executor_deadlock(Executor::new_multi_threaded(1));
|
executor_deadlock(Executor::new_multi_threaded(1, dummy_simulation_context()));
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn executor_drop_cycle_st() {
|
fn executor_drop_cycle_st() {
|
||||||
executor_drop_cycle(Executor::new_single_threaded());
|
executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn executor_drop_cycle_mt() {
|
fn executor_drop_cycle_mt() {
|
||||||
executor_drop_cycle(Executor::new_multi_threaded(3));
|
executor_drop_cycle(Executor::new_multi_threaded(3, dummy_simulation_context()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,11 +56,10 @@ use std::time::{Duration, Instant};
|
|||||||
use crossbeam_utils::sync::{Parker, Unparker};
|
use crossbeam_utils::sync::{Parker, Unparker};
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
|
use super::task::{self, CancelToken, Promise, Runnable};
|
||||||
|
use super::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT};
|
||||||
use crate::macros::scoped_thread_local::scoped_thread_local;
|
use crate::macros::scoped_thread_local::scoped_thread_local;
|
||||||
use crate::util::rng::Rng;
|
use crate::util::rng::Rng;
|
||||||
|
|
||||||
use super::task::{self, CancelToken, Promise, Runnable};
|
|
||||||
use super::NEXT_EXECUTOR_ID;
|
|
||||||
use pool_manager::PoolManager;
|
use pool_manager::PoolManager;
|
||||||
|
|
||||||
const BUCKET_SIZE: usize = 128;
|
const BUCKET_SIZE: usize = 128;
|
||||||
@ -95,7 +94,7 @@ impl Executor {
|
|||||||
///
|
///
|
||||||
/// This will panic if the specified number of threads is zero or is more
|
/// This will panic if the specified number of threads is zero or is more
|
||||||
/// than `usize::BITS`.
|
/// than `usize::BITS`.
|
||||||
pub(crate) fn new(num_threads: usize) -> Self {
|
pub(crate) fn new(num_threads: usize, simulation_context: SimulationContext) -> Self {
|
||||||
let parker = Parker::new();
|
let parker = Parker::new();
|
||||||
let unparker = parker.unparker().clone();
|
let unparker = parker.unparker().clone();
|
||||||
|
|
||||||
@ -141,11 +140,15 @@ impl Executor {
|
|||||||
.spawn({
|
.spawn({
|
||||||
let context = context.clone();
|
let context = context.clone();
|
||||||
let active_tasks = active_tasks.clone();
|
let active_tasks = active_tasks.clone();
|
||||||
|
let simulation_context = simulation_context.clone();
|
||||||
move || {
|
move || {
|
||||||
let worker = Worker::new(local_queue, context);
|
let worker = Worker::new(local_queue, context);
|
||||||
ACTIVE_TASKS.set(&active_tasks, || {
|
SIMULATION_CONTEXT.set(&simulation_context, || {
|
||||||
LOCAL_WORKER
|
ACTIVE_TASKS.set(&active_tasks, || {
|
||||||
.set(&worker, || run_local_worker(&worker, id, worker_parker))
|
LOCAL_WORKER.set(&worker, || {
|
||||||
|
run_local_worker(&worker, id, worker_parker)
|
||||||
|
})
|
||||||
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -8,6 +8,7 @@ use slab::Slab;
|
|||||||
use super::task::{self, CancelToken, Promise, Runnable};
|
use super::task::{self, CancelToken, Promise, Runnable};
|
||||||
use super::NEXT_EXECUTOR_ID;
|
use super::NEXT_EXECUTOR_ID;
|
||||||
|
|
||||||
|
use crate::executor::{SimulationContext, SIMULATION_CONTEXT};
|
||||||
use crate::macros::scoped_thread_local::scoped_thread_local;
|
use crate::macros::scoped_thread_local::scoped_thread_local;
|
||||||
|
|
||||||
const QUEUE_MIN_CAPACITY: usize = 32;
|
const QUEUE_MIN_CAPACITY: usize = 32;
|
||||||
@ -21,11 +22,13 @@ pub(crate) struct Executor {
|
|||||||
context: ExecutorContext,
|
context: ExecutorContext,
|
||||||
/// List of tasks that have not completed yet.
|
/// List of tasks that have not completed yet.
|
||||||
active_tasks: RefCell<Slab<CancelToken>>,
|
active_tasks: RefCell<Slab<CancelToken>>,
|
||||||
|
/// Read-only handle to the simulation time.
|
||||||
|
simulation_context: SimulationContext,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Executor {
|
impl Executor {
|
||||||
/// Creates an executor that runs futures on the current thread.
|
/// Creates an executor that runs futures on the current thread.
|
||||||
pub(crate) fn new() -> Self {
|
pub(crate) fn new(simulation_context: SimulationContext) -> Self {
|
||||||
// Each executor instance has a unique ID inherited by tasks to ensure
|
// Each executor instance has a unique ID inherited by tasks to ensure
|
||||||
// that tasks are scheduled on their parent executor.
|
// that tasks are scheduled on their parent executor.
|
||||||
let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
|
let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
|
||||||
@ -40,6 +43,7 @@ impl Executor {
|
|||||||
Self {
|
Self {
|
||||||
context,
|
context,
|
||||||
active_tasks,
|
active_tasks,
|
||||||
|
simulation_context,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,14 +106,16 @@ impl Executor {
|
|||||||
/// Execute spawned tasks, blocking until all futures have completed or
|
/// Execute spawned tasks, blocking until all futures have completed or
|
||||||
/// until the executor reaches a deadlock.
|
/// until the executor reaches a deadlock.
|
||||||
pub(crate) fn run(&mut self) {
|
pub(crate) fn run(&mut self) {
|
||||||
ACTIVE_TASKS.set(&self.active_tasks, || {
|
SIMULATION_CONTEXT.set(&self.simulation_context, || {
|
||||||
EXECUTOR_CONTEXT.set(&self.context, || loop {
|
ACTIVE_TASKS.set(&self.active_tasks, || {
|
||||||
let task = match self.context.queue.borrow_mut().pop() {
|
EXECUTOR_CONTEXT.set(&self.context, || loop {
|
||||||
Some(task) => task,
|
let task = match self.context.queue.borrow_mut().pop() {
|
||||||
None => break,
|
Some(task) => task,
|
||||||
};
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
task.run();
|
task.run();
|
||||||
|
})
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -225,9 +231,9 @@ impl<T: Future> Drop for CancellableFuture<T> {
|
|||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// This function will panic if called from called outside from the executor
|
/// This function will panic if called from outside the executor worker thread
|
||||||
/// work thread or from another executor instance than the one the task for this
|
/// or from another executor instance than the one the task for this `Runnable`
|
||||||
/// `Runnable` was spawned on.
|
/// was spawned on.
|
||||||
fn schedule_task(task: Runnable, executor_id: usize) {
|
fn schedule_task(task: Runnable, executor_id: usize) {
|
||||||
EXECUTOR_CONTEXT
|
EXECUTOR_CONTEXT
|
||||||
.map(|context| {
|
.map(|context| {
|
||||||
|
@ -392,46 +392,7 @@
|
|||||||
//! asynchronix = { version = "0.3", features = ["tracing"] }
|
//! asynchronix = { version = "0.3", features = ["tracing"] }
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! Each tracing event or span emitted by a model is then wrapped in a span
|
//! See the [`tracing`] module for more information.
|
||||||
//! named `model` with a target `asynchronix` and an attribute `name`. The value
|
|
||||||
//! of the attribute is the name provided to
|
|
||||||
//! [`SimInit::add_model`](simulation::SimInit::add_model).
|
|
||||||
//!
|
|
||||||
//! Note that model spans are always emitted at
|
|
||||||
//! [`Level::INFO`](tracing::Level::INFO) .
|
|
||||||
//!
|
|
||||||
//! ### Tracing examples
|
|
||||||
//!
|
|
||||||
//! The examples below assume that the `tracing` feature flag is activated, the
|
|
||||||
//! `tracing_subscriber` crate is used with the `env-filter` feature flag
|
|
||||||
//! activated and the default subscriber is set up, e.g. with:
|
|
||||||
//!
|
|
||||||
//! ```ignore
|
|
||||||
//! tracing_subscriber::fmt::init();
|
|
||||||
//! ```
|
|
||||||
//!
|
|
||||||
//! In order to let only warnings and errors pass through but still see model
|
|
||||||
//! span information (which is emitted as info), you may run the bench with:
|
|
||||||
//!
|
|
||||||
//! ```{.bash}
|
|
||||||
//! $ RUST_LOG="warn,[model]=info" cargo run --release my_bench
|
|
||||||
//! 2024-09-09T21:05:47.891984Z WARN model{name="kettle"}: my_bench: water is boiling
|
|
||||||
//! 2024-09-09T21:08:13.284753Z WARN model{name="timer"}: my_bench: ring ring
|
|
||||||
//! 2024-09-09T21:08:13.284753Z WARN model{name="kettle"}: my_bench: water is hot
|
|
||||||
//! ```
|
|
||||||
//!
|
|
||||||
//! In order to see warnings or errors for the `kettle` model only, you may
|
|
||||||
//! instead run the bench with:
|
|
||||||
//!
|
|
||||||
//! ```{.bash}
|
|
||||||
//! $ RUST_LOG="[model{name=kettle}]=warn" cargo run --release my_bench
|
|
||||||
//! 2024-09-09T21:05:47.891984Z WARN model{name="kettle"}: my_bench: water is boiling
|
|
||||||
//! 2024-09-09T21:08:13.284753Z WARN model{name="kettle"}: my_bench: water is hot
|
|
||||||
//! ```
|
|
||||||
//!
|
|
||||||
//! If the `model` span name collides with that of spans defined outside
|
|
||||||
//! `asynchronix`, the above filters can be made more specific using
|
|
||||||
//! `asynchronix[model]` instead of just `[model]`.
|
|
||||||
//!
|
//!
|
||||||
//!
|
//!
|
||||||
//! # Other resources
|
//! # Other resources
|
||||||
@ -466,17 +427,21 @@
|
|||||||
|
|
||||||
pub(crate) mod channel;
|
pub(crate) mod channel;
|
||||||
pub(crate) mod executor;
|
pub(crate) mod executor;
|
||||||
#[cfg(feature = "grpc")]
|
|
||||||
pub mod grpc;
|
|
||||||
mod loom_exports;
|
mod loom_exports;
|
||||||
pub(crate) mod macros;
|
pub(crate) mod macros;
|
||||||
pub mod model;
|
pub mod model;
|
||||||
pub mod ports;
|
pub mod ports;
|
||||||
#[cfg(feature = "grpc")]
|
|
||||||
pub mod registry;
|
|
||||||
pub mod simulation;
|
pub mod simulation;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
pub(crate) mod util;
|
pub(crate) mod util;
|
||||||
|
|
||||||
|
#[cfg(feature = "grpc")]
|
||||||
|
pub mod grpc;
|
||||||
|
#[cfg(feature = "grpc")]
|
||||||
|
pub mod registry;
|
||||||
|
|
||||||
|
#[cfg(feature = "tracing")]
|
||||||
|
pub mod tracing;
|
||||||
|
|
||||||
#[cfg(feature = "dev-hooks")]
|
#[cfg(feature = "dev-hooks")]
|
||||||
pub mod dev_hooks;
|
pub mod dev_hooks;
|
||||||
|
@ -146,10 +146,9 @@ use recycle_box::{coerce_box, RecycleBox};
|
|||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::model::{Context, Model, SetupContext};
|
use crate::model::{Context, Model, SetupContext};
|
||||||
use crate::ports::{InputFn, ReplierFn};
|
use crate::ports::{InputFn, ReplierFn};
|
||||||
use crate::time::{Clock, MonotonicTime, TearableAtomicTime};
|
use crate::time::{AtomicTime, Clock, MonotonicTime};
|
||||||
use crate::util::seq_futures::SeqFuture;
|
use crate::util::seq_futures::SeqFuture;
|
||||||
use crate::util::slot;
|
use crate::util::slot;
|
||||||
use crate::util::sync_cell::SyncCell;
|
|
||||||
|
|
||||||
/// Simulation environment.
|
/// Simulation environment.
|
||||||
///
|
///
|
||||||
@ -192,7 +191,7 @@ use crate::util::sync_cell::SyncCell;
|
|||||||
pub struct Simulation {
|
pub struct Simulation {
|
||||||
executor: Executor,
|
executor: Executor,
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||||
time: SyncCell<TearableAtomicTime>,
|
time: AtomicTime,
|
||||||
clock: Box<dyn Clock>,
|
clock: Box<dyn Clock>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,7 +200,7 @@ impl Simulation {
|
|||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
executor: Executor,
|
executor: Executor,
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||||
time: SyncCell<TearableAtomicTime>,
|
time: AtomicTime,
|
||||||
clock: Box<dyn Clock + 'static>,
|
clock: Box<dyn Clock + 'static>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -18,22 +18,18 @@ use crate::executor::Executor;
|
|||||||
use crate::model::Model;
|
use crate::model::Model;
|
||||||
use crate::ports::InputFn;
|
use crate::ports::InputFn;
|
||||||
use crate::simulation::Address;
|
use crate::simulation::Address;
|
||||||
use crate::time::{MonotonicTime, TearableAtomicTime};
|
use crate::time::{AtomicTimeReader, MonotonicTime};
|
||||||
use crate::util::priority_queue::PriorityQueue;
|
use crate::util::priority_queue::PriorityQueue;
|
||||||
use crate::util::sync_cell::SyncCellReader;
|
|
||||||
|
|
||||||
/// Scheduler.
|
/// Scheduler.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Scheduler {
|
pub struct Scheduler {
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||||
time: SyncCellReader<TearableAtomicTime>,
|
time: AtomicTimeReader,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Scheduler {
|
impl Scheduler {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
|
||||||
time: SyncCellReader<TearableAtomicTime>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
scheduler_queue,
|
scheduler_queue,
|
||||||
time,
|
time,
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::executor::Executor;
|
use crate::executor::{Executor, SimulationContext};
|
||||||
use crate::model::Model;
|
use crate::model::Model;
|
||||||
|
use crate::time::{AtomicTime, MonotonicTime, TearableAtomicTime};
|
||||||
use crate::time::{Clock, NoClock};
|
use crate::time::{Clock, NoClock};
|
||||||
use crate::time::{MonotonicTime, TearableAtomicTime};
|
|
||||||
use crate::util::priority_queue::PriorityQueue;
|
use crate::util::priority_queue::PriorityQueue;
|
||||||
use crate::util::sync_cell::SyncCell;
|
use crate::util::sync_cell::SyncCell;
|
||||||
|
|
||||||
@ -14,10 +14,8 @@ use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation};
|
|||||||
pub struct SimInit {
|
pub struct SimInit {
|
||||||
executor: Executor,
|
executor: Executor,
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||||
time: SyncCell<TearableAtomicTime>,
|
time: AtomicTime,
|
||||||
clock: Box<dyn Clock + 'static>,
|
clock: Box<dyn Clock + 'static>,
|
||||||
#[cfg(feature = "tracing")]
|
|
||||||
log_level: tracing::Level,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SimInit {
|
impl SimInit {
|
||||||
@ -34,20 +32,23 @@ impl SimInit {
|
|||||||
/// be between 1 and `usize::BITS` (inclusive).
|
/// be between 1 and `usize::BITS` (inclusive).
|
||||||
pub fn with_num_threads(num_threads: usize) -> Self {
|
pub fn with_num_threads(num_threads: usize) -> Self {
|
||||||
let num_threads = num_threads.clamp(1, usize::BITS as usize);
|
let num_threads = num_threads.clamp(1, usize::BITS as usize);
|
||||||
|
let time = SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH));
|
||||||
|
let simulation_context = SimulationContext {
|
||||||
|
#[cfg(feature = "tracing")]
|
||||||
|
time_reader: time.reader(),
|
||||||
|
};
|
||||||
|
|
||||||
let executor = if num_threads == 1 {
|
let executor = if num_threads == 1 {
|
||||||
Executor::new_single_threaded()
|
Executor::new_single_threaded(simulation_context)
|
||||||
} else {
|
} else {
|
||||||
Executor::new_multi_threaded(num_threads)
|
Executor::new_multi_threaded(num_threads, simulation_context)
|
||||||
};
|
};
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
executor,
|
executor,
|
||||||
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
|
scheduler_queue: Arc::new(Mutex::new(PriorityQueue::new())),
|
||||||
time: SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)),
|
time,
|
||||||
clock: Box::new(NoClock::new()),
|
clock: Box::new(NoClock::new()),
|
||||||
#[cfg(feature = "tracing")]
|
|
||||||
log_level: tracing::Level::INFO,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,16 +79,6 @@ impl SimInit {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the level of verbosity for model spans.
|
|
||||||
///
|
|
||||||
/// By default, model spans use [`Level::INFO`](tracing::Level::INFO).
|
|
||||||
#[cfg(feature = "tracing")]
|
|
||||||
pub fn with_log_level(mut self, level: tracing::Level) -> Self {
|
|
||||||
self.log_level = level;
|
|
||||||
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Builds a simulation initialized at the specified simulation time,
|
/// Builds a simulation initialized at the specified simulation time,
|
||||||
/// executing the [`Model::init()`](crate::model::Model::init) method on all
|
/// executing the [`Model::init()`](crate::model::Model::init) method on all
|
||||||
/// model initializers.
|
/// model initializers.
|
||||||
|
@ -52,3 +52,6 @@ pub use tai_time::MonotonicTime;
|
|||||||
|
|
||||||
pub use clock::{AutoSystemClock, Clock, NoClock, SyncStatus, SystemClock};
|
pub use clock::{AutoSystemClock, Clock, NoClock, SyncStatus, SystemClock};
|
||||||
pub(crate) use monotonic_time::TearableAtomicTime;
|
pub(crate) use monotonic_time::TearableAtomicTime;
|
||||||
|
|
||||||
|
pub(crate) type AtomicTime = crate::util::sync_cell::SyncCell<TearableAtomicTime>;
|
||||||
|
pub(crate) type AtomicTimeReader = crate::util::sync_cell::SyncCellReader<TearableAtomicTime>;
|
||||||
|
166
asynchronix/src/tracing.rs
Normal file
166
asynchronix/src/tracing.rs
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
//! Support for structured logging.
|
||||||
|
//!
|
||||||
|
//! # Overview
|
||||||
|
//!
|
||||||
|
//! When the `tracing` feature is activated, each tracing event or span emitted
|
||||||
|
//! by a model is wrapped in a [`tracing::Span`] with the following metadata:
|
||||||
|
//!
|
||||||
|
//! - name: `model`,
|
||||||
|
//! - target: `asynchronix`,
|
||||||
|
//! - verbosity level: [`Level::INFO`](tracing::Level::INFO),
|
||||||
|
//! - a unique field called `name`, associated to the model name provided in
|
||||||
|
//! [`SimInit::add_model`](crate::simulation::SimInit::add_model).
|
||||||
|
//!
|
||||||
|
//! The emission of `model` spans can be readily used for [event
|
||||||
|
//! filtering](#event-filtering-examples), using for instance the
|
||||||
|
//! [`tracing_subscriber::fmt`][mod@tracing_subscriber::fmt] subscriber. By
|
||||||
|
//! default, however, this subscriber will timestamp events with the wall clock
|
||||||
|
//! time. Because it is often desirable to log events using the simulation time
|
||||||
|
//! instead of (or on top of) the wall clock time, this module provides a custom
|
||||||
|
//! [`SimulationTime`] timer compatible with
|
||||||
|
//! [`tracing_subscriber::fmt`][mod@tracing_subscriber::fmt].
|
||||||
|
//!
|
||||||
|
//!
|
||||||
|
//! # Configuration
|
||||||
|
//!
|
||||||
|
//! Using the `tracing-subscriber` crate, simulation events can be logged to
|
||||||
|
//! standard output by placing the following call anywhere before
|
||||||
|
//! [`SimInit::init`](crate::simulation::SimInit::init):
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
|
//! tracing_subscriber::fmt::init();
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! However, this will stamp events with the system time rather than the
|
||||||
|
//! simulation time. To use simulation time instead, a dedicated timer can be
|
||||||
|
//! configured:
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
|
//! use asynchronix::tracing::SimulationTime;
|
||||||
|
//!
|
||||||
|
//! tracing_subscriber::fmt()
|
||||||
|
//! .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||||
|
//! .with_timer(SimulationTime::with_system_timer())
|
||||||
|
//! .init();
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! Note that this timer will automatically revert to system time stamping for
|
||||||
|
//! tracing events generated outside of simulation models, e.g.:
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! [2001-02-03 04:05:06.789012345] WARN model{name="my_model"}: my_simulation: something happened inside the simulation
|
||||||
|
//! 2024-09-10T14:39:24.670921Z INFO my_simulation: something happened outside the simulation
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! Alternatively, `SimulationTime::with_system_timer_always()` can be used to
|
||||||
|
//! always prepend the system time even for simulation events:
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! 2024-09-10T14:39:22.124945Z [2001-02-03 04:05:06.789012345] WARN model{name="my_model"}: my_simulation: something happened inside the simulation
|
||||||
|
//! 2024-09-10T14:39:24.670921Z INFO my_simulation: something happened outside the simulation
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//!
|
||||||
|
//! # Event filtering examples
|
||||||
|
//!
|
||||||
|
//! Note that event filtering based on the `RUST_LOG` environment variable
|
||||||
|
//! requires the `env-filter` feature of the
|
||||||
|
//! [`tracing-subscriber`][tracing_subscriber] crate.
|
||||||
|
//!
|
||||||
|
//! The following `RUST_LOG` directive could be used to only let warnings and
|
||||||
|
//! errors pass through but still see model span information (which is emitted
|
||||||
|
//! as info):
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! $ RUST_LOG="warn,[model]=info" cargo run --release my_simulation
|
||||||
|
//! [2001-01-01 00:00:06.000000000] WARN model{name="kettle"}: my_simulation: water is boiling
|
||||||
|
//! [2001-01-01 00:01:36.000000000] WARN model{name="timer"}: my_simulation: ring ring
|
||||||
|
//! [2001-01-01 00:01:36.000000000] WARN model{name="kettle"}: my_simulation: water is ready
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! In order to see warnings or errors for the `kettle` model only, this
|
||||||
|
//! directive could be modified as follows:
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! $ RUST_LOG="[model{name=kettle}]=warn" cargo run --release my_simulation
|
||||||
|
//! [2001-01-01 00:00:06.000000000] WARN model{name="kettle"}: my_simulation: water is boiling
|
||||||
|
//! [2001-01-01 00:01:36.000000000] WARN model{name="kettle"}: my_simulation: water is ready
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! If the `model` span name collides with that of spans defined outside
|
||||||
|
//! `asynchronix`, the above filters can be made more specific using
|
||||||
|
//! `asynchronix[model]` instead of just `[model]`.
|
||||||
|
//!
|
||||||
|
//!
|
||||||
|
//! # Customization
|
||||||
|
//!
|
||||||
|
//! The [`tracing-subscriber`][tracing_subscriber] crate allows for
|
||||||
|
//! customization such as logging to files or formatting logs with JSON.
|
||||||
|
//!
|
||||||
|
//! Further customization is possible by implementing a
|
||||||
|
//! [`tracing_subscriber::layer::Layer`] or a dedicated [`tracing::Subscriber`].
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
use tracing_subscriber::fmt::format::Writer;
|
||||||
|
use tracing_subscriber::fmt::time::{FormatTime, SystemTime};
|
||||||
|
|
||||||
|
use crate::executor::SIMULATION_CONTEXT;
|
||||||
|
|
||||||
|
/// A timer that can be used in conjunction with the
|
||||||
|
/// [`tracing-subscriber`][tracing_subscriber] crate to log events using the
|
||||||
|
/// simulation time instead of (or on top of) the wall clock time.
|
||||||
|
///
|
||||||
|
/// See the [module-level documentation][crate::tracing] for more details.
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
pub struct SimulationTime<const VERBOSE: bool, T> {
|
||||||
|
sys_timer: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SimulationTime<false, SystemTime> {
|
||||||
|
/// Constructs a new simulation timer which falls back to the [`SystemTime`]
|
||||||
|
/// timer for events generated outside the simulator.
|
||||||
|
pub fn with_system_timer() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SimulationTime<true, SystemTime> {
|
||||||
|
/// Constructs a new simulation timer which prepends a [`SystemTime`]
|
||||||
|
/// timestamp to all tracing events, as well as a simulation timestamp for
|
||||||
|
/// simulation events.
|
||||||
|
pub fn with_system_timer_always() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: FormatTime> SimulationTime<false, T> {
|
||||||
|
/// Constructs a new simulation timer which falls back to the provided
|
||||||
|
/// timer for tracing events generated outside the simulator.
|
||||||
|
pub fn with_custom_timer(sys_timer: T) -> Self {
|
||||||
|
Self { sys_timer }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: FormatTime> SimulationTime<true, T> {
|
||||||
|
/// Constructs a new simulation timer which prepends a timestamp generated
|
||||||
|
/// with the provided timer to all tracing events, as well as a simulation
|
||||||
|
/// timestamp for simulation events.
|
||||||
|
pub fn with_custom_timer_always(sys_timer: T) -> Self {
|
||||||
|
Self { sys_timer }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<const VERBOSE: bool, T: FormatTime> FormatTime for SimulationTime<VERBOSE, T> {
|
||||||
|
fn format_time(&self, w: &mut Writer<'_>) -> fmt::Result {
|
||||||
|
SIMULATION_CONTEXT
|
||||||
|
.map(|ctx| {
|
||||||
|
if VERBOSE {
|
||||||
|
self.sys_timer.format_time(w)?;
|
||||||
|
w.write_char(' ')?;
|
||||||
|
}
|
||||||
|
write!(w, "[{:.9}]", ctx.time_reader.try_read().unwrap())
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| self.sys_timer.format_time(w))
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user