forked from ROMEO/nexosim
Merge pull request #30 from asynchronics/feature-asynchronuous-scheduling
Change scheduler interface and add external inputs example.
This commit is contained in:
commit
a163e5a1e1
@ -73,9 +73,10 @@ waker-fn = "1.1"
|
|||||||
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
atomic-wait = "1.1"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
futures-executor = "0.3"
|
futures-executor = "0.3"
|
||||||
|
mio = { version = "1.0", features = ["os-poll", "net"] }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-build = { version = "0.11", optional = true }
|
tonic-build = { version = "0.11", optional = true }
|
||||||
|
@ -109,6 +109,8 @@ fn main() {
|
|||||||
.add_model(assembly, assembly_mbox, "assembly")
|
.add_model(assembly, assembly_mbox, "assembly")
|
||||||
.init(t0);
|
.init(t0);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// ----------
|
// ----------
|
||||||
// Simulation.
|
// Simulation.
|
||||||
// ----------
|
// ----------
|
||||||
@ -120,7 +122,8 @@ fn main() {
|
|||||||
assert!(position.next().is_none());
|
assert!(position.next().is_none());
|
||||||
|
|
||||||
// Start the motor in 2s with a PPS of 10Hz.
|
// Start the motor in 2s with a PPS of 10Hz.
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
Duration::from_secs(2),
|
Duration::from_secs(2),
|
||||||
MotorAssembly::pulse_rate,
|
MotorAssembly::pulse_rate,
|
||||||
10.0,
|
10.0,
|
||||||
|
@ -140,6 +140,7 @@ impl Controller {
|
|||||||
// Schedule the `stop_brew()` method and turn on the pump.
|
// Schedule the `stop_brew()` method and turn on the pump.
|
||||||
self.stop_brew_key = Some(
|
self.stop_brew_key = Some(
|
||||||
context
|
context
|
||||||
|
.scheduler
|
||||||
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
|
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
);
|
);
|
||||||
@ -206,7 +207,7 @@ impl Tank {
|
|||||||
state.set_empty_key.cancel();
|
state.set_empty_key.cancel();
|
||||||
|
|
||||||
// Update the volume, saturating at 0 in case of rounding errors.
|
// Update the volume, saturating at 0 in case of rounding errors.
|
||||||
let time = context.time();
|
let time = context.scheduler.time();
|
||||||
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
|
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
|
||||||
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);
|
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);
|
||||||
|
|
||||||
@ -231,7 +232,7 @@ impl Tank {
|
|||||||
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
|
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
|
||||||
assert!(flow_rate >= 0.0);
|
assert!(flow_rate >= 0.0);
|
||||||
|
|
||||||
let time = context.time();
|
let time = context.scheduler.time();
|
||||||
|
|
||||||
// If the flow rate was non-zero up to now, update the volume.
|
// If the flow rate was non-zero up to now, update the volume.
|
||||||
if let Some(state) = self.dynamic_state.take() {
|
if let Some(state) = self.dynamic_state.take() {
|
||||||
@ -273,7 +274,10 @@ impl Tank {
|
|||||||
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);
|
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);
|
||||||
|
|
||||||
// Schedule the next update.
|
// Schedule the next update.
|
||||||
match context.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) {
|
match context
|
||||||
|
.scheduler
|
||||||
|
.schedule_keyed_event(duration_until_empty, Self::set_empty, ())
|
||||||
|
{
|
||||||
Ok(set_empty_key) => {
|
Ok(set_empty_key) => {
|
||||||
let state = TankDynamicState {
|
let state = TankDynamicState {
|
||||||
last_volume_update: time,
|
last_volume_update: time,
|
||||||
@ -373,6 +377,8 @@ fn main() {
|
|||||||
.add_model(tank, tank_mbox, "tank")
|
.add_model(tank, tank_mbox, "tank")
|
||||||
.init(t0);
|
.init(t0);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// ----------
|
// ----------
|
||||||
// Simulation.
|
// Simulation.
|
||||||
// ----------
|
// ----------
|
||||||
@ -426,7 +432,8 @@ fn main() {
|
|||||||
assert_eq!(flow_rate.next(), Some(0.0));
|
assert_eq!(flow_rate.next(), Some(0.0));
|
||||||
|
|
||||||
// Interrupt the brew after 15s by pressing again the brew button.
|
// Interrupt the brew after 15s by pressing again the brew button.
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
Duration::from_secs(15),
|
Duration::from_secs(15),
|
||||||
Controller::brew_cmd,
|
Controller::brew_cmd,
|
||||||
(),
|
(),
|
||||||
|
251
asynchronix/examples/external_input.rs
Normal file
251
asynchronix/examples/external_input.rs
Normal file
@ -0,0 +1,251 @@
|
|||||||
|
//! Example: a model that reads data from the external world.
|
||||||
|
//!
|
||||||
|
//! This example demonstrates in particular:
|
||||||
|
//!
|
||||||
|
//! * external world inputs (useful in cosimulation),
|
||||||
|
//! * system clock,
|
||||||
|
//! * periodic scheduling.
|
||||||
|
//!
|
||||||
|
//! ```text
|
||||||
|
//! ┌────────────────────────────────┐
|
||||||
|
//! │ Simulation │
|
||||||
|
//! ┌────────────┐ ┌────────────┐ │ ┌──────────┐ │
|
||||||
|
//! │ │ UDP │ │ message │ message │ │ message │ ┌─────────────┐
|
||||||
|
//! │ UDP Client ├─────────▶│ UDP Server ├──────────▶├─────────▶│ Listener ├─────────▶├──▶│ EventBuffer │
|
||||||
|
//! │ │ message │ │ │ │ │ │ └─────────────┘
|
||||||
|
//! └────────────┘ └────────────┘ │ └──────────┘ │
|
||||||
|
//! └────────────────────────────────┘
|
||||||
|
//! ```
|
||||||
|
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||||
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread::{self, sleep, JoinHandle};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use atomic_wait::{wait, wake_one};
|
||||||
|
|
||||||
|
use mio::net::UdpSocket as MioUdpSocket;
|
||||||
|
use mio::{Events, Interest, Poll, Token};
|
||||||
|
|
||||||
|
use asynchronix::model::{Context, InitializedModel, Model, SetupContext};
|
||||||
|
use asynchronix::ports::{EventBuffer, Output};
|
||||||
|
use asynchronix::simulation::{Mailbox, SimInit};
|
||||||
|
use asynchronix::time::{AutoSystemClock, MonotonicTime};
|
||||||
|
|
||||||
|
const DELTA: Duration = Duration::from_millis(2);
|
||||||
|
const PERIOD: Duration = Duration::from_millis(20);
|
||||||
|
const N: u32 = 10;
|
||||||
|
const SENDER: &str = "127.0.0.1:8000";
|
||||||
|
const RECEIVER: &str = "127.0.0.1:9000";
|
||||||
|
|
||||||
|
/// Model that receives external input.
|
||||||
|
pub struct Listener {
|
||||||
|
/// Received message.
|
||||||
|
pub message: Output<String>,
|
||||||
|
|
||||||
|
/// Receiver of external messages.
|
||||||
|
rx: Receiver<String>,
|
||||||
|
|
||||||
|
/// External sender.
|
||||||
|
tx: Option<Sender<String>>,
|
||||||
|
|
||||||
|
/// Synchronization with client.
|
||||||
|
start: Arc<AtomicU32>,
|
||||||
|
|
||||||
|
/// Synchronization with simulation.
|
||||||
|
stop: Arc<AtomicBool>,
|
||||||
|
|
||||||
|
/// Handle to UDP Server.
|
||||||
|
external_handle: Option<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Listener {
|
||||||
|
/// Creates a Listener.
|
||||||
|
pub fn new(start: Arc<AtomicU32>) -> Self {
|
||||||
|
start.store(0, Ordering::Relaxed);
|
||||||
|
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
Self {
|
||||||
|
message: Output::default(),
|
||||||
|
rx,
|
||||||
|
tx: Some(tx),
|
||||||
|
start,
|
||||||
|
stop: Arc::new(AtomicBool::new(false)),
|
||||||
|
external_handle: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Periodically scheduled function that processes external events.
|
||||||
|
pub async fn process(&mut self) {
|
||||||
|
loop {
|
||||||
|
if let Ok(message) = self.rx.try_recv() {
|
||||||
|
self.message.send(message).await;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// UDP server.
|
||||||
|
///
|
||||||
|
/// Code is based on the MIO UDP example.
|
||||||
|
fn listener(tx: Sender<String>, start: Arc<AtomicU32>, stop: Arc<AtomicBool>) {
|
||||||
|
const UDP_SOCKET: Token = Token(0);
|
||||||
|
let mut poll = Poll::new().unwrap();
|
||||||
|
let mut events = Events::with_capacity(10);
|
||||||
|
let mut socket = MioUdpSocket::bind(RECEIVER.parse().unwrap()).unwrap();
|
||||||
|
poll.registry()
|
||||||
|
.register(&mut socket, UDP_SOCKET, Interest::READABLE)
|
||||||
|
.unwrap();
|
||||||
|
let mut buf = [0; 1 << 16];
|
||||||
|
|
||||||
|
// Wake up the client.
|
||||||
|
start.store(1, Ordering::Relaxed);
|
||||||
|
wake_one(&*start);
|
||||||
|
|
||||||
|
'process: loop {
|
||||||
|
// Wait for UDP packet or end of simulation.
|
||||||
|
if let Err(err) = poll.poll(&mut events, Some(Duration::from_secs(1))) {
|
||||||
|
if err.kind() == ErrorKind::Interrupted {
|
||||||
|
// Exit if simulation is finished.
|
||||||
|
if stop.load(Ordering::Relaxed) {
|
||||||
|
break 'process;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break 'process;
|
||||||
|
}
|
||||||
|
|
||||||
|
for event in events.iter() {
|
||||||
|
match event.token() {
|
||||||
|
UDP_SOCKET => loop {
|
||||||
|
match socket.recv_from(&mut buf) {
|
||||||
|
Ok((packet_size, _)) => {
|
||||||
|
if let Ok(message) = std::str::from_utf8(&buf[..packet_size]) {
|
||||||
|
// Inject external message into simulation.
|
||||||
|
if tx.send(message.into()).is_err() {
|
||||||
|
break 'process;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Err(e) if e.kind() == ErrorKind::WouldBlock => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
break 'process;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
panic!("Got event for unexpected token: {:?}", event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Exit if simulation is finished.
|
||||||
|
if stop.load(Ordering::Relaxed) {
|
||||||
|
break 'process;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
poll.registry().deregister(&mut socket).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Model for Listener {
|
||||||
|
/// Start UDP Server on model setup.
|
||||||
|
fn setup(&mut self, _: &SetupContext<Self>) {
|
||||||
|
let tx = self.tx.take().unwrap();
|
||||||
|
let start = Arc::clone(&self.start);
|
||||||
|
let stop = Arc::clone(&self.stop);
|
||||||
|
self.external_handle = Some(thread::spawn(move || {
|
||||||
|
Self::listener(tx, start, stop);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize model.
|
||||||
|
async fn init(self, context: &Context<Self>) -> InitializedModel<Self> {
|
||||||
|
// Schedule periodic function that processes external events.
|
||||||
|
context
|
||||||
|
.scheduler
|
||||||
|
.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
self.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Listener {
|
||||||
|
/// Notify UDP Server that simulation is over and wait for server shutdown.
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.stop.store(true, Ordering::Relaxed);
|
||||||
|
let handle = self.external_handle.take();
|
||||||
|
if let Some(handle) = handle {
|
||||||
|
handle.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
// ---------------
|
||||||
|
// Bench assembly.
|
||||||
|
// ---------------
|
||||||
|
|
||||||
|
// Models.
|
||||||
|
|
||||||
|
// Client-server synchronization.
|
||||||
|
let start = Arc::new(AtomicU32::new(0));
|
||||||
|
|
||||||
|
let mut listener = Listener::new(Arc::clone(&start));
|
||||||
|
|
||||||
|
// Mailboxes.
|
||||||
|
let listener_mbox = Mailbox::new();
|
||||||
|
|
||||||
|
// Model handles for simulation.
|
||||||
|
let mut message = EventBuffer::new();
|
||||||
|
listener.message.connect_sink(&message);
|
||||||
|
|
||||||
|
// Start time (arbitrary since models do not depend on absolute time).
|
||||||
|
let t0 = MonotonicTime::EPOCH;
|
||||||
|
|
||||||
|
// Assembly and initialization.
|
||||||
|
let mut simu = SimInit::new()
|
||||||
|
.add_model(listener, listener_mbox, "listener")
|
||||||
|
.set_clock(AutoSystemClock::new())
|
||||||
|
.init(t0);
|
||||||
|
|
||||||
|
// ----------
|
||||||
|
// Simulation.
|
||||||
|
// ----------
|
||||||
|
|
||||||
|
// External client that sends UDP messages.
|
||||||
|
let sender_handle = thread::spawn(move || {
|
||||||
|
// Wait until UDP Server is ready.
|
||||||
|
wait(&start, 0);
|
||||||
|
|
||||||
|
for i in 0..N {
|
||||||
|
let socket = UdpSocket::bind(SENDER).unwrap();
|
||||||
|
socket.send_to(i.to_string().as_bytes(), RECEIVER).unwrap();
|
||||||
|
if i % 3 == 0 {
|
||||||
|
sleep(PERIOD * i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Advance simulation, external messages will be collected.
|
||||||
|
simu.step_by(Duration::from_secs(2));
|
||||||
|
|
||||||
|
// Check collected external messages.
|
||||||
|
let mut packets = 0_u32;
|
||||||
|
for _ in 0..N {
|
||||||
|
// UDP can reorder packages, we are expecting that on not too loaded
|
||||||
|
// localhost packages would not be dropped
|
||||||
|
packets |= 1 << message.next().unwrap().parse::<u8>().unwrap();
|
||||||
|
}
|
||||||
|
assert_eq!(packets, u32::MAX >> 22);
|
||||||
|
assert_eq!(message.next(), None);
|
||||||
|
|
||||||
|
sender_handle.join().unwrap();
|
||||||
|
}
|
@ -58,7 +58,7 @@ impl Motor {
|
|||||||
println!(
|
println!(
|
||||||
"Model instance {} at time {}: setting currents: {:.2} and {:.2}",
|
"Model instance {} at time {}: setting currents: {:.2} and {:.2}",
|
||||||
context.name(),
|
context.name(),
|
||||||
context.time(),
|
context.scheduler.time(),
|
||||||
current.0,
|
current.0,
|
||||||
current.1
|
current.1
|
||||||
);
|
);
|
||||||
@ -91,7 +91,7 @@ impl Motor {
|
|||||||
println!(
|
println!(
|
||||||
"Model instance {} at time {}: setting load: {:.2}",
|
"Model instance {} at time {}: setting load: {:.2}",
|
||||||
context.name(),
|
context.name(),
|
||||||
context.time(),
|
context.scheduler.time(),
|
||||||
torque
|
torque
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -141,7 +141,7 @@ impl Driver {
|
|||||||
println!(
|
println!(
|
||||||
"Model instance {} at time {}: setting pps: {:.2}",
|
"Model instance {} at time {}: setting pps: {:.2}",
|
||||||
context.name(),
|
context.name(),
|
||||||
context.time(),
|
context.scheduler.time(),
|
||||||
pps
|
pps
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -172,7 +172,7 @@ impl Driver {
|
|||||||
println!(
|
println!(
|
||||||
"Model instance {} at time {}: sending pulse",
|
"Model instance {} at time {}: sending pulse",
|
||||||
context.name(),
|
context.name(),
|
||||||
context.time()
|
context.scheduler.time()
|
||||||
);
|
);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
@ -195,6 +195,7 @@ impl Driver {
|
|||||||
|
|
||||||
// Schedule the next pulse.
|
// Schedule the next pulse.
|
||||||
context
|
context
|
||||||
|
.scheduler
|
||||||
.schedule_event(pulse_duration, Self::send_pulse, ())
|
.schedule_event(pulse_duration, Self::send_pulse, ())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
@ -236,6 +237,8 @@ fn main() {
|
|||||||
.add_model(motor, motor_mbox, "motor")
|
.add_model(motor, motor_mbox, "motor")
|
||||||
.init(t0);
|
.init(t0);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// ----------
|
// ----------
|
||||||
// Simulation.
|
// Simulation.
|
||||||
// ----------
|
// ----------
|
||||||
@ -247,7 +250,8 @@ fn main() {
|
|||||||
assert!(position.next().is_none());
|
assert!(position.next().is_none());
|
||||||
|
|
||||||
// Start the motor in 2s with a PPS of 10Hz.
|
// Start the motor in 2s with a PPS of 10Hz.
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
Duration::from_secs(2),
|
Duration::from_secs(2),
|
||||||
Driver::pulse_rate,
|
Driver::pulse_rate,
|
||||||
10.0,
|
10.0,
|
||||||
|
@ -119,7 +119,7 @@
|
|||||||
//! }
|
//! }
|
||||||
//! impl Delay {
|
//! impl Delay {
|
||||||
//! pub fn input(&mut self, value: f64, context: &Context<Self>) {
|
//! pub fn input(&mut self, value: f64, context: &Context<Self>) {
|
||||||
//! context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
|
//! context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
|
||||||
//! }
|
//! }
|
||||||
//!
|
//!
|
||||||
//! async fn send(&mut self, value: f64) {
|
//! async fn send(&mut self, value: f64) {
|
||||||
@ -190,7 +190,7 @@
|
|||||||
//! # }
|
//! # }
|
||||||
//! # impl Delay {
|
//! # impl Delay {
|
||||||
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
|
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
|
||||||
//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
|
//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
|
||||||
//! # }
|
//! # }
|
||||||
//! # async fn send(&mut self, value: f64) { // this method can be private
|
//! # async fn send(&mut self, value: f64) { // this method can be private
|
||||||
//! # self.output.send(value).await;
|
//! # self.output.send(value).await;
|
||||||
@ -250,7 +250,7 @@
|
|||||||
//! [`Simulation::process_event()`](simulation::Simulation::process_event) or
|
//! [`Simulation::process_event()`](simulation::Simulation::process_event) or
|
||||||
//! [`Simulation::send_query()`](simulation::Simulation::process_query),
|
//! [`Simulation::send_query()`](simulation::Simulation::process_query),
|
||||||
//! 3. by scheduling events, using for instance
|
//! 3. by scheduling events, using for instance
|
||||||
//! [`Simulation::schedule_event()`](simulation::Simulation::schedule_event).
|
//! [`Scheduler::schedule_event()`](simulation::Scheduler::schedule_event).
|
||||||
//!
|
//!
|
||||||
//! When initialized with the default clock, the simulation will run as fast as
|
//! When initialized with the default clock, the simulation will run as fast as
|
||||||
//! possible, without regard for the actual wall clock time. Alternatively, the
|
//! possible, without regard for the actual wall clock time. Alternatively, the
|
||||||
@ -289,7 +289,7 @@
|
|||||||
//! # }
|
//! # }
|
||||||
//! # impl Delay {
|
//! # impl Delay {
|
||||||
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
|
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
|
||||||
//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
|
//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
|
||||||
//! # }
|
//! # }
|
||||||
//! # async fn send(&mut self, value: f64) { // this method can be private
|
//! # async fn send(&mut self, value: f64) { // this method can be private
|
||||||
//! # self.output.send(value).await;
|
//! # self.output.send(value).await;
|
||||||
@ -370,7 +370,7 @@
|
|||||||
//!
|
//!
|
||||||
//! The first guarantee (and only the first) also extends to events scheduled
|
//! The first guarantee (and only the first) also extends to events scheduled
|
||||||
//! from a simulation with a
|
//! from a simulation with a
|
||||||
//! [`Simulation::schedule_*()`](simulation::Simulation::schedule_event) method:
|
//! [`Scheduler::schedule_*()`](simulation::Scheduler::schedule_event) method:
|
||||||
//! if the scheduler contains several events to be delivered at the same time to
|
//! if the scheduler contains several events to be delivered at the same time to
|
||||||
//! the same model, these events will always be processed in the order in which
|
//! the same model, these events will always be processed in the order in which
|
||||||
//! they were scheduled.
|
//! they were scheduled.
|
||||||
|
@ -1,17 +1,7 @@
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crate::channel::Sender;
|
|
||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::ports::InputFn;
|
use crate::simulation::{self, LocalScheduler, Mailbox};
|
||||||
use crate::simulation::{
|
|
||||||
self, schedule_event_at_unchecked, schedule_keyed_event_at_unchecked,
|
|
||||||
schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, ActionKey,
|
|
||||||
Deadline, Mailbox, SchedulerQueue, SchedulingError,
|
|
||||||
};
|
|
||||||
use crate::time::{MonotonicTime, TearableAtomicTime};
|
|
||||||
use crate::util::sync_cell::SyncCellReader;
|
|
||||||
|
|
||||||
use super::Model;
|
use super::Model;
|
||||||
|
|
||||||
@ -60,13 +50,13 @@ use super::Model;
|
|||||||
/// impl DelayedGreeter {
|
/// impl DelayedGreeter {
|
||||||
/// // Triggers a greeting on the output port after some delay [input port].
|
/// // Triggers a greeting on the output port after some delay [input port].
|
||||||
/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context<Self>) {
|
/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context<Self>) {
|
||||||
/// let time = context.time();
|
/// let time = context.scheduler.time();
|
||||||
/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time);
|
/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time);
|
||||||
///
|
///
|
||||||
/// if delay.is_zero() {
|
/// if delay.is_zero() {
|
||||||
/// self.msg_out.send(greeting).await;
|
/// self.msg_out.send(greeting).await;
|
||||||
/// } else {
|
/// } else {
|
||||||
/// context.schedule_event(delay, Self::send_msg, greeting).unwrap();
|
/// context.scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap();
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
///
|
///
|
||||||
@ -82,320 +72,21 @@ use super::Model;
|
|||||||
// https://github.com/rust-lang/rust/issues/78649
|
// https://github.com/rust-lang/rust/issues/78649
|
||||||
pub struct Context<M: Model> {
|
pub struct Context<M: Model> {
|
||||||
name: String,
|
name: String,
|
||||||
sender: Sender<M>,
|
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
/// Local scheduler.
|
||||||
time: SyncCellReader<TearableAtomicTime>,
|
pub scheduler: LocalScheduler<M>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Model> Context<M> {
|
impl<M: Model> Context<M> {
|
||||||
/// Creates a new local context.
|
/// Creates a new local context.
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(name: String, scheduler: LocalScheduler<M>) -> Self {
|
||||||
name: String,
|
Self { name, scheduler }
|
||||||
sender: Sender<M>,
|
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
|
||||||
time: SyncCellReader<TearableAtomicTime>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
name,
|
|
||||||
sender,
|
|
||||||
scheduler_queue,
|
|
||||||
time,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the model instance name.
|
/// Returns the model instance name.
|
||||||
pub fn name(&self) -> &str {
|
pub fn name(&self) -> &str {
|
||||||
&self.name
|
&self.name
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the current simulation time.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use asynchronix::model::{Context, Model};
|
|
||||||
/// use asynchronix::time::MonotonicTime;
|
|
||||||
///
|
|
||||||
/// fn is_third_millenium<M: Model>(context: &Context<M>) -> bool {
|
|
||||||
/// let time = context.time();
|
|
||||||
/// time >= MonotonicTime::new(978307200, 0).unwrap()
|
|
||||||
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
pub fn time(&self) -> MonotonicTime {
|
|
||||||
self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules an event at a future time.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified deadline is not in the future of
|
|
||||||
/// the current simulation time.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use std::time::Duration;
|
|
||||||
///
|
|
||||||
/// use asynchronix::model::{Context, Model};
|
|
||||||
///
|
|
||||||
/// // A timer.
|
|
||||||
/// pub struct Timer {}
|
|
||||||
///
|
|
||||||
/// impl Timer {
|
|
||||||
/// // Sets an alarm [input port].
|
|
||||||
/// pub fn set(&mut self, setting: Duration, context: &Context<Self>) {
|
|
||||||
/// if context.schedule_event(setting, Self::ring, ()).is_err() {
|
|
||||||
/// println!("The alarm clock can only be set for a future time");
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// // Rings [private input port].
|
|
||||||
/// fn ring(&mut self) {
|
|
||||||
/// println!("Brringggg");
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// impl Model for Timer {}
|
|
||||||
/// ```
|
|
||||||
pub fn schedule_event<F, T, S>(
|
|
||||||
&self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
) -> Result<(), SchedulingError>
|
|
||||||
where
|
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
let sender = self.sender.clone();
|
|
||||||
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a cancellable event at a future time and returns an action
|
|
||||||
/// key.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified deadline is not in the future of
|
|
||||||
/// the current simulation time.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use asynchronix::model::{Context, Model};
|
|
||||||
/// use asynchronix::simulation::ActionKey;
|
|
||||||
/// use asynchronix::time::MonotonicTime;
|
|
||||||
///
|
|
||||||
/// // An alarm clock that can be cancelled.
|
|
||||||
/// #[derive(Default)]
|
|
||||||
/// pub struct CancellableAlarmClock {
|
|
||||||
/// event_key: Option<ActionKey>,
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// impl CancellableAlarmClock {
|
|
||||||
/// // Sets an alarm [input port].
|
|
||||||
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
|
||||||
/// self.cancel();
|
|
||||||
/// match context.schedule_keyed_event(setting, Self::ring, ()) {
|
|
||||||
/// Ok(event_key) => self.event_key = Some(event_key),
|
|
||||||
/// Err(_) => println!("The alarm clock can only be set for a future time"),
|
|
||||||
/// };
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// // Cancels the current alarm, if any [input port].
|
|
||||||
/// pub fn cancel(&mut self) {
|
|
||||||
/// self.event_key.take().map(|k| k.cancel());
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// // Rings the alarm [private input port].
|
|
||||||
/// fn ring(&mut self) {
|
|
||||||
/// println!("Brringggg!");
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// impl Model for CancellableAlarmClock {}
|
|
||||||
/// ```
|
|
||||||
pub fn schedule_keyed_event<F, T, S>(
|
|
||||||
&self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
) -> Result<ActionKey, SchedulingError>
|
|
||||||
where
|
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
let sender = self.sender.clone();
|
|
||||||
let event_key =
|
|
||||||
schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
|
|
||||||
|
|
||||||
Ok(event_key)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a periodically recurring event at a future time.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified deadline is not in the future of
|
|
||||||
/// the current simulation time or if the specified period is null.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use std::time::Duration;
|
|
||||||
///
|
|
||||||
/// use asynchronix::model::{Context, Model};
|
|
||||||
/// use asynchronix::time::MonotonicTime;
|
|
||||||
///
|
|
||||||
/// // An alarm clock beeping at 1Hz.
|
|
||||||
/// pub struct BeepingAlarmClock {}
|
|
||||||
///
|
|
||||||
/// impl BeepingAlarmClock {
|
|
||||||
/// // Sets an alarm [input port].
|
|
||||||
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
|
||||||
/// if context.schedule_periodic_event(
|
|
||||||
/// setting,
|
|
||||||
/// Duration::from_secs(1), // 1Hz = 1/1s
|
|
||||||
/// Self::beep,
|
|
||||||
/// ()
|
|
||||||
/// ).is_err() {
|
|
||||||
/// println!("The alarm clock can only be set for a future time");
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// // Emits a single beep [private input port].
|
|
||||||
/// fn beep(&mut self) {
|
|
||||||
/// println!("Beep!");
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// impl Model for BeepingAlarmClock {}
|
|
||||||
/// ```
|
|
||||||
pub fn schedule_periodic_event<F, T, S>(
|
|
||||||
&self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
period: Duration,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
) -> Result<(), SchedulingError>
|
|
||||||
where
|
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
if period.is_zero() {
|
|
||||||
return Err(SchedulingError::NullRepetitionPeriod);
|
|
||||||
}
|
|
||||||
let sender = self.sender.clone();
|
|
||||||
schedule_periodic_event_at_unchecked(
|
|
||||||
time,
|
|
||||||
period,
|
|
||||||
func,
|
|
||||||
arg,
|
|
||||||
sender,
|
|
||||||
&self.scheduler_queue,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a cancellable, periodically recurring event at a future time
|
|
||||||
/// and returns an action key.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified deadline is not in the future of
|
|
||||||
/// the current simulation time or if the specified period is null.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use std::time::Duration;
|
|
||||||
///
|
|
||||||
/// use asynchronix::model::{Context, Model};
|
|
||||||
/// use asynchronix::simulation::ActionKey;
|
|
||||||
/// use asynchronix::time::MonotonicTime;
|
|
||||||
///
|
|
||||||
/// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
|
|
||||||
/// // stopped after it sets off.
|
|
||||||
/// #[derive(Default)]
|
|
||||||
/// pub struct CancellableBeepingAlarmClock {
|
|
||||||
/// event_key: Option<ActionKey>,
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// impl CancellableBeepingAlarmClock {
|
|
||||||
/// // Sets an alarm [input port].
|
|
||||||
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
|
||||||
/// self.cancel();
|
|
||||||
/// match context.schedule_keyed_periodic_event(
|
|
||||||
/// setting,
|
|
||||||
/// Duration::from_secs(1), // 1Hz = 1/1s
|
|
||||||
/// Self::beep,
|
|
||||||
/// ()
|
|
||||||
/// ) {
|
|
||||||
/// Ok(event_key) => self.event_key = Some(event_key),
|
|
||||||
/// Err(_) => println!("The alarm clock can only be set for a future time"),
|
|
||||||
/// };
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// // Cancels or stops the alarm [input port].
|
|
||||||
/// pub fn cancel(&mut self) {
|
|
||||||
/// self.event_key.take().map(|k| k.cancel());
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// // Emits a single beep [private input port].
|
|
||||||
/// fn beep(&mut self) {
|
|
||||||
/// println!("Beep!");
|
|
||||||
/// }
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// impl Model for CancellableBeepingAlarmClock {}
|
|
||||||
/// ```
|
|
||||||
pub fn schedule_keyed_periodic_event<F, T, S>(
|
|
||||||
&self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
period: Duration,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
) -> Result<ActionKey, SchedulingError>
|
|
||||||
where
|
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
if period.is_zero() {
|
|
||||||
return Err(SchedulingError::NullRepetitionPeriod);
|
|
||||||
}
|
|
||||||
let sender = self.sender.clone();
|
|
||||||
let event_key = schedule_periodic_keyed_event_at_unchecked(
|
|
||||||
time,
|
|
||||||
period,
|
|
||||||
func,
|
|
||||||
arg,
|
|
||||||
sender,
|
|
||||||
&self.scheduler_queue,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(event_key)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Model> fmt::Debug for Context<M> {
|
impl<M: Model> fmt::Debug for Context<M> {
|
||||||
@ -501,8 +192,7 @@ impl<'a, M: Model> SetupContext<'a, M> {
|
|||||||
model,
|
model,
|
||||||
mailbox,
|
mailbox,
|
||||||
submodel_name,
|
submodel_name,
|
||||||
self.context.scheduler_queue.clone(),
|
self.context.scheduler.scheduler.clone(),
|
||||||
self.context.time.clone(),
|
|
||||||
self.executor,
|
self.executor,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -557,6 +557,7 @@ mod tests {
|
|||||||
|
|
||||||
use crate::channel::Receiver;
|
use crate::channel::Receiver;
|
||||||
use crate::model::Context;
|
use crate::model::Context;
|
||||||
|
use crate::simulation::{Address, LocalScheduler, Scheduler};
|
||||||
use crate::time::{MonotonicTime, TearableAtomicTime};
|
use crate::time::{MonotonicTime, TearableAtomicTime};
|
||||||
use crate::util::priority_queue::PriorityQueue;
|
use crate::util::priority_queue::PriorityQueue;
|
||||||
use crate::util::sync_cell::SyncCell;
|
use crate::util::sync_cell::SyncCell;
|
||||||
@ -616,9 +617,10 @@ mod tests {
|
|||||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||||
let dummy_context = Context::new(
|
let dummy_context = Context::new(
|
||||||
String::new(),
|
String::new(),
|
||||||
dummy_address,
|
LocalScheduler::new(
|
||||||
dummy_priority_queue,
|
Scheduler::new(dummy_priority_queue, dummy_time),
|
||||||
dummy_time,
|
Address(dummy_address),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
||||||
}
|
}
|
||||||
@ -671,9 +673,10 @@ mod tests {
|
|||||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||||
let dummy_context = Context::new(
|
let dummy_context = Context::new(
|
||||||
String::new(),
|
String::new(),
|
||||||
dummy_address,
|
LocalScheduler::new(
|
||||||
dummy_priority_queue,
|
Scheduler::new(dummy_priority_queue, dummy_time),
|
||||||
dummy_time,
|
Address(dummy_address),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
||||||
thread::sleep(std::time::Duration::from_millis(100));
|
thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
@ -440,6 +440,7 @@ mod tests {
|
|||||||
|
|
||||||
use crate::channel::Receiver;
|
use crate::channel::Receiver;
|
||||||
use crate::model::Context;
|
use crate::model::Context;
|
||||||
|
use crate::simulation::{Address, LocalScheduler, Scheduler};
|
||||||
use crate::time::{MonotonicTime, TearableAtomicTime};
|
use crate::time::{MonotonicTime, TearableAtomicTime};
|
||||||
use crate::util::priority_queue::PriorityQueue;
|
use crate::util::priority_queue::PriorityQueue;
|
||||||
use crate::util::sync_cell::SyncCell;
|
use crate::util::sync_cell::SyncCell;
|
||||||
@ -499,9 +500,10 @@ mod tests {
|
|||||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||||
let dummy_context = Context::new(
|
let dummy_context = Context::new(
|
||||||
String::new(),
|
String::new(),
|
||||||
dummy_address,
|
LocalScheduler::new(
|
||||||
dummy_priority_queue,
|
Scheduler::new(dummy_priority_queue, dummy_time),
|
||||||
dummy_time,
|
Address(dummy_address),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
||||||
}
|
}
|
||||||
@ -554,9 +556,10 @@ mod tests {
|
|||||||
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
|
||||||
let dummy_context = Context::new(
|
let dummy_context = Context::new(
|
||||||
String::new(),
|
String::new(),
|
||||||
dummy_address,
|
LocalScheduler::new(
|
||||||
dummy_priority_queue,
|
Scheduler::new(dummy_priority_queue, dummy_time),
|
||||||
dummy_time,
|
Address(dummy_address),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
|
||||||
thread::sleep(std::time::Duration::from_millis(100));
|
thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
@ -127,12 +127,12 @@ mod scheduler;
|
|||||||
mod sim_init;
|
mod sim_init;
|
||||||
|
|
||||||
pub use mailbox::{Address, Mailbox};
|
pub use mailbox::{Address, Mailbox};
|
||||||
|
pub use scheduler::{
|
||||||
|
Action, ActionKey, AutoActionKey, Deadline, LocalScheduler, Scheduler, SchedulingError,
|
||||||
|
};
|
||||||
pub(crate) use scheduler::{
|
pub(crate) use scheduler::{
|
||||||
schedule_event_at_unchecked, schedule_keyed_event_at_unchecked,
|
|
||||||
schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked,
|
|
||||||
KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue,
|
KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue,
|
||||||
};
|
};
|
||||||
pub use scheduler::{Action, ActionKey, AutoActionKey, Deadline, SchedulingError};
|
|
||||||
pub use sim_init::SimInit;
|
pub use sim_init::SimInit;
|
||||||
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
@ -149,7 +149,7 @@ use crate::ports::{InputFn, ReplierFn};
|
|||||||
use crate::time::{Clock, MonotonicTime, TearableAtomicTime};
|
use crate::time::{Clock, MonotonicTime, TearableAtomicTime};
|
||||||
use crate::util::seq_futures::SeqFuture;
|
use crate::util::seq_futures::SeqFuture;
|
||||||
use crate::util::slot;
|
use crate::util::slot;
|
||||||
use crate::util::sync_cell::{SyncCell, SyncCellReader};
|
use crate::util::sync_cell::SyncCell;
|
||||||
|
|
||||||
/// Simulation environment.
|
/// Simulation environment.
|
||||||
///
|
///
|
||||||
@ -161,10 +161,10 @@ use crate::util::sync_cell::{SyncCell, SyncCellReader};
|
|||||||
/// A [`Simulation`] object also manages an event scheduling queue and
|
/// A [`Simulation`] object also manages an event scheduling queue and
|
||||||
/// simulation time. The scheduling queue can be accessed from the simulation
|
/// simulation time. The scheduling queue can be accessed from the simulation
|
||||||
/// itself, but also from models via the optional
|
/// itself, but also from models via the optional
|
||||||
/// [`&Context`](crate::model::Context) argument of input and replier port methods.
|
/// [`&Context`](crate::model::Context) argument of input and replier port
|
||||||
/// Likewise, simulation time can be accessed with the [`Simulation::time()`]
|
/// methods. Likewise, simulation time can be accessed with the
|
||||||
/// method, or from models with the [`Context::time()`](crate::model::Context::time)
|
/// [`Simulation::time()`] method, or from models with the
|
||||||
/// method.
|
/// [`LocalScheduler::time()`](crate::simulation::LocalScheduler::time) method.
|
||||||
///
|
///
|
||||||
/// Events and queries can be scheduled immediately, *i.e.* for the current
|
/// Events and queries can be scheduled immediately, *i.e.* for the current
|
||||||
/// simulation time, using [`process_event()`](Simulation::process_event) and
|
/// simulation time, using [`process_event()`](Simulation::process_event) and
|
||||||
@ -173,7 +173,7 @@ use crate::util::sync_cell::{SyncCell, SyncCellReader};
|
|||||||
/// completed. In the case of queries, the response is returned.
|
/// completed. In the case of queries, the response is returned.
|
||||||
///
|
///
|
||||||
/// Events can also be scheduled at a future simulation time using one of the
|
/// Events can also be scheduled at a future simulation time using one of the
|
||||||
/// [`schedule_*()`](Simulation::schedule_event) method. These methods queue an
|
/// [`schedule_*()`](Scheduler::schedule_event) method. These methods queue an
|
||||||
/// event without blocking.
|
/// event without blocking.
|
||||||
///
|
///
|
||||||
/// Finally, the [`Simulation`] instance manages simulation time. A call to
|
/// Finally, the [`Simulation`] instance manages simulation time. A call to
|
||||||
@ -257,192 +257,9 @@ impl Simulation {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Schedules an action at a future time.
|
/// Returns scheduler.
|
||||||
///
|
pub fn scheduler(&self) -> Scheduler {
|
||||||
/// An error is returned if the specified time is not in the future of the
|
Scheduler::new(self.scheduler_queue.clone(), self.time.reader())
|
||||||
/// current simulation time.
|
|
||||||
///
|
|
||||||
/// If multiple actions send events at the same simulation time to the same
|
|
||||||
/// model, these events are guaranteed to be processed according to the
|
|
||||||
/// scheduling order of the actions.
|
|
||||||
pub fn schedule(
|
|
||||||
&mut self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
action: Action,
|
|
||||||
) -> Result<(), SchedulingError> {
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
|
|
||||||
|
|
||||||
// The channel ID is set to the same value for all actions. This
|
|
||||||
// ensures that the relative scheduling order of all source events is
|
|
||||||
// preserved, which is important if some of them target the same models.
|
|
||||||
// The value 0 was chosen as it prevents collisions with channel IDs as
|
|
||||||
// the latter are always non-zero.
|
|
||||||
scheduler_queue.insert((time, 0), action);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules an event at a future time.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified time is not in the future of the
|
|
||||||
/// current simulation time.
|
|
||||||
///
|
|
||||||
/// Events scheduled for the same time and targeting the same model are
|
|
||||||
/// guaranteed to be processed according to the scheduling order.
|
|
||||||
///
|
|
||||||
/// See also: [`Context::schedule_event`](crate::model::Context::schedule_event).
|
|
||||||
pub fn schedule_event<M, F, T, S>(
|
|
||||||
&mut self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
address: impl Into<Address<M>>,
|
|
||||||
) -> Result<(), SchedulingError>
|
|
||||||
where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a cancellable event at a future time and returns an event key.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified time is not in the future of the
|
|
||||||
/// current simulation time.
|
|
||||||
///
|
|
||||||
/// Events scheduled for the same time and targeting the same model are
|
|
||||||
/// guaranteed to be processed according to the scheduling order.
|
|
||||||
///
|
|
||||||
/// See also: [`Context::schedule_keyed_event`](crate::model::Context::schedule_keyed_event).
|
|
||||||
pub fn schedule_keyed_event<M, F, T, S>(
|
|
||||||
&mut self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
address: impl Into<Address<M>>,
|
|
||||||
) -> Result<ActionKey, SchedulingError>
|
|
||||||
where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
let event_key = schedule_keyed_event_at_unchecked(
|
|
||||||
time,
|
|
||||||
func,
|
|
||||||
arg,
|
|
||||||
address.into().0,
|
|
||||||
&self.scheduler_queue,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(event_key)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a periodically recurring event at a future time.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified time is not in the future of the
|
|
||||||
/// current simulation time or if the specified period is null.
|
|
||||||
///
|
|
||||||
/// Events scheduled for the same time and targeting the same model are
|
|
||||||
/// guaranteed to be processed according to the scheduling order.
|
|
||||||
///
|
|
||||||
/// See also: [`Context::schedule_periodic_event`](crate::model::Context::schedule_periodic_event).
|
|
||||||
pub fn schedule_periodic_event<M, F, T, S>(
|
|
||||||
&mut self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
period: Duration,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
address: impl Into<Address<M>>,
|
|
||||||
) -> Result<(), SchedulingError>
|
|
||||||
where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
if period.is_zero() {
|
|
||||||
return Err(SchedulingError::NullRepetitionPeriod);
|
|
||||||
}
|
|
||||||
schedule_periodic_event_at_unchecked(
|
|
||||||
time,
|
|
||||||
period,
|
|
||||||
func,
|
|
||||||
arg,
|
|
||||||
address.into().0,
|
|
||||||
&self.scheduler_queue,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a cancellable, periodically recurring event at a future time
|
|
||||||
/// and returns an event key.
|
|
||||||
///
|
|
||||||
/// An error is returned if the specified time is not in the future of the
|
|
||||||
/// current simulation time or if the specified period is null.
|
|
||||||
///
|
|
||||||
/// Events scheduled for the same time and targeting the same model are
|
|
||||||
/// guaranteed to be processed according to the scheduling order.
|
|
||||||
///
|
|
||||||
/// See also: [`Context::schedule_keyed_periodic_event`](crate::model::Context::schedule_keyed_periodic_event).
|
|
||||||
pub fn schedule_keyed_periodic_event<M, F, T, S>(
|
|
||||||
&mut self,
|
|
||||||
deadline: impl Deadline,
|
|
||||||
period: Duration,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
address: impl Into<Address<M>>,
|
|
||||||
) -> Result<ActionKey, SchedulingError>
|
|
||||||
where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let now = self.time();
|
|
||||||
let time = deadline.into_time(now);
|
|
||||||
if now >= time {
|
|
||||||
return Err(SchedulingError::InvalidScheduledTime);
|
|
||||||
}
|
|
||||||
if period.is_zero() {
|
|
||||||
return Err(SchedulingError::NullRepetitionPeriod);
|
|
||||||
}
|
|
||||||
let event_key = schedule_periodic_keyed_event_at_unchecked(
|
|
||||||
time,
|
|
||||||
period,
|
|
||||||
func,
|
|
||||||
arg,
|
|
||||||
address.into().0,
|
|
||||||
&self.scheduler_queue,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(event_key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Processes an action immediately, blocking until completion.
|
/// Processes an action immediately, blocking until completion.
|
||||||
@ -630,6 +447,7 @@ impl Simulation {
|
|||||||
None => {
|
None => {
|
||||||
// Update the simulation time.
|
// Update the simulation time.
|
||||||
self.time.write(target_time);
|
self.time.write(target_time);
|
||||||
|
self.clock.synchronize(target_time);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// The target time was not reached yet.
|
// The target time was not reached yet.
|
||||||
@ -667,13 +485,10 @@ pub(crate) fn add_model<M: Model>(
|
|||||||
mut model: M,
|
mut model: M,
|
||||||
mailbox: Mailbox<M>,
|
mailbox: Mailbox<M>,
|
||||||
name: String,
|
name: String,
|
||||||
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
scheduler: Scheduler,
|
||||||
time: SyncCellReader<TearableAtomicTime>,
|
|
||||||
executor: &Executor,
|
executor: &Executor,
|
||||||
) {
|
) {
|
||||||
let sender = mailbox.0.sender();
|
let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address()));
|
||||||
|
|
||||||
let context = Context::new(name, sender, scheduler_queue, time);
|
|
||||||
let setup_context = SetupContext::new(&mailbox, &context, executor);
|
let setup_context = SetupContext::new(&mailbox, &context, executor);
|
||||||
|
|
||||||
model.setup(&setup_context);
|
model.setup(&setup_context);
|
||||||
|
@ -17,8 +17,532 @@ use crate::channel::Sender;
|
|||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::model::Model;
|
use crate::model::Model;
|
||||||
use crate::ports::InputFn;
|
use crate::ports::InputFn;
|
||||||
use crate::time::MonotonicTime;
|
use crate::simulation::Address;
|
||||||
|
use crate::time::{MonotonicTime, TearableAtomicTime};
|
||||||
use crate::util::priority_queue::PriorityQueue;
|
use crate::util::priority_queue::PriorityQueue;
|
||||||
|
use crate::util::sync_cell::SyncCellReader;
|
||||||
|
|
||||||
|
/// Scheduler.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Scheduler {
|
||||||
|
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||||
|
time: SyncCellReader<TearableAtomicTime>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Scheduler {
|
||||||
|
pub(crate) fn new(
|
||||||
|
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
|
||||||
|
time: SyncCellReader<TearableAtomicTime>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
scheduler_queue,
|
||||||
|
time,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the current simulation time.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use asynchronix::simulation::Scheduler;
|
||||||
|
/// use asynchronix::time::MonotonicTime;
|
||||||
|
///
|
||||||
|
/// fn is_third_millenium(scheduler: &Scheduler) -> bool {
|
||||||
|
/// let time = scheduler.time();
|
||||||
|
/// time >= MonotonicTime::new(978307200, 0).unwrap()
|
||||||
|
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn time(&self) -> MonotonicTime {
|
||||||
|
self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules an action at a future time.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified time is not in the future of the
|
||||||
|
/// current simulation time.
|
||||||
|
///
|
||||||
|
/// If multiple actions send events at the same simulation time to the same
|
||||||
|
/// model, these events are guaranteed to be processed according to the
|
||||||
|
/// scheduling order of the actions.
|
||||||
|
pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> {
|
||||||
|
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
|
||||||
|
|
||||||
|
let now = self.time();
|
||||||
|
let time = deadline.into_time(now);
|
||||||
|
if now >= time {
|
||||||
|
return Err(SchedulingError::InvalidScheduledTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel ID is set to the same value for all actions. This
|
||||||
|
// ensures that the relative scheduling order of all source events is
|
||||||
|
// preserved, which is important if some of them target the same models.
|
||||||
|
// The value 0 was chosen as it prevents collisions with channel IDs as
|
||||||
|
// the latter are always non-zero.
|
||||||
|
scheduler_queue.insert((time, 0), action);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules an event at a future time.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified time is not in the future of the
|
||||||
|
/// current simulation time.
|
||||||
|
///
|
||||||
|
/// Events scheduled for the same time and targeting the same model are
|
||||||
|
/// guaranteed to be processed according to the scheduling order.
|
||||||
|
///
|
||||||
|
/// See also: [`LocalScheduler::schedule_event`](LocalScheduler::schedule_event).
|
||||||
|
pub fn schedule_event<M, F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> Result<(), SchedulingError>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: for<'a> InputFn<'a, M, T, S>,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
|
||||||
|
let now = self.time();
|
||||||
|
let time = deadline.into_time(now);
|
||||||
|
if now >= time {
|
||||||
|
return Err(SchedulingError::InvalidScheduledTime);
|
||||||
|
}
|
||||||
|
let sender = address.into().0;
|
||||||
|
let channel_id = sender.channel_id();
|
||||||
|
let action = Action::new(OnceAction::new(process_event(func, arg, sender)));
|
||||||
|
|
||||||
|
scheduler_queue.insert((time, channel_id), action);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules a cancellable event at a future time and returns an event key.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified time is not in the future of the
|
||||||
|
/// current simulation time.
|
||||||
|
///
|
||||||
|
/// Events scheduled for the same time and targeting the same model are
|
||||||
|
/// guaranteed to be processed according to the scheduling order.
|
||||||
|
///
|
||||||
|
/// See also: [`LocalScheduler::schedule_keyed_event`](LocalScheduler::schedule_keyed_event).
|
||||||
|
pub fn schedule_keyed_event<M, F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> Result<ActionKey, SchedulingError>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: for<'a> InputFn<'a, M, T, S>,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
|
||||||
|
let now = self.time();
|
||||||
|
let time = deadline.into_time(now);
|
||||||
|
if now >= time {
|
||||||
|
return Err(SchedulingError::InvalidScheduledTime);
|
||||||
|
}
|
||||||
|
let event_key = ActionKey::new();
|
||||||
|
let sender = address.into().0;
|
||||||
|
let channel_id = sender.channel_id();
|
||||||
|
let action = Action::new(KeyedOnceAction::new(
|
||||||
|
|ek| send_keyed_event(ek, func, arg, sender),
|
||||||
|
event_key.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
scheduler_queue.insert((time, channel_id), action);
|
||||||
|
|
||||||
|
Ok(event_key)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules a periodically recurring event at a future time.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified time is not in the future of the
|
||||||
|
/// current simulation time or if the specified period is null.
|
||||||
|
///
|
||||||
|
/// Events scheduled for the same time and targeting the same model are
|
||||||
|
/// guaranteed to be processed according to the scheduling order.
|
||||||
|
///
|
||||||
|
/// See also: [`LocalScheduler::schedule_periodic_event`](LocalScheduler::schedule_periodic_event).
|
||||||
|
pub fn schedule_periodic_event<M, F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
period: Duration,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> Result<(), SchedulingError>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
|
||||||
|
let now = self.time();
|
||||||
|
let time = deadline.into_time(now);
|
||||||
|
if now >= time {
|
||||||
|
return Err(SchedulingError::InvalidScheduledTime);
|
||||||
|
}
|
||||||
|
if period.is_zero() {
|
||||||
|
return Err(SchedulingError::NullRepetitionPeriod);
|
||||||
|
}
|
||||||
|
let sender = address.into().0;
|
||||||
|
let channel_id = sender.channel_id();
|
||||||
|
|
||||||
|
let action = Action::new(PeriodicAction::new(
|
||||||
|
|| process_event(func, arg, sender),
|
||||||
|
period,
|
||||||
|
));
|
||||||
|
|
||||||
|
scheduler_queue.insert((time, channel_id), action);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules a cancellable, periodically recurring event at a future time
|
||||||
|
/// and returns an event key.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified time is not in the future of the
|
||||||
|
/// current simulation time or if the specified period is null.
|
||||||
|
///
|
||||||
|
/// Events scheduled for the same time and targeting the same model are
|
||||||
|
/// guaranteed to be processed according to the scheduling order.
|
||||||
|
///
|
||||||
|
/// See also: [`LocalScheduler::schedule_keyed_periodic_event`](LocalScheduler::schedule_keyed_periodic_event).
|
||||||
|
pub fn schedule_keyed_periodic_event<M, F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
period: Duration,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
address: impl Into<Address<M>>,
|
||||||
|
) -> Result<ActionKey, SchedulingError>
|
||||||
|
where
|
||||||
|
M: Model,
|
||||||
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
|
||||||
|
let now = self.time();
|
||||||
|
let time = deadline.into_time(now);
|
||||||
|
if now >= time {
|
||||||
|
return Err(SchedulingError::InvalidScheduledTime);
|
||||||
|
}
|
||||||
|
if period.is_zero() {
|
||||||
|
return Err(SchedulingError::NullRepetitionPeriod);
|
||||||
|
}
|
||||||
|
let event_key = ActionKey::new();
|
||||||
|
let sender = address.into().0;
|
||||||
|
let channel_id = sender.channel_id();
|
||||||
|
let action = Action::new(KeyedPeriodicAction::new(
|
||||||
|
|ek| send_keyed_event(ek, func, arg, sender),
|
||||||
|
period,
|
||||||
|
event_key.clone(),
|
||||||
|
));
|
||||||
|
scheduler_queue.insert((time, channel_id), action);
|
||||||
|
|
||||||
|
Ok(event_key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Scheduler {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("Scheduler")
|
||||||
|
.field("time", &self.time())
|
||||||
|
.finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Local scheduler.
|
||||||
|
pub struct LocalScheduler<M: Model> {
|
||||||
|
pub(crate) scheduler: Scheduler,
|
||||||
|
address: Address<M>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Model> LocalScheduler<M> {
|
||||||
|
pub(crate) fn new(scheduler: Scheduler, address: Address<M>) -> Self {
|
||||||
|
Self { scheduler, address }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the current simulation time.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use asynchronix::model::Model;
|
||||||
|
/// use asynchronix::simulation::LocalScheduler;
|
||||||
|
/// use asynchronix::time::MonotonicTime;
|
||||||
|
///
|
||||||
|
/// fn is_third_millenium<M: Model>(scheduler: &LocalScheduler<M>) -> bool {
|
||||||
|
/// let time = scheduler.time();
|
||||||
|
/// time >= MonotonicTime::new(978307200, 0).unwrap()
|
||||||
|
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn time(&self) -> MonotonicTime {
|
||||||
|
self.scheduler.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules an event at a future time.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified deadline is not in the future of
|
||||||
|
/// the current simulation time.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use std::time::Duration;
|
||||||
|
///
|
||||||
|
/// use asynchronix::model::{Context, Model};
|
||||||
|
///
|
||||||
|
/// // A timer.
|
||||||
|
/// pub struct Timer {}
|
||||||
|
///
|
||||||
|
/// impl Timer {
|
||||||
|
/// // Sets an alarm [input port].
|
||||||
|
/// pub fn set(&mut self, setting: Duration, context: &Context<Self>) {
|
||||||
|
/// if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() {
|
||||||
|
/// println!("The alarm clock can only be set for a future time");
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Rings [private input port].
|
||||||
|
/// fn ring(&mut self) {
|
||||||
|
/// println!("Brringggg");
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl Model for Timer {}
|
||||||
|
/// ```
|
||||||
|
pub fn schedule_event<F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
) -> Result<(), SchedulingError>
|
||||||
|
where
|
||||||
|
F: for<'a> InputFn<'a, M, T, S>,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
self.scheduler
|
||||||
|
.schedule_event(deadline, func, arg, &self.address)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules a cancellable event at a future time and returns an action
|
||||||
|
/// key.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified deadline is not in the future of
|
||||||
|
/// the current simulation time.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use asynchronix::model::{Context, Model};
|
||||||
|
/// use asynchronix::simulation::ActionKey;
|
||||||
|
/// use asynchronix::time::MonotonicTime;
|
||||||
|
///
|
||||||
|
/// // An alarm clock that can be cancelled.
|
||||||
|
/// #[derive(Default)]
|
||||||
|
/// pub struct CancellableAlarmClock {
|
||||||
|
/// event_key: Option<ActionKey>,
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl CancellableAlarmClock {
|
||||||
|
/// // Sets an alarm [input port].
|
||||||
|
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
||||||
|
/// self.cancel();
|
||||||
|
/// match context.scheduler.schedule_keyed_event(setting, Self::ring, ()) {
|
||||||
|
/// Ok(event_key) => self.event_key = Some(event_key),
|
||||||
|
/// Err(_) => println!("The alarm clock can only be set for a future time"),
|
||||||
|
/// };
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Cancels the current alarm, if any [input port].
|
||||||
|
/// pub fn cancel(&mut self) {
|
||||||
|
/// self.event_key.take().map(|k| k.cancel());
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Rings the alarm [private input port].
|
||||||
|
/// fn ring(&mut self) {
|
||||||
|
/// println!("Brringggg!");
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl Model for CancellableAlarmClock {}
|
||||||
|
/// ```
|
||||||
|
pub fn schedule_keyed_event<F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
) -> Result<ActionKey, SchedulingError>
|
||||||
|
where
|
||||||
|
F: for<'a> InputFn<'a, M, T, S>,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let event_key = self
|
||||||
|
.scheduler
|
||||||
|
.schedule_keyed_event(deadline, func, arg, &self.address)?;
|
||||||
|
|
||||||
|
Ok(event_key)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules a periodically recurring event at a future time.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified deadline is not in the future of
|
||||||
|
/// the current simulation time or if the specified period is null.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use std::time::Duration;
|
||||||
|
///
|
||||||
|
/// use asynchronix::model::{Context, Model};
|
||||||
|
/// use asynchronix::time::MonotonicTime;
|
||||||
|
///
|
||||||
|
/// // An alarm clock beeping at 1Hz.
|
||||||
|
/// pub struct BeepingAlarmClock {}
|
||||||
|
///
|
||||||
|
/// impl BeepingAlarmClock {
|
||||||
|
/// // Sets an alarm [input port].
|
||||||
|
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
||||||
|
/// if context.scheduler.schedule_periodic_event(
|
||||||
|
/// setting,
|
||||||
|
/// Duration::from_secs(1), // 1Hz = 1/1s
|
||||||
|
/// Self::beep,
|
||||||
|
/// ()
|
||||||
|
/// ).is_err() {
|
||||||
|
/// println!("The alarm clock can only be set for a future time");
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Emits a single beep [private input port].
|
||||||
|
/// fn beep(&mut self) {
|
||||||
|
/// println!("Beep!");
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl Model for BeepingAlarmClock {}
|
||||||
|
/// ```
|
||||||
|
pub fn schedule_periodic_event<F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
period: Duration,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
) -> Result<(), SchedulingError>
|
||||||
|
where
|
||||||
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
self.scheduler
|
||||||
|
.schedule_periodic_event(deadline, period, func, arg, &self.address)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedules a cancellable, periodically recurring event at a future time
|
||||||
|
/// and returns an action key.
|
||||||
|
///
|
||||||
|
/// An error is returned if the specified deadline is not in the future of
|
||||||
|
/// the current simulation time or if the specified period is null.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use std::time::Duration;
|
||||||
|
///
|
||||||
|
/// use asynchronix::model::{Context, Model};
|
||||||
|
/// use asynchronix::simulation::ActionKey;
|
||||||
|
/// use asynchronix::time::MonotonicTime;
|
||||||
|
///
|
||||||
|
/// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
|
||||||
|
/// // stopped after it sets off.
|
||||||
|
/// #[derive(Default)]
|
||||||
|
/// pub struct CancellableBeepingAlarmClock {
|
||||||
|
/// event_key: Option<ActionKey>,
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl CancellableBeepingAlarmClock {
|
||||||
|
/// // Sets an alarm [input port].
|
||||||
|
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
||||||
|
/// self.cancel();
|
||||||
|
/// match context.scheduler.schedule_keyed_periodic_event(
|
||||||
|
/// setting,
|
||||||
|
/// Duration::from_secs(1), // 1Hz = 1/1s
|
||||||
|
/// Self::beep,
|
||||||
|
/// ()
|
||||||
|
/// ) {
|
||||||
|
/// Ok(event_key) => self.event_key = Some(event_key),
|
||||||
|
/// Err(_) => println!("The alarm clock can only be set for a future time"),
|
||||||
|
/// };
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Cancels or stops the alarm [input port].
|
||||||
|
/// pub fn cancel(&mut self) {
|
||||||
|
/// self.event_key.take().map(|k| k.cancel());
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Emits a single beep [private input port].
|
||||||
|
/// fn beep(&mut self) {
|
||||||
|
/// println!("Beep!");
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl Model for CancellableBeepingAlarmClock {}
|
||||||
|
/// ```
|
||||||
|
pub fn schedule_keyed_periodic_event<F, T, S>(
|
||||||
|
&self,
|
||||||
|
deadline: impl Deadline,
|
||||||
|
period: Duration,
|
||||||
|
func: F,
|
||||||
|
arg: T,
|
||||||
|
) -> Result<ActionKey, SchedulingError>
|
||||||
|
where
|
||||||
|
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
S: Send + 'static,
|
||||||
|
{
|
||||||
|
let event_key = self.scheduler.schedule_keyed_periodic_event(
|
||||||
|
deadline,
|
||||||
|
period,
|
||||||
|
func,
|
||||||
|
arg,
|
||||||
|
&self.address,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(event_key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Model> Clone for LocalScheduler<M> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
scheduler: self.scheduler.clone(),
|
||||||
|
address: self.address.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: Model> fmt::Debug for LocalScheduler<M> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("LocalScheduler")
|
||||||
|
.field("time", &self.time())
|
||||||
|
.field("address", &self.address)
|
||||||
|
.finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Shorthand for the scheduler queue type.
|
/// Shorthand for the scheduler queue type.
|
||||||
|
|
||||||
@ -214,120 +738,6 @@ pub(crate) trait ActionInner: Send + 'static {
|
|||||||
fn spawn_and_forget(self: Box<Self>, executor: &Executor);
|
fn spawn_and_forget(self: Box<Self>, executor: &Executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Schedules an event at a future time.
|
|
||||||
///
|
|
||||||
/// This function does not check whether the specified time lies in the future
|
|
||||||
/// of the current simulation time.
|
|
||||||
pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
|
|
||||||
time: MonotonicTime,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
sender: Sender<M>,
|
|
||||||
scheduler_queue: &Mutex<SchedulerQueue>,
|
|
||||||
) where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let channel_id = sender.channel_id();
|
|
||||||
|
|
||||||
let action = Action::new(OnceAction::new(process_event(func, arg, sender)));
|
|
||||||
|
|
||||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
|
||||||
scheduler_queue.insert((time, channel_id), action);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules an event at a future time, returning an action key.
|
|
||||||
///
|
|
||||||
/// This function does not check whether the specified time lies in the future
|
|
||||||
/// of the current simulation time.
|
|
||||||
pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
|
|
||||||
time: MonotonicTime,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
sender: Sender<M>,
|
|
||||||
scheduler_queue: &Mutex<SchedulerQueue>,
|
|
||||||
) -> ActionKey
|
|
||||||
where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S>,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let event_key = ActionKey::new();
|
|
||||||
let channel_id = sender.channel_id();
|
|
||||||
let action = Action::new(KeyedOnceAction::new(
|
|
||||||
|ek| send_keyed_event(ek, func, arg, sender),
|
|
||||||
event_key.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
|
||||||
scheduler_queue.insert((time, channel_id), action);
|
|
||||||
|
|
||||||
event_key
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a periodic event at a future time.
|
|
||||||
///
|
|
||||||
/// This function does not check whether the specified time lies in the future
|
|
||||||
/// of the current simulation time.
|
|
||||||
pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
|
|
||||||
time: MonotonicTime,
|
|
||||||
period: Duration,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
sender: Sender<M>,
|
|
||||||
scheduler_queue: &Mutex<SchedulerQueue>,
|
|
||||||
) where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let channel_id = sender.channel_id();
|
|
||||||
|
|
||||||
let action = Action::new(PeriodicAction::new(
|
|
||||||
|| process_event(func, arg, sender),
|
|
||||||
period,
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
|
||||||
scheduler_queue.insert((time, channel_id), action);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules an event at a future time, returning an action key.
|
|
||||||
///
|
|
||||||
/// This function does not check whether the specified time lies in the future
|
|
||||||
/// of the current simulation time.
|
|
||||||
pub(crate) fn schedule_periodic_keyed_event_at_unchecked<M, F, T, S>(
|
|
||||||
time: MonotonicTime,
|
|
||||||
period: Duration,
|
|
||||||
func: F,
|
|
||||||
arg: T,
|
|
||||||
sender: Sender<M>,
|
|
||||||
scheduler_queue: &Mutex<SchedulerQueue>,
|
|
||||||
) -> ActionKey
|
|
||||||
where
|
|
||||||
M: Model,
|
|
||||||
F: for<'a> InputFn<'a, M, T, S> + Clone,
|
|
||||||
T: Send + Clone + 'static,
|
|
||||||
S: Send + 'static,
|
|
||||||
{
|
|
||||||
let event_key = ActionKey::new();
|
|
||||||
let channel_id = sender.channel_id();
|
|
||||||
let action = Action::new(KeyedPeriodicAction::new(
|
|
||||||
|ek| send_keyed_event(ek, func, arg, sender),
|
|
||||||
period,
|
|
||||||
event_key.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut scheduler_queue = scheduler_queue.lock().unwrap();
|
|
||||||
scheduler_queue.insert((time, channel_id), action);
|
|
||||||
|
|
||||||
event_key
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
/// An object that can be converted to a future performing a single
|
/// An object that can be converted to a future performing a single
|
||||||
/// non-cancellable action.
|
/// non-cancellable action.
|
||||||
|
@ -8,7 +8,7 @@ use crate::time::{MonotonicTime, TearableAtomicTime};
|
|||||||
use crate::util::priority_queue::PriorityQueue;
|
use crate::util::priority_queue::PriorityQueue;
|
||||||
use crate::util::sync_cell::SyncCell;
|
use crate::util::sync_cell::SyncCell;
|
||||||
|
|
||||||
use super::{add_model, Mailbox, SchedulerQueue, Simulation};
|
use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation};
|
||||||
|
|
||||||
/// Builder for a multi-threaded, discrete-event simulation.
|
/// Builder for a multi-threaded, discrete-event simulation.
|
||||||
pub struct SimInit {
|
pub struct SimInit {
|
||||||
@ -58,17 +58,8 @@ impl SimInit {
|
|||||||
mailbox: Mailbox<M>,
|
mailbox: Mailbox<M>,
|
||||||
name: impl Into<String>,
|
name: impl Into<String>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let scheduler_queue = self.scheduler_queue.clone();
|
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
|
||||||
let time = self.time.reader();
|
add_model(model, mailbox, name.into(), scheduler, &self.executor);
|
||||||
|
|
||||||
add_model(
|
|
||||||
model,
|
|
||||||
mailbox,
|
|
||||||
name.into(),
|
|
||||||
scheduler_queue,
|
|
||||||
time,
|
|
||||||
&self.executor,
|
|
||||||
);
|
|
||||||
|
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@
|
|||||||
//!
|
//!
|
||||||
//! // Sets an alarm [input port].
|
//! // Sets an alarm [input port].
|
||||||
//! pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
//! pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
|
||||||
//! if context.schedule_event(setting, Self::ring, ()).is_err() {
|
//! if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() {
|
||||||
//! println!("The alarm clock can only be set for a future time");
|
//! println!("The alarm clock can only be set for a future time");
|
||||||
//! }
|
//! }
|
||||||
//! }
|
//! }
|
||||||
|
@ -16,7 +16,12 @@ fn model_schedule_event() {
|
|||||||
impl TestModel {
|
impl TestModel {
|
||||||
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
||||||
context
|
context
|
||||||
.schedule_event(context.time() + Duration::from_secs(2), Self::action, ())
|
.scheduler
|
||||||
|
.schedule_event(
|
||||||
|
context.scheduler.time() + Duration::from_secs(2),
|
||||||
|
Self::action,
|
||||||
|
(),
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
async fn action(&mut self) {
|
async fn action(&mut self) {
|
||||||
@ -53,10 +58,20 @@ fn model_cancel_future_keyed_event() {
|
|||||||
impl TestModel {
|
impl TestModel {
|
||||||
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
||||||
context
|
context
|
||||||
.schedule_event(context.time() + Duration::from_secs(1), Self::action1, ())
|
.scheduler
|
||||||
|
.schedule_event(
|
||||||
|
context.scheduler.time() + Duration::from_secs(1),
|
||||||
|
Self::action1,
|
||||||
|
(),
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
self.key = context
|
self.key = context
|
||||||
.schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ())
|
.scheduler
|
||||||
|
.schedule_keyed_event(
|
||||||
|
context.scheduler.time() + Duration::from_secs(2),
|
||||||
|
Self::action2,
|
||||||
|
(),
|
||||||
|
)
|
||||||
.ok();
|
.ok();
|
||||||
}
|
}
|
||||||
async fn action1(&mut self) {
|
async fn action1(&mut self) {
|
||||||
@ -99,10 +114,20 @@ fn model_cancel_same_time_keyed_event() {
|
|||||||
impl TestModel {
|
impl TestModel {
|
||||||
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
||||||
context
|
context
|
||||||
.schedule_event(context.time() + Duration::from_secs(2), Self::action1, ())
|
.scheduler
|
||||||
|
.schedule_event(
|
||||||
|
context.scheduler.time() + Duration::from_secs(2),
|
||||||
|
Self::action1,
|
||||||
|
(),
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
self.key = context
|
self.key = context
|
||||||
.schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ())
|
.scheduler
|
||||||
|
.schedule_keyed_event(
|
||||||
|
context.scheduler.time() + Duration::from_secs(2),
|
||||||
|
Self::action2,
|
||||||
|
(),
|
||||||
|
)
|
||||||
.ok();
|
.ok();
|
||||||
}
|
}
|
||||||
async fn action1(&mut self) {
|
async fn action1(&mut self) {
|
||||||
@ -144,8 +169,9 @@ fn model_schedule_periodic_event() {
|
|||||||
impl TestModel {
|
impl TestModel {
|
||||||
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
||||||
context
|
context
|
||||||
|
.scheduler
|
||||||
.schedule_periodic_event(
|
.schedule_periodic_event(
|
||||||
context.time() + Duration::from_secs(2),
|
context.scheduler.time() + Duration::from_secs(2),
|
||||||
Duration::from_secs(3),
|
Duration::from_secs(3),
|
||||||
Self::action,
|
Self::action,
|
||||||
42,
|
42,
|
||||||
@ -192,8 +218,9 @@ fn model_cancel_periodic_event() {
|
|||||||
impl TestModel {
|
impl TestModel {
|
||||||
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
fn trigger(&mut self, _: (), context: &Context<Self>) {
|
||||||
self.key = context
|
self.key = context
|
||||||
|
.scheduler
|
||||||
.schedule_keyed_periodic_event(
|
.schedule_keyed_periodic_event(
|
||||||
context.time() + Duration::from_secs(2),
|
context.scheduler.time() + Duration::from_secs(2),
|
||||||
Duration::from_secs(3),
|
Duration::from_secs(3),
|
||||||
Self::action,
|
Self::action,
|
||||||
(),
|
(),
|
||||||
|
@ -48,10 +48,14 @@ fn simulation_schedule_events() {
|
|||||||
let t0 = MonotonicTime::EPOCH;
|
let t0 = MonotonicTime::EPOCH;
|
||||||
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// Queue 2 events at t0+3s and t0+2s, in reverse order.
|
// Queue 2 events at t0+3s and t0+2s, in reverse order.
|
||||||
simu.schedule_event(Duration::from_secs(3), PassThroughModel::input, (), &addr)
|
scheduler
|
||||||
|
.schedule_event(Duration::from_secs(3), PassThroughModel::input, (), &addr)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
t0 + Duration::from_secs(2),
|
t0 + Duration::from_secs(2),
|
||||||
PassThroughModel::input,
|
PassThroughModel::input,
|
||||||
(),
|
(),
|
||||||
@ -65,7 +69,8 @@ fn simulation_schedule_events() {
|
|||||||
assert!(output.next().is_some());
|
assert!(output.next().is_some());
|
||||||
|
|
||||||
// Schedule another event in 4s (at t0+6s).
|
// Schedule another event in 4s (at t0+6s).
|
||||||
simu.schedule_event(Duration::from_secs(4), PassThroughModel::input, (), &addr)
|
scheduler
|
||||||
|
.schedule_event(Duration::from_secs(4), PassThroughModel::input, (), &addr)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Move to the 2nd event at t0+3s.
|
// Move to the 2nd event at t0+3s.
|
||||||
@ -85,7 +90,9 @@ fn simulation_schedule_keyed_events() {
|
|||||||
let t0 = MonotonicTime::EPOCH;
|
let t0 = MonotonicTime::EPOCH;
|
||||||
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
||||||
|
|
||||||
let event_t1 = simu
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
|
let event_t1 = scheduler
|
||||||
.schedule_keyed_event(
|
.schedule_keyed_event(
|
||||||
t0 + Duration::from_secs(1),
|
t0 + Duration::from_secs(1),
|
||||||
PassThroughModel::input,
|
PassThroughModel::input,
|
||||||
@ -94,11 +101,12 @@ fn simulation_schedule_keyed_events() {
|
|||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let event_t2_1 = simu
|
let event_t2_1 = scheduler
|
||||||
.schedule_keyed_event(Duration::from_secs(2), PassThroughModel::input, 21, &addr)
|
.schedule_keyed_event(Duration::from_secs(2), PassThroughModel::input, 21, &addr)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
simu.schedule_event(Duration::from_secs(2), PassThroughModel::input, 22, &addr)
|
scheduler
|
||||||
|
.schedule_event(Duration::from_secs(2), PassThroughModel::input, 22, &addr)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Move to the 1st event at t0+1.
|
// Move to the 1st event at t0+1.
|
||||||
@ -124,8 +132,11 @@ fn simulation_schedule_periodic_events() {
|
|||||||
let t0 = MonotonicTime::EPOCH;
|
let t0 = MonotonicTime::EPOCH;
|
||||||
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// Queue 2 periodic events at t0 + 3s + k*2s.
|
// Queue 2 periodic events at t0 + 3s + k*2s.
|
||||||
simu.schedule_periodic_event(
|
scheduler
|
||||||
|
.schedule_periodic_event(
|
||||||
Duration::from_secs(3),
|
Duration::from_secs(3),
|
||||||
Duration::from_secs(2),
|
Duration::from_secs(2),
|
||||||
PassThroughModel::input,
|
PassThroughModel::input,
|
||||||
@ -133,7 +144,8 @@ fn simulation_schedule_periodic_events() {
|
|||||||
&addr,
|
&addr,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
simu.schedule_periodic_event(
|
scheduler
|
||||||
|
.schedule_periodic_event(
|
||||||
t0 + Duration::from_secs(3),
|
t0 + Duration::from_secs(3),
|
||||||
Duration::from_secs(2),
|
Duration::from_secs(2),
|
||||||
PassThroughModel::input,
|
PassThroughModel::input,
|
||||||
@ -160,8 +172,11 @@ fn simulation_schedule_periodic_keyed_events() {
|
|||||||
let t0 = MonotonicTime::EPOCH;
|
let t0 = MonotonicTime::EPOCH;
|
||||||
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
let (mut simu, addr, mut output) = passthrough_bench(t0);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// Queue 2 periodic events at t0 + 3s + k*2s.
|
// Queue 2 periodic events at t0 + 3s + k*2s.
|
||||||
simu.schedule_periodic_event(
|
scheduler
|
||||||
|
.schedule_periodic_event(
|
||||||
Duration::from_secs(3),
|
Duration::from_secs(3),
|
||||||
Duration::from_secs(2),
|
Duration::from_secs(2),
|
||||||
PassThroughModel::input,
|
PassThroughModel::input,
|
||||||
@ -169,7 +184,7 @@ fn simulation_schedule_periodic_keyed_events() {
|
|||||||
&addr,
|
&addr,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let event2_key = simu
|
let event2_key = scheduler
|
||||||
.schedule_keyed_periodic_event(
|
.schedule_keyed_periodic_event(
|
||||||
t0 + Duration::from_secs(3),
|
t0 + Duration::from_secs(3),
|
||||||
Duration::from_secs(2),
|
Duration::from_secs(2),
|
||||||
@ -279,8 +294,11 @@ fn simulation_system_clock_from_instant() {
|
|||||||
|
|
||||||
let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock);
|
let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// Queue a single event at t0 + 0.1s.
|
// Queue a single event at t0 + 0.1s.
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
Duration::from_secs_f64(0.1),
|
Duration::from_secs_f64(0.1),
|
||||||
TimestampModel::trigger,
|
TimestampModel::trigger,
|
||||||
(),
|
(),
|
||||||
@ -333,8 +351,11 @@ fn simulation_system_clock_from_system_time() {
|
|||||||
|
|
||||||
let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock);
|
let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock);
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// Queue a single event at t0 + 0.1s.
|
// Queue a single event at t0 + 0.1s.
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
Duration::from_secs_f64(0.1),
|
Duration::from_secs_f64(0.1),
|
||||||
TimestampModel::trigger,
|
TimestampModel::trigger,
|
||||||
(),
|
(),
|
||||||
@ -376,8 +397,11 @@ fn simulation_auto_system_clock() {
|
|||||||
let (mut simu, addr, mut stamp) = timestamp_bench(t0, AutoSystemClock::new());
|
let (mut simu, addr, mut stamp) = timestamp_bench(t0, AutoSystemClock::new());
|
||||||
let instant_t0 = Instant::now();
|
let instant_t0 = Instant::now();
|
||||||
|
|
||||||
|
let scheduler = simu.scheduler();
|
||||||
|
|
||||||
// Queue a periodic event at t0 + 0.2s + k*0.2s.
|
// Queue a periodic event at t0 + 0.2s + k*0.2s.
|
||||||
simu.schedule_periodic_event(
|
scheduler
|
||||||
|
.schedule_periodic_event(
|
||||||
Duration::from_secs_f64(0.2),
|
Duration::from_secs_f64(0.2),
|
||||||
Duration::from_secs_f64(0.2),
|
Duration::from_secs_f64(0.2),
|
||||||
TimestampModel::trigger,
|
TimestampModel::trigger,
|
||||||
@ -387,7 +411,8 @@ fn simulation_auto_system_clock() {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Queue a single event at t0 + 0.3s.
|
// Queue a single event at t0 + 0.3s.
|
||||||
simu.schedule_event(
|
scheduler
|
||||||
|
.schedule_event(
|
||||||
Duration::from_secs_f64(0.3),
|
Duration::from_secs_f64(0.3),
|
||||||
TimestampModel::trigger,
|
TimestampModel::trigger,
|
||||||
(),
|
(),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user