1
0
forked from ROMEO/nexosim

Change scheduler interface and add external inputs example.

Relevant for issue #13.
This commit is contained in:
Jaŭhien Piatlicki 2024-07-31 16:01:16 +02:00
parent a6a2c85129
commit 6e3d5bb132
15 changed files with 996 additions and 766 deletions

View File

@ -73,9 +73,10 @@ waker-fn = "1.1"
[dev-dependencies]
atomic-wait = "1.1"
futures-util = "0.3"
futures-executor = "0.3"
mio = { version = "1.0", features = ["os-poll", "net"] }
[build-dependencies]
tonic-build = { version = "0.11", optional = true }

View File

@ -109,6 +109,8 @@ fn main() {
.add_model(assembly, assembly_mbox, "assembly")
.init(t0);
let scheduler = simu.scheduler();
// ----------
// Simulation.
// ----------
@ -120,13 +122,14 @@ fn main() {
assert!(position.next().is_none());
// Start the motor in 2s with a PPS of 10Hz.
simu.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();
// Advance simulation time to two next events.
simu.step();

View File

@ -140,6 +140,7 @@ impl Controller {
// Schedule the `stop_brew()` method and turn on the pump.
self.stop_brew_key = Some(
context
.scheduler
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
.unwrap(),
);
@ -206,7 +207,7 @@ impl Tank {
state.set_empty_key.cancel();
// Update the volume, saturating at 0 in case of rounding errors.
let time = context.time();
let time = context.scheduler.time();
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);
@ -231,7 +232,7 @@ impl Tank {
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
assert!(flow_rate >= 0.0);
let time = context.time();
let time = context.scheduler.time();
// If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() {
@ -273,7 +274,10 @@ impl Tank {
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);
// Schedule the next update.
match context.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) {
match context
.scheduler
.schedule_keyed_event(duration_until_empty, Self::set_empty, ())
{
Ok(set_empty_key) => {
let state = TankDynamicState {
last_volume_update: time,
@ -373,6 +377,8 @@ fn main() {
.add_model(tank, tank_mbox, "tank")
.init(t0);
let scheduler = simu.scheduler();
// ----------
// Simulation.
// ----------
@ -426,13 +432,14 @@ fn main() {
assert_eq!(flow_rate.next(), Some(0.0));
// Interrupt the brew after 15s by pressing again the brew button.
simu.schedule_event(
Duration::from_secs(15),
Controller::brew_cmd,
(),
&controller_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(15),
Controller::brew_cmd,
(),
&controller_addr,
)
.unwrap();
simu.process_event(Controller::brew_cmd, (), &controller_addr);
assert_eq!(flow_rate.next(), Some(pump_flow_rate));

View File

@ -0,0 +1,251 @@
//! Example: a model that reads data from the external world.
//!
//! This example demonstrates in particular:
//!
//! * external world inputs (useful in cosimulation),
//! * system clock,
//! * periodic scheduling.
//!
//! ```text
//! ┌────────────────────────────────┐
//! │ Simulation │
//! ┌────────────┐ ┌────────────┐ │ ┌──────────┐ │
//! │ │ UDP │ │ message │ message │ │ message │ ┌─────────────┐
//! │ UDP Client ├─────────▶│ UDP Server ├──────────▶├─────────▶│ Listener ├─────────▶├──▶│ EventBuffer │
//! │ │ message │ │ │ │ │ │ └─────────────┘
//! └────────────┘ └────────────┘ │ └──────────┘ │
//! └────────────────────────────────┘
//! ```
use std::io::ErrorKind;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;
use atomic_wait::{wait, wake_one};
use mio::net::UdpSocket as MioUdpSocket;
use mio::{Events, Interest, Poll, Token};
use asynchronix::model::{Context, InitializedModel, Model, SetupContext};
use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{AutoSystemClock, MonotonicTime};
const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: u32 = 10;
const SENDER: &str = "127.0.0.1:8000";
const RECEIVER: &str = "127.0.0.1:9000";
/// Model that receives external input.
pub struct Listener {
/// Received message.
pub message: Output<String>,
/// Receiver of external messages.
rx: Receiver<String>,
/// External sender.
tx: Option<Sender<String>>,
/// Synchronization with client.
start: Arc<AtomicU32>,
/// Synchronization with simulation.
stop: Arc<AtomicBool>,
/// Handle to UDP Server.
external_handle: Option<JoinHandle<()>>,
}
impl Listener {
/// Creates a Listener.
pub fn new(start: Arc<AtomicU32>) -> Self {
start.store(0, Ordering::Relaxed);
let (tx, rx) = channel();
Self {
message: Output::default(),
rx,
tx: Some(tx),
start,
stop: Arc::new(AtomicBool::new(false)),
external_handle: None,
}
}
/// Periodically scheduled function that processes external events.
pub async fn process(&mut self) {
loop {
if let Ok(message) = self.rx.try_recv() {
self.message.send(message).await;
} else {
break;
}
}
}
/// UDP server.
///
/// Code is based on the MIO UDP example.
fn listener(tx: Sender<String>, start: Arc<AtomicU32>, stop: Arc<AtomicBool>) {
const UDP_SOCKET: Token = Token(0);
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(10);
let mut socket = MioUdpSocket::bind(RECEIVER.parse().unwrap()).unwrap();
poll.registry()
.register(&mut socket, UDP_SOCKET, Interest::READABLE)
.unwrap();
let mut buf = [0; 1 << 16];
// Wake up the client.
start.store(1, Ordering::Relaxed);
wake_one(&*start);
'process: loop {
// Wait for UDP packet or end of simulation.
if let Err(err) = poll.poll(&mut events, Some(Duration::from_secs(1))) {
if err.kind() == ErrorKind::Interrupted {
// Exit if simulation is finished.
if stop.load(Ordering::Relaxed) {
break 'process;
}
continue;
}
break 'process;
}
for event in events.iter() {
match event.token() {
UDP_SOCKET => loop {
match socket.recv_from(&mut buf) {
Ok((packet_size, _)) => {
if let Ok(message) = std::str::from_utf8(&buf[..packet_size]) {
// Inject external message into simulation.
if tx.send(message.into()).is_err() {
break 'process;
}
};
}
Err(e) if e.kind() == ErrorKind::WouldBlock => {
break;
}
_ => {
break 'process;
}
}
},
_ => {
panic!("Got event for unexpected token: {:?}", event);
}
}
}
// Exit if simulation is finished.
if stop.load(Ordering::Relaxed) {
break 'process;
}
}
poll.registry().deregister(&mut socket).unwrap();
}
}
impl Model for Listener {
/// Start UDP Server on model setup.
fn setup(&mut self, _: &SetupContext<Self>) {
let tx = self.tx.take().unwrap();
let start = Arc::clone(&self.start);
let stop = Arc::clone(&self.stop);
self.external_handle = Some(thread::spawn(move || {
Self::listener(tx, start, stop);
}));
}
/// Initialize model.
async fn init(self, context: &Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
context
.scheduler
.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();
self.into()
}
}
impl Drop for Listener {
/// Notify UDP Server that simulation is over and wait for server shutdown.
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
let handle = self.external_handle.take();
if let Some(handle) = handle {
handle.join().unwrap();
}
}
}
fn main() {
// ---------------
// Bench assembly.
// ---------------
// Models.
// Client-server synchronization.
let start = Arc::new(AtomicU32::new(0));
let mut listener = Listener::new(Arc::clone(&start));
// Mailboxes.
let listener_mbox = Mailbox::new();
// Model handles for simulation.
let mut message = EventBuffer::new();
listener.message.connect_sink(&message);
// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;
// Assembly and initialization.
let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0);
// ----------
// Simulation.
// ----------
// External client that sends UDP messages.
let sender_handle = thread::spawn(move || {
// Wait until UDP Server is ready.
wait(&start, 0);
for i in 0..N {
let socket = UdpSocket::bind(SENDER).unwrap();
socket.send_to(i.to_string().as_bytes(), RECEIVER).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i)
}
}
});
// Advance simulation, external messages will be collected.
simu.step_by(Duration::from_secs(2));
// Check collected external messages.
let mut packets = 0_u32;
for _ in 0..N {
// UDP can reorder packages, we are expecting that on not too loaded
// localhost packages would not be dropped
packets |= 1 << message.next().unwrap().parse::<u8>().unwrap();
}
assert_eq!(packets, u32::MAX >> 22);
assert_eq!(message.next(), None);
sender_handle.join().unwrap();
}

View File

@ -58,7 +58,7 @@ impl Motor {
println!(
"Model instance {} at time {}: setting currents: {:.2} and {:.2}",
context.name(),
context.time(),
context.scheduler.time(),
current.0,
current.1
);
@ -91,7 +91,7 @@ impl Motor {
println!(
"Model instance {} at time {}: setting load: {:.2}",
context.name(),
context.time(),
context.scheduler.time(),
torque
);
@ -141,7 +141,7 @@ impl Driver {
println!(
"Model instance {} at time {}: setting pps: {:.2}",
context.name(),
context.time(),
context.scheduler.time(),
pps
);
@ -172,7 +172,7 @@ impl Driver {
println!(
"Model instance {} at time {}: sending pulse",
context.name(),
context.time()
context.scheduler.time()
);
async move {
@ -195,6 +195,7 @@ impl Driver {
// Schedule the next pulse.
context
.scheduler
.schedule_event(pulse_duration, Self::send_pulse, ())
.unwrap();
}
@ -236,6 +237,8 @@ fn main() {
.add_model(motor, motor_mbox, "motor")
.init(t0);
let scheduler = simu.scheduler();
// ----------
// Simulation.
// ----------
@ -247,13 +250,14 @@ fn main() {
assert!(position.next().is_none());
// Start the motor in 2s with a PPS of 10Hz.
simu.schedule_event(
Duration::from_secs(2),
Driver::pulse_rate,
10.0,
&driver_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(2),
Driver::pulse_rate,
10.0,
&driver_addr,
)
.unwrap();
// Advance simulation time to two next events.
simu.step();

View File

@ -119,7 +119,7 @@
//! }
//! impl Delay {
//! pub fn input(&mut self, value: f64, context: &Context<Self>) {
//! context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! }
//!
//! async fn send(&mut self, value: f64) {
@ -190,7 +190,7 @@
//! # }
//! # impl Delay {
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # }
//! # async fn send(&mut self, value: f64) { // this method can be private
//! # self.output.send(value).await;
@ -250,7 +250,7 @@
//! [`Simulation::process_event()`](simulation::Simulation::process_event) or
//! [`Simulation::send_query()`](simulation::Simulation::process_query),
//! 3. by scheduling events, using for instance
//! [`Simulation::schedule_event()`](simulation::Simulation::schedule_event).
//! [`Scheduler::schedule_event()`](simulation::Scheduler::schedule_event).
//!
//! When initialized with the default clock, the simulation will run as fast as
//! possible, without regard for the actual wall clock time. Alternatively, the
@ -289,7 +289,7 @@
//! # }
//! # impl Delay {
//! # pub fn input(&mut self, value: f64, context: &Context<Self>) {
//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap();
//! # }
//! # async fn send(&mut self, value: f64) { // this method can be private
//! # self.output.send(value).await;
@ -370,7 +370,7 @@
//!
//! The first guarantee (and only the first) also extends to events scheduled
//! from a simulation with a
//! [`Simulation::schedule_*()`](simulation::Simulation::schedule_event) method:
//! [`Scheduler::schedule_*()`](simulation::Scheduler::schedule_event) method:
//! if the scheduler contains several events to be delivered at the same time to
//! the same model, these events will always be processed in the order in which
//! they were scheduled.

View File

@ -1,17 +1,7 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::channel::Sender;
use crate::executor::Executor;
use crate::ports::InputFn;
use crate::simulation::{
self, schedule_event_at_unchecked, schedule_keyed_event_at_unchecked,
schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, ActionKey,
Deadline, Mailbox, SchedulerQueue, SchedulingError,
};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::sync_cell::SyncCellReader;
use crate::simulation::{self, LocalScheduler, Mailbox};
use super::Model;
@ -60,13 +50,13 @@ use super::Model;
/// impl DelayedGreeter {
/// // Triggers a greeting on the output port after some delay [input port].
/// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context<Self>) {
/// let time = context.time();
/// let time = context.scheduler.time();
/// let greeting = format!("Hello, this message was scheduled at: {:?}.", time);
///
/// if delay.is_zero() {
/// self.msg_out.send(greeting).await;
/// } else {
/// context.schedule_event(delay, Self::send_msg, greeting).unwrap();
/// context.scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap();
/// }
/// }
///
@ -82,320 +72,21 @@ use super::Model;
// https://github.com/rust-lang/rust/issues/78649
pub struct Context<M: Model> {
name: String,
sender: Sender<M>,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
/// Local scheduler.
pub scheduler: LocalScheduler<M>,
}
impl<M: Model> Context<M> {
/// Creates a new local context.
pub(crate) fn new(
name: String,
sender: Sender<M>,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
) -> Self {
Self {
name,
sender,
scheduler_queue,
time,
}
pub(crate) fn new(name: String, scheduler: LocalScheduler<M>) -> Self {
Self { name, scheduler }
}
/// Returns the model instance name.
pub fn name(&self) -> &str {
&self.name
}
/// Returns the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::{Context, Model};
/// use asynchronix::time::MonotonicTime;
///
/// fn is_third_millenium<M: Model>(context: &Context<M>) -> bool {
/// let time = context.time();
/// time >= MonotonicTime::new(978307200, 0).unwrap()
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
/// }
/// ```
pub fn time(&self) -> MonotonicTime {
self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time")
}
/// Schedules an event at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
///
/// // A timer.
/// pub struct Timer {}
///
/// impl Timer {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: Duration, context: &Context<Self>) {
/// if context.schedule_event(setting, Self::ring, ()).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Rings [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg");
/// }
/// }
///
/// impl Model for Timer {}
/// ```
pub fn schedule_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let sender = self.sender.clone();
schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
Ok(())
}
/// Schedules a cancellable event at a future time and returns an action
/// key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock that can be cancelled.
/// #[derive(Default)]
/// pub struct CancellableAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// self.cancel();
/// match context.schedule_keyed_event(setting, Self::ring, ()) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels the current alarm, if any [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Rings the alarm [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg!");
/// }
/// }
///
/// impl Model for CancellableAlarmClock {}
/// ```
pub fn schedule_keyed_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let sender = self.sender.clone();
let event_key =
schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
Ok(event_key)
}
/// Schedules a periodically recurring event at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz.
/// pub struct BeepingAlarmClock {}
///
/// impl BeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// if context.schedule_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for BeepingAlarmClock {}
/// ```
pub fn schedule_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let sender = self.sender.clone();
schedule_periodic_event_at_unchecked(
time,
period,
func,
arg,
sender,
&self.scheduler_queue,
);
Ok(())
}
/// Schedules a cancellable, periodically recurring event at a future time
/// and returns an action key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
/// // stopped after it sets off.
/// #[derive(Default)]
/// pub struct CancellableBeepingAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableBeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// self.cancel();
/// match context.schedule_keyed_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels or stops the alarm [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for CancellableBeepingAlarmClock {}
/// ```
pub fn schedule_keyed_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let sender = self.sender.clone();
let event_key = schedule_periodic_keyed_event_at_unchecked(
time,
period,
func,
arg,
sender,
&self.scheduler_queue,
);
Ok(event_key)
}
}
impl<M: Model> fmt::Debug for Context<M> {
@ -501,8 +192,7 @@ impl<'a, M: Model> SetupContext<'a, M> {
model,
mailbox,
submodel_name,
self.context.scheduler_queue.clone(),
self.context.time.clone(),
self.context.scheduler.scheduler.clone(),
self.executor,
);
}

View File

@ -557,6 +557,7 @@ mod tests {
use crate::channel::Receiver;
use crate::model::Context;
use crate::simulation::{Address, LocalScheduler, Scheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
@ -616,9 +617,10 @@ mod tests {
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
dummy_address,
dummy_priority_queue,
dummy_time,
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
}
@ -671,9 +673,10 @@ mod tests {
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
dummy_address,
dummy_priority_queue,
dummy_time,
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));

View File

@ -440,6 +440,7 @@ mod tests {
use crate::channel::Receiver;
use crate::model::Context;
use crate::simulation::{Address, LocalScheduler, Scheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
@ -499,9 +500,10 @@ mod tests {
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
dummy_address,
dummy_priority_queue,
dummy_time,
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
}
@ -554,9 +556,10 @@ mod tests {
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
dummy_address,
dummy_priority_queue,
dummy_time,
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));

View File

@ -127,12 +127,12 @@ mod scheduler;
mod sim_init;
pub use mailbox::{Address, Mailbox};
pub use scheduler::{
Action, ActionKey, AutoActionKey, Deadline, LocalScheduler, Scheduler, SchedulingError,
};
pub(crate) use scheduler::{
schedule_event_at_unchecked, schedule_keyed_event_at_unchecked,
schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked,
KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue,
};
pub use scheduler::{Action, ActionKey, AutoActionKey, Deadline, SchedulingError};
pub use sim_init::SimInit;
use std::error::Error;
@ -149,7 +149,7 @@ use crate::ports::{InputFn, ReplierFn};
use crate::time::{Clock, MonotonicTime, TearableAtomicTime};
use crate::util::seq_futures::SeqFuture;
use crate::util::slot;
use crate::util::sync_cell::{SyncCell, SyncCellReader};
use crate::util::sync_cell::SyncCell;
/// Simulation environment.
///
@ -161,10 +161,10 @@ use crate::util::sync_cell::{SyncCell, SyncCellReader};
/// A [`Simulation`] object also manages an event scheduling queue and
/// simulation time. The scheduling queue can be accessed from the simulation
/// itself, but also from models via the optional
/// [`&Context`](crate::model::Context) argument of input and replier port methods.
/// Likewise, simulation time can be accessed with the [`Simulation::time()`]
/// method, or from models with the [`Context::time()`](crate::model::Context::time)
/// method.
/// [`&Context`](crate::model::Context) argument of input and replier port
/// methods. Likewise, simulation time can be accessed with the
/// [`Simulation::time()`] method, or from models with the
/// [`LocalScheduler::time()`](crate::simulation::LocalScheduler::time) method.
///
/// Events and queries can be scheduled immediately, *i.e.* for the current
/// simulation time, using [`process_event()`](Simulation::process_event) and
@ -173,7 +173,7 @@ use crate::util::sync_cell::{SyncCell, SyncCellReader};
/// completed. In the case of queries, the response is returned.
///
/// Events can also be scheduled at a future simulation time using one of the
/// [`schedule_*()`](Simulation::schedule_event) method. These methods queue an
/// [`schedule_*()`](Scheduler::schedule_event) method. These methods queue an
/// event without blocking.
///
/// Finally, the [`Simulation`] instance manages simulation time. A call to
@ -257,192 +257,9 @@ impl Simulation {
Ok(())
}
/// Schedules an action at a future time.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// If multiple actions send events at the same simulation time to the same
/// model, these events are guaranteed to be processed according to the
/// scheduling order of the actions.
pub fn schedule(
&mut self,
deadline: impl Deadline,
action: Action,
) -> Result<(), SchedulingError> {
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
// The channel ID is set to the same value for all actions. This
// ensures that the relative scheduling order of all source events is
// preserved, which is important if some of them target the same models.
// The value 0 was chosen as it prevents collisions with channel IDs as
// the latter are always non-zero.
scheduler_queue.insert((time, 0), action);
Ok(())
}
/// Schedules an event at a future time.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`Context::schedule_event`](crate::model::Context::schedule_event).
pub fn schedule_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
Ok(())
}
/// Schedules a cancellable event at a future time and returns an event key.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`Context::schedule_keyed_event`](crate::model::Context::schedule_keyed_event).
pub fn schedule_keyed_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let event_key = schedule_keyed_event_at_unchecked(
time,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(event_key)
}
/// Schedules a periodically recurring event at a future time.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time or if the specified period is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`Context::schedule_periodic_event`](crate::model::Context::schedule_periodic_event).
pub fn schedule_periodic_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
schedule_periodic_event_at_unchecked(
time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(())
}
/// Schedules a cancellable, periodically recurring event at a future time
/// and returns an event key.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time or if the specified period is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`Context::schedule_keyed_periodic_event`](crate::model::Context::schedule_keyed_periodic_event).
pub fn schedule_keyed_periodic_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let event_key = schedule_periodic_keyed_event_at_unchecked(
time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(event_key)
/// Returns scheduler.
pub fn scheduler(&self) -> Scheduler {
Scheduler::new(self.scheduler_queue.clone(), self.time.reader())
}
/// Processes an action immediately, blocking until completion.
@ -630,6 +447,7 @@ impl Simulation {
None => {
// Update the simulation time.
self.time.write(target_time);
self.clock.synchronize(target_time);
return;
}
// The target time was not reached yet.
@ -667,13 +485,10 @@ pub(crate) fn add_model<M: Model>(
mut model: M,
mailbox: Mailbox<M>,
name: String,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
scheduler: Scheduler,
executor: &Executor,
) {
let sender = mailbox.0.sender();
let context = Context::new(name, sender, scheduler_queue, time);
let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address()));
let setup_context = SetupContext::new(&mailbox, &context, executor);
model.setup(&setup_context);

View File

@ -17,8 +17,532 @@ use crate::channel::Sender;
use crate::executor::Executor;
use crate::model::Model;
use crate::ports::InputFn;
use crate::time::MonotonicTime;
use crate::simulation::Address;
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCellReader;
/// Scheduler.
#[derive(Clone)]
pub struct Scheduler {
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
}
impl Scheduler {
pub(crate) fn new(
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
) -> Self {
Self {
scheduler_queue,
time,
}
}
/// Returns the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::simulation::Scheduler;
/// use asynchronix::time::MonotonicTime;
///
/// fn is_third_millenium(scheduler: &Scheduler) -> bool {
/// let time = scheduler.time();
/// time >= MonotonicTime::new(978307200, 0).unwrap()
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
/// }
/// ```
pub fn time(&self) -> MonotonicTime {
self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time")
}
/// Schedules an action at a future time.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// If multiple actions send events at the same simulation time to the same
/// model, these events are guaranteed to be processed according to the
/// scheduling order of the actions.
pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> {
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
// The channel ID is set to the same value for all actions. This
// ensures that the relative scheduling order of all source events is
// preserved, which is important if some of them target the same models.
// The value 0 was chosen as it prevents collisions with channel IDs as
// the latter are always non-zero.
scheduler_queue.insert((time, 0), action);
Ok(())
}
/// Schedules an event at a future time.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`LocalScheduler::schedule_event`](LocalScheduler::schedule_event).
pub fn schedule_event<M, F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let sender = address.into().0;
let channel_id = sender.channel_id();
let action = Action::new(OnceAction::new(process_event(func, arg, sender)));
scheduler_queue.insert((time, channel_id), action);
Ok(())
}
/// Schedules a cancellable event at a future time and returns an event key.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`LocalScheduler::schedule_keyed_event`](LocalScheduler::schedule_keyed_event).
pub fn schedule_keyed_event<M, F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let event_key = ActionKey::new();
let sender = address.into().0;
let channel_id = sender.channel_id();
let action = Action::new(KeyedOnceAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
event_key.clone(),
));
scheduler_queue.insert((time, channel_id), action);
Ok(event_key)
}
/// Schedules a periodically recurring event at a future time.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time or if the specified period is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`LocalScheduler::schedule_periodic_event`](LocalScheduler::schedule_periodic_event).
pub fn schedule_periodic_event<M, F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let sender = address.into().0;
let channel_id = sender.channel_id();
let action = Action::new(PeriodicAction::new(
|| process_event(func, arg, sender),
period,
));
scheduler_queue.insert((time, channel_id), action);
Ok(())
}
/// Schedules a cancellable, periodically recurring event at a future time
/// and returns an event key.
///
/// An error is returned if the specified time is not in the future of the
/// current simulation time or if the specified period is null.
///
/// Events scheduled for the same time and targeting the same model are
/// guaranteed to be processed according to the scheduling order.
///
/// See also: [`LocalScheduler::schedule_keyed_periodic_event`](LocalScheduler::schedule_keyed_periodic_event).
pub fn schedule_keyed_periodic_event<M, F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<ActionKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let event_key = ActionKey::new();
let sender = address.into().0;
let channel_id = sender.channel_id();
let action = Action::new(KeyedPeriodicAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
period,
event_key.clone(),
));
scheduler_queue.insert((time, channel_id), action);
Ok(event_key)
}
}
impl fmt::Debug for Scheduler {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Scheduler")
.field("time", &self.time())
.finish_non_exhaustive()
}
}
/// Local scheduler.
pub struct LocalScheduler<M: Model> {
pub(crate) scheduler: Scheduler,
address: Address<M>,
}
impl<M: Model> LocalScheduler<M> {
pub(crate) fn new(scheduler: Scheduler, address: Address<M>) -> Self {
Self { scheduler, address }
}
/// Returns the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::Model;
/// use asynchronix::simulation::LocalScheduler;
/// use asynchronix::time::MonotonicTime;
///
/// fn is_third_millenium<M: Model>(scheduler: &LocalScheduler<M>) -> bool {
/// let time = scheduler.time();
/// time >= MonotonicTime::new(978307200, 0).unwrap()
/// && time < MonotonicTime::new(32535216000, 0).unwrap()
/// }
/// ```
pub fn time(&self) -> MonotonicTime {
self.scheduler.time()
}
/// Schedules an event at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
///
/// // A timer.
/// pub struct Timer {}
///
/// impl Timer {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: Duration, context: &Context<Self>) {
/// if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Rings [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg");
/// }
/// }
///
/// impl Model for Timer {}
/// ```
pub fn schedule_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
self.scheduler
.schedule_event(deadline, func, arg, &self.address)
}
/// Schedules a cancellable event at a future time and returns an action
/// key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time.
///
/// # Examples
///
/// ```
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock that can be cancelled.
/// #[derive(Default)]
/// pub struct CancellableAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// self.cancel();
/// match context.scheduler.schedule_keyed_event(setting, Self::ring, ()) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels the current alarm, if any [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Rings the alarm [private input port].
/// fn ring(&mut self) {
/// println!("Brringggg!");
/// }
/// }
///
/// impl Model for CancellableAlarmClock {}
/// ```
pub fn schedule_keyed_event<F, T, S>(
&self,
deadline: impl Deadline,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = self
.scheduler
.schedule_keyed_event(deadline, func, arg, &self.address)?;
Ok(event_key)
}
/// Schedules a periodically recurring event at a future time.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz.
/// pub struct BeepingAlarmClock {}
///
/// impl BeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// if context.scheduler.schedule_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ).is_err() {
/// println!("The alarm clock can only be set for a future time");
/// }
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for BeepingAlarmClock {}
/// ```
pub fn schedule_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<(), SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
self.scheduler
.schedule_periodic_event(deadline, period, func, arg, &self.address)
}
/// Schedules a cancellable, periodically recurring event at a future time
/// and returns an action key.
///
/// An error is returned if the specified deadline is not in the future of
/// the current simulation time or if the specified period is null.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use asynchronix::model::{Context, Model};
/// use asynchronix::simulation::ActionKey;
/// use asynchronix::time::MonotonicTime;
///
/// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
/// // stopped after it sets off.
/// #[derive(Default)]
/// pub struct CancellableBeepingAlarmClock {
/// event_key: Option<ActionKey>,
/// }
///
/// impl CancellableBeepingAlarmClock {
/// // Sets an alarm [input port].
/// pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
/// self.cancel();
/// match context.scheduler.schedule_keyed_periodic_event(
/// setting,
/// Duration::from_secs(1), // 1Hz = 1/1s
/// Self::beep,
/// ()
/// ) {
/// Ok(event_key) => self.event_key = Some(event_key),
/// Err(_) => println!("The alarm clock can only be set for a future time"),
/// };
/// }
///
/// // Cancels or stops the alarm [input port].
/// pub fn cancel(&mut self) {
/// self.event_key.take().map(|k| k.cancel());
/// }
///
/// // Emits a single beep [private input port].
/// fn beep(&mut self) {
/// println!("Beep!");
/// }
/// }
///
/// impl Model for CancellableBeepingAlarmClock {}
/// ```
pub fn schedule_keyed_periodic_event<F, T, S>(
&self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
) -> Result<ActionKey, SchedulingError>
where
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = self.scheduler.schedule_keyed_periodic_event(
deadline,
period,
func,
arg,
&self.address,
)?;
Ok(event_key)
}
}
impl<M: Model> Clone for LocalScheduler<M> {
fn clone(&self) -> Self {
Self {
scheduler: self.scheduler.clone(),
address: self.address.clone(),
}
}
}
impl<M: Model> fmt::Debug for LocalScheduler<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LocalScheduler")
.field("time", &self.time())
.field("address", &self.address)
.finish_non_exhaustive()
}
}
/// Shorthand for the scheduler queue type.
@ -214,120 +738,6 @@ pub(crate) trait ActionInner: Send + 'static {
fn spawn_and_forget(self: Box<Self>, executor: &Executor);
}
/// Schedules an event at a future time.
///
/// This function does not check whether the specified time lies in the future
/// of the current simulation time.
pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
time: MonotonicTime,
func: F,
arg: T,
sender: Sender<M>,
scheduler_queue: &Mutex<SchedulerQueue>,
) where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let channel_id = sender.channel_id();
let action = Action::new(OnceAction::new(process_event(func, arg, sender)));
let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), action);
}
/// Schedules an event at a future time, returning an action key.
///
/// This function does not check whether the specified time lies in the future
/// of the current simulation time.
pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
time: MonotonicTime,
func: F,
arg: T,
sender: Sender<M>,
scheduler_queue: &Mutex<SchedulerQueue>,
) -> ActionKey
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = ActionKey::new();
let channel_id = sender.channel_id();
let action = Action::new(KeyedOnceAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
event_key.clone(),
));
let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), action);
event_key
}
/// Schedules a periodic event at a future time.
///
/// This function does not check whether the specified time lies in the future
/// of the current simulation time.
pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
time: MonotonicTime,
period: Duration,
func: F,
arg: T,
sender: Sender<M>,
scheduler_queue: &Mutex<SchedulerQueue>,
) where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let channel_id = sender.channel_id();
let action = Action::new(PeriodicAction::new(
|| process_event(func, arg, sender),
period,
));
let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), action);
}
/// Schedules an event at a future time, returning an action key.
///
/// This function does not check whether the specified time lies in the future
/// of the current simulation time.
pub(crate) fn schedule_periodic_keyed_event_at_unchecked<M, F, T, S>(
time: MonotonicTime,
period: Duration,
func: F,
arg: T,
sender: Sender<M>,
scheduler_queue: &Mutex<SchedulerQueue>,
) -> ActionKey
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let event_key = ActionKey::new();
let channel_id = sender.channel_id();
let action = Action::new(KeyedPeriodicAction::new(
|ek| send_keyed_event(ek, func, arg, sender),
period,
event_key.clone(),
));
let mut scheduler_queue = scheduler_queue.lock().unwrap();
scheduler_queue.insert((time, channel_id), action);
event_key
}
pin_project! {
/// An object that can be converted to a future performing a single
/// non-cancellable action.

View File

@ -8,7 +8,7 @@ use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::{add_model, Mailbox, SchedulerQueue, Simulation};
use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation};
/// Builder for a multi-threaded, discrete-event simulation.
pub struct SimInit {
@ -58,17 +58,8 @@ impl SimInit {
mailbox: Mailbox<M>,
name: impl Into<String>,
) -> Self {
let scheduler_queue = self.scheduler_queue.clone();
let time = self.time.reader();
add_model(
model,
mailbox,
name.into(),
scheduler_queue,
time,
&self.executor,
);
let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader());
add_model(model, mailbox, name.into(), scheduler, &self.executor);
self
}

View File

@ -31,7 +31,7 @@
//!
//! // Sets an alarm [input port].
//! pub fn set(&mut self, setting: MonotonicTime, context: &Context<Self>) {
//! if context.schedule_event(setting, Self::ring, ()).is_err() {
//! if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() {
//! println!("The alarm clock can only be set for a future time");
//! }
//! }

View File

@ -16,7 +16,12 @@ fn model_schedule_event() {
impl TestModel {
fn trigger(&mut self, _: (), context: &Context<Self>) {
context
.schedule_event(context.time() + Duration::from_secs(2), Self::action, ())
.scheduler
.schedule_event(
context.scheduler.time() + Duration::from_secs(2),
Self::action,
(),
)
.unwrap();
}
async fn action(&mut self) {
@ -53,10 +58,20 @@ fn model_cancel_future_keyed_event() {
impl TestModel {
fn trigger(&mut self, _: (), context: &Context<Self>) {
context
.schedule_event(context.time() + Duration::from_secs(1), Self::action1, ())
.scheduler
.schedule_event(
context.scheduler.time() + Duration::from_secs(1),
Self::action1,
(),
)
.unwrap();
self.key = context
.schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ())
.scheduler
.schedule_keyed_event(
context.scheduler.time() + Duration::from_secs(2),
Self::action2,
(),
)
.ok();
}
async fn action1(&mut self) {
@ -99,10 +114,20 @@ fn model_cancel_same_time_keyed_event() {
impl TestModel {
fn trigger(&mut self, _: (), context: &Context<Self>) {
context
.schedule_event(context.time() + Duration::from_secs(2), Self::action1, ())
.scheduler
.schedule_event(
context.scheduler.time() + Duration::from_secs(2),
Self::action1,
(),
)
.unwrap();
self.key = context
.schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ())
.scheduler
.schedule_keyed_event(
context.scheduler.time() + Duration::from_secs(2),
Self::action2,
(),
)
.ok();
}
async fn action1(&mut self) {
@ -144,8 +169,9 @@ fn model_schedule_periodic_event() {
impl TestModel {
fn trigger(&mut self, _: (), context: &Context<Self>) {
context
.scheduler
.schedule_periodic_event(
context.time() + Duration::from_secs(2),
context.scheduler.time() + Duration::from_secs(2),
Duration::from_secs(3),
Self::action,
42,
@ -192,8 +218,9 @@ fn model_cancel_periodic_event() {
impl TestModel {
fn trigger(&mut self, _: (), context: &Context<Self>) {
self.key = context
.scheduler
.schedule_keyed_periodic_event(
context.time() + Duration::from_secs(2),
context.scheduler.time() + Duration::from_secs(2),
Duration::from_secs(3),
Self::action,
(),

View File

@ -48,16 +48,20 @@ fn simulation_schedule_events() {
let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(t0);
let scheduler = simu.scheduler();
// Queue 2 events at t0+3s and t0+2s, in reverse order.
simu.schedule_event(Duration::from_secs(3), PassThroughModel::input, (), &addr)
scheduler
.schedule_event(Duration::from_secs(3), PassThroughModel::input, (), &addr)
.unwrap();
scheduler
.schedule_event(
t0 + Duration::from_secs(2),
PassThroughModel::input,
(),
&addr,
)
.unwrap();
simu.schedule_event(
t0 + Duration::from_secs(2),
PassThroughModel::input,
(),
&addr,
)
.unwrap();
// Move to the 1st event at t0+2s.
simu.step();
@ -65,7 +69,8 @@ fn simulation_schedule_events() {
assert!(output.next().is_some());
// Schedule another event in 4s (at t0+6s).
simu.schedule_event(Duration::from_secs(4), PassThroughModel::input, (), &addr)
scheduler
.schedule_event(Duration::from_secs(4), PassThroughModel::input, (), &addr)
.unwrap();
// Move to the 2nd event at t0+3s.
@ -85,7 +90,9 @@ fn simulation_schedule_keyed_events() {
let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(t0);
let event_t1 = simu
let scheduler = simu.scheduler();
let event_t1 = scheduler
.schedule_keyed_event(
t0 + Duration::from_secs(1),
PassThroughModel::input,
@ -94,11 +101,12 @@ fn simulation_schedule_keyed_events() {
)
.unwrap();
let event_t2_1 = simu
let event_t2_1 = scheduler
.schedule_keyed_event(Duration::from_secs(2), PassThroughModel::input, 21, &addr)
.unwrap();
simu.schedule_event(Duration::from_secs(2), PassThroughModel::input, 22, &addr)
scheduler
.schedule_event(Duration::from_secs(2), PassThroughModel::input, 22, &addr)
.unwrap();
// Move to the 1st event at t0+1.
@ -124,23 +132,27 @@ fn simulation_schedule_periodic_events() {
let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(t0);
let scheduler = simu.scheduler();
// Queue 2 periodic events at t0 + 3s + k*2s.
simu.schedule_periodic_event(
Duration::from_secs(3),
Duration::from_secs(2),
PassThroughModel::input,
1,
&addr,
)
.unwrap();
simu.schedule_periodic_event(
t0 + Duration::from_secs(3),
Duration::from_secs(2),
PassThroughModel::input,
2,
&addr,
)
.unwrap();
scheduler
.schedule_periodic_event(
Duration::from_secs(3),
Duration::from_secs(2),
PassThroughModel::input,
1,
&addr,
)
.unwrap();
scheduler
.schedule_periodic_event(
t0 + Duration::from_secs(3),
Duration::from_secs(2),
PassThroughModel::input,
2,
&addr,
)
.unwrap();
// Move to the next events at t0 + 3s + k*2s.
for k in 0..10 {
@ -160,16 +172,19 @@ fn simulation_schedule_periodic_keyed_events() {
let t0 = MonotonicTime::EPOCH;
let (mut simu, addr, mut output) = passthrough_bench(t0);
let scheduler = simu.scheduler();
// Queue 2 periodic events at t0 + 3s + k*2s.
simu.schedule_periodic_event(
Duration::from_secs(3),
Duration::from_secs(2),
PassThroughModel::input,
1,
&addr,
)
.unwrap();
let event2_key = simu
scheduler
.schedule_periodic_event(
Duration::from_secs(3),
Duration::from_secs(2),
PassThroughModel::input,
1,
&addr,
)
.unwrap();
let event2_key = scheduler
.schedule_keyed_periodic_event(
t0 + Duration::from_secs(3),
Duration::from_secs(2),
@ -279,14 +294,17 @@ fn simulation_system_clock_from_instant() {
let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock);
let scheduler = simu.scheduler();
// Queue a single event at t0 + 0.1s.
simu.schedule_event(
Duration::from_secs_f64(0.1),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs_f64(0.1),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
// Check the stamps.
for expected_time in [
@ -333,14 +351,17 @@ fn simulation_system_clock_from_system_time() {
let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock);
let scheduler = simu.scheduler();
// Queue a single event at t0 + 0.1s.
simu.schedule_event(
Duration::from_secs_f64(0.1),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs_f64(0.1),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
// Check the stamps.
for expected_time in [
@ -376,24 +397,28 @@ fn simulation_auto_system_clock() {
let (mut simu, addr, mut stamp) = timestamp_bench(t0, AutoSystemClock::new());
let instant_t0 = Instant::now();
let scheduler = simu.scheduler();
// Queue a periodic event at t0 + 0.2s + k*0.2s.
simu.schedule_periodic_event(
Duration::from_secs_f64(0.2),
Duration::from_secs_f64(0.2),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
scheduler
.schedule_periodic_event(
Duration::from_secs_f64(0.2),
Duration::from_secs_f64(0.2),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
// Queue a single event at t0 + 0.3s.
simu.schedule_event(
Duration::from_secs_f64(0.3),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs_f64(0.3),
TimestampModel::trigger,
(),
&addr,
)
.unwrap();
// Check the stamps.
for expected_time in [0.0, 0.2, 0.3, 0.4, 0.6] {