From 6e3d5bb13285a35e577f54175a8ed40ac4e3dc0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Wed, 31 Jul 2024 16:01:16 +0200 Subject: [PATCH] Change scheduler interface and add external inputs example. Relevant for issue #13. --- asynchronix/Cargo.toml | 3 +- asynchronix/examples/assembly.rs | 17 +- asynchronix/examples/espresso_machine.rs | 27 +- asynchronix/examples/external_input.rs | 251 ++++++++ asynchronix/examples/stepper_motor.rs | 26 +- asynchronix/src/lib.rs | 10 +- asynchronix/src/model/context.rs | 328 +--------- asynchronix/src/ports/output/broadcaster.rs | 15 +- asynchronix/src/ports/source/broadcaster.rs | 15 +- asynchronix/src/simulation.rs | 215 +------ asynchronix/src/simulation/scheduler.rs | 640 ++++++++++++++++---- asynchronix/src/simulation/sim_init.rs | 15 +- asynchronix/src/time.rs | 2 +- asynchronix/tests/model_scheduling.rs | 41 +- asynchronix/tests/simulation_scheduling.rs | 157 +++-- 15 files changed, 996 insertions(+), 766 deletions(-) create mode 100644 asynchronix/examples/external_input.rs diff --git a/asynchronix/Cargo.toml b/asynchronix/Cargo.toml index 5886394..ea2f278 100644 --- a/asynchronix/Cargo.toml +++ b/asynchronix/Cargo.toml @@ -73,9 +73,10 @@ waker-fn = "1.1" [dev-dependencies] +atomic-wait = "1.1" futures-util = "0.3" futures-executor = "0.3" - +mio = { version = "1.0", features = ["os-poll", "net"] } [build-dependencies] tonic-build = { version = "0.11", optional = true } diff --git a/asynchronix/examples/assembly.rs b/asynchronix/examples/assembly.rs index 8a88a6c..ccc2eb3 100644 --- a/asynchronix/examples/assembly.rs +++ b/asynchronix/examples/assembly.rs @@ -109,6 +109,8 @@ fn main() { .add_model(assembly, assembly_mbox, "assembly") .init(t0); + let scheduler = simu.scheduler(); + // ---------- // Simulation. // ---------- @@ -120,13 +122,14 @@ fn main() { assert!(position.next().is_none()); // Start the motor in 2s with a PPS of 10Hz. - simu.schedule_event( - Duration::from_secs(2), - MotorAssembly::pulse_rate, - 10.0, - &assembly_addr, - ) - .unwrap(); + scheduler + .schedule_event( + Duration::from_secs(2), + MotorAssembly::pulse_rate, + 10.0, + &assembly_addr, + ) + .unwrap(); // Advance simulation time to two next events. simu.step(); diff --git a/asynchronix/examples/espresso_machine.rs b/asynchronix/examples/espresso_machine.rs index fcab7c3..f5b90da 100644 --- a/asynchronix/examples/espresso_machine.rs +++ b/asynchronix/examples/espresso_machine.rs @@ -140,6 +140,7 @@ impl Controller { // Schedule the `stop_brew()` method and turn on the pump. self.stop_brew_key = Some( context + .scheduler .schedule_keyed_event(self.brew_time, Self::stop_brew, ()) .unwrap(), ); @@ -206,7 +207,7 @@ impl Tank { state.set_empty_key.cancel(); // Update the volume, saturating at 0 in case of rounding errors. - let time = context.time(); + let time = context.scheduler.time(); let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64(); self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0); @@ -231,7 +232,7 @@ impl Tank { pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context) { assert!(flow_rate >= 0.0); - let time = context.time(); + let time = context.scheduler.time(); // If the flow rate was non-zero up to now, update the volume. if let Some(state) = self.dynamic_state.take() { @@ -273,7 +274,10 @@ impl Tank { let duration_until_empty = Duration::from_secs_f64(duration_until_empty); // Schedule the next update. - match context.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) { + match context + .scheduler + .schedule_keyed_event(duration_until_empty, Self::set_empty, ()) + { Ok(set_empty_key) => { let state = TankDynamicState { last_volume_update: time, @@ -373,6 +377,8 @@ fn main() { .add_model(tank, tank_mbox, "tank") .init(t0); + let scheduler = simu.scheduler(); + // ---------- // Simulation. // ---------- @@ -426,13 +432,14 @@ fn main() { assert_eq!(flow_rate.next(), Some(0.0)); // Interrupt the brew after 15s by pressing again the brew button. - simu.schedule_event( - Duration::from_secs(15), - Controller::brew_cmd, - (), - &controller_addr, - ) - .unwrap(); + scheduler + .schedule_event( + Duration::from_secs(15), + Controller::brew_cmd, + (), + &controller_addr, + ) + .unwrap(); simu.process_event(Controller::brew_cmd, (), &controller_addr); assert_eq!(flow_rate.next(), Some(pump_flow_rate)); diff --git a/asynchronix/examples/external_input.rs b/asynchronix/examples/external_input.rs new file mode 100644 index 0000000..8aeb609 --- /dev/null +++ b/asynchronix/examples/external_input.rs @@ -0,0 +1,251 @@ +//! Example: a model that reads data from the external world. +//! +//! This example demonstrates in particular: +//! +//! * external world inputs (useful in cosimulation), +//! * system clock, +//! * periodic scheduling. +//! +//! ```text +//! ┌────────────────────────────────┐ +//! │ Simulation │ +//! ┌────────────┐ ┌────────────┐ │ ┌──────────┐ │ +//! │ │ UDP │ │ message │ message │ │ message │ ┌─────────────┐ +//! │ UDP Client ├─────────▶│ UDP Server ├──────────▶├─────────▶│ Listener ├─────────▶├──▶│ EventBuffer │ +//! │ │ message │ │ │ │ │ │ └─────────────┘ +//! └────────────┘ └────────────┘ │ └──────────┘ │ +//! └────────────────────────────────┘ +//! ``` + +use std::io::ErrorKind; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Arc; +use std::thread::{self, sleep, JoinHandle}; +use std::time::Duration; + +use atomic_wait::{wait, wake_one}; + +use mio::net::UdpSocket as MioUdpSocket; +use mio::{Events, Interest, Poll, Token}; + +use asynchronix::model::{Context, InitializedModel, Model, SetupContext}; +use asynchronix::ports::{EventBuffer, Output}; +use asynchronix::simulation::{Mailbox, SimInit}; +use asynchronix::time::{AutoSystemClock, MonotonicTime}; + +const DELTA: Duration = Duration::from_millis(2); +const PERIOD: Duration = Duration::from_millis(20); +const N: u32 = 10; +const SENDER: &str = "127.0.0.1:8000"; +const RECEIVER: &str = "127.0.0.1:9000"; + +/// Model that receives external input. +pub struct Listener { + /// Received message. + pub message: Output, + + /// Receiver of external messages. + rx: Receiver, + + /// External sender. + tx: Option>, + + /// Synchronization with client. + start: Arc, + + /// Synchronization with simulation. + stop: Arc, + + /// Handle to UDP Server. + external_handle: Option>, +} + +impl Listener { + /// Creates a Listener. + pub fn new(start: Arc) -> Self { + start.store(0, Ordering::Relaxed); + + let (tx, rx) = channel(); + Self { + message: Output::default(), + rx, + tx: Some(tx), + start, + stop: Arc::new(AtomicBool::new(false)), + external_handle: None, + } + } + + /// Periodically scheduled function that processes external events. + pub async fn process(&mut self) { + loop { + if let Ok(message) = self.rx.try_recv() { + self.message.send(message).await; + } else { + break; + } + } + } + + /// UDP server. + /// + /// Code is based on the MIO UDP example. + fn listener(tx: Sender, start: Arc, stop: Arc) { + const UDP_SOCKET: Token = Token(0); + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(10); + let mut socket = MioUdpSocket::bind(RECEIVER.parse().unwrap()).unwrap(); + poll.registry() + .register(&mut socket, UDP_SOCKET, Interest::READABLE) + .unwrap(); + let mut buf = [0; 1 << 16]; + + // Wake up the client. + start.store(1, Ordering::Relaxed); + wake_one(&*start); + + 'process: loop { + // Wait for UDP packet or end of simulation. + if let Err(err) = poll.poll(&mut events, Some(Duration::from_secs(1))) { + if err.kind() == ErrorKind::Interrupted { + // Exit if simulation is finished. + if stop.load(Ordering::Relaxed) { + break 'process; + } + continue; + } + break 'process; + } + + for event in events.iter() { + match event.token() { + UDP_SOCKET => loop { + match socket.recv_from(&mut buf) { + Ok((packet_size, _)) => { + if let Ok(message) = std::str::from_utf8(&buf[..packet_size]) { + // Inject external message into simulation. + if tx.send(message.into()).is_err() { + break 'process; + } + }; + } + Err(e) if e.kind() == ErrorKind::WouldBlock => { + break; + } + _ => { + break 'process; + } + } + }, + _ => { + panic!("Got event for unexpected token: {:?}", event); + } + } + } + // Exit if simulation is finished. + if stop.load(Ordering::Relaxed) { + break 'process; + } + } + + poll.registry().deregister(&mut socket).unwrap(); + } +} + +impl Model for Listener { + /// Start UDP Server on model setup. + fn setup(&mut self, _: &SetupContext) { + let tx = self.tx.take().unwrap(); + let start = Arc::clone(&self.start); + let stop = Arc::clone(&self.stop); + self.external_handle = Some(thread::spawn(move || { + Self::listener(tx, start, stop); + })); + } + + /// Initialize model. + async fn init(self, context: &Context) -> InitializedModel { + // Schedule periodic function that processes external events. + context + .scheduler + .schedule_periodic_event(DELTA, PERIOD, Listener::process, ()) + .unwrap(); + + self.into() + } +} + +impl Drop for Listener { + /// Notify UDP Server that simulation is over and wait for server shutdown. + fn drop(&mut self) { + self.stop.store(true, Ordering::Relaxed); + let handle = self.external_handle.take(); + if let Some(handle) = handle { + handle.join().unwrap(); + } + } +} + +fn main() { + // --------------- + // Bench assembly. + // --------------- + + // Models. + + // Client-server synchronization. + let start = Arc::new(AtomicU32::new(0)); + + let mut listener = Listener::new(Arc::clone(&start)); + + // Mailboxes. + let listener_mbox = Mailbox::new(); + + // Model handles for simulation. + let mut message = EventBuffer::new(); + listener.message.connect_sink(&message); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let mut simu = SimInit::new() + .add_model(listener, listener_mbox, "listener") + .set_clock(AutoSystemClock::new()) + .init(t0); + + // ---------- + // Simulation. + // ---------- + + // External client that sends UDP messages. + let sender_handle = thread::spawn(move || { + // Wait until UDP Server is ready. + wait(&start, 0); + + for i in 0..N { + let socket = UdpSocket::bind(SENDER).unwrap(); + socket.send_to(i.to_string().as_bytes(), RECEIVER).unwrap(); + if i % 3 == 0 { + sleep(PERIOD * i) + } + } + }); + + // Advance simulation, external messages will be collected. + simu.step_by(Duration::from_secs(2)); + + // Check collected external messages. + let mut packets = 0_u32; + for _ in 0..N { + // UDP can reorder packages, we are expecting that on not too loaded + // localhost packages would not be dropped + packets |= 1 << message.next().unwrap().parse::().unwrap(); + } + assert_eq!(packets, u32::MAX >> 22); + assert_eq!(message.next(), None); + + sender_handle.join().unwrap(); +} diff --git a/asynchronix/examples/stepper_motor.rs b/asynchronix/examples/stepper_motor.rs index 5b2130a..867f46b 100644 --- a/asynchronix/examples/stepper_motor.rs +++ b/asynchronix/examples/stepper_motor.rs @@ -58,7 +58,7 @@ impl Motor { println!( "Model instance {} at time {}: setting currents: {:.2} and {:.2}", context.name(), - context.time(), + context.scheduler.time(), current.0, current.1 ); @@ -91,7 +91,7 @@ impl Motor { println!( "Model instance {} at time {}: setting load: {:.2}", context.name(), - context.time(), + context.scheduler.time(), torque ); @@ -141,7 +141,7 @@ impl Driver { println!( "Model instance {} at time {}: setting pps: {:.2}", context.name(), - context.time(), + context.scheduler.time(), pps ); @@ -172,7 +172,7 @@ impl Driver { println!( "Model instance {} at time {}: sending pulse", context.name(), - context.time() + context.scheduler.time() ); async move { @@ -195,6 +195,7 @@ impl Driver { // Schedule the next pulse. context + .scheduler .schedule_event(pulse_duration, Self::send_pulse, ()) .unwrap(); } @@ -236,6 +237,8 @@ fn main() { .add_model(motor, motor_mbox, "motor") .init(t0); + let scheduler = simu.scheduler(); + // ---------- // Simulation. // ---------- @@ -247,13 +250,14 @@ fn main() { assert!(position.next().is_none()); // Start the motor in 2s with a PPS of 10Hz. - simu.schedule_event( - Duration::from_secs(2), - Driver::pulse_rate, - 10.0, - &driver_addr, - ) - .unwrap(); + scheduler + .schedule_event( + Duration::from_secs(2), + Driver::pulse_rate, + 10.0, + &driver_addr, + ) + .unwrap(); // Advance simulation time to two next events. simu.step(); diff --git a/asynchronix/src/lib.rs b/asynchronix/src/lib.rs index f8cb141..5ba0707 100644 --- a/asynchronix/src/lib.rs +++ b/asynchronix/src/lib.rs @@ -119,7 +119,7 @@ //! } //! impl Delay { //! pub fn input(&mut self, value: f64, context: &Context) { -//! context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! } //! //! async fn send(&mut self, value: f64) { @@ -190,7 +190,7 @@ //! # } //! # impl Delay { //! # pub fn input(&mut self, value: f64, context: &Context) { -//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -250,7 +250,7 @@ //! [`Simulation::process_event()`](simulation::Simulation::process_event) or //! [`Simulation::send_query()`](simulation::Simulation::process_query), //! 3. by scheduling events, using for instance -//! [`Simulation::schedule_event()`](simulation::Simulation::schedule_event). +//! [`Scheduler::schedule_event()`](simulation::Scheduler::schedule_event). //! //! When initialized with the default clock, the simulation will run as fast as //! possible, without regard for the actual wall clock time. Alternatively, the @@ -289,7 +289,7 @@ //! # } //! # impl Delay { //! # pub fn input(&mut self, value: f64, context: &Context) { -//! # context.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); +//! # context.scheduler.schedule_event(Duration::from_secs(1), Self::send, value).unwrap(); //! # } //! # async fn send(&mut self, value: f64) { // this method can be private //! # self.output.send(value).await; @@ -370,7 +370,7 @@ //! //! The first guarantee (and only the first) also extends to events scheduled //! from a simulation with a -//! [`Simulation::schedule_*()`](simulation::Simulation::schedule_event) method: +//! [`Scheduler::schedule_*()`](simulation::Scheduler::schedule_event) method: //! if the scheduler contains several events to be delivered at the same time to //! the same model, these events will always be processed in the order in which //! they were scheduled. diff --git a/asynchronix/src/model/context.rs b/asynchronix/src/model/context.rs index 2336196..b71742a 100644 --- a/asynchronix/src/model/context.rs +++ b/asynchronix/src/model/context.rs @@ -1,17 +1,7 @@ use std::fmt; -use std::sync::{Arc, Mutex}; -use std::time::Duration; -use crate::channel::Sender; use crate::executor::Executor; -use crate::ports::InputFn; -use crate::simulation::{ - self, schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, - schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, ActionKey, - Deadline, Mailbox, SchedulerQueue, SchedulingError, -}; -use crate::time::{MonotonicTime, TearableAtomicTime}; -use crate::util::sync_cell::SyncCellReader; +use crate::simulation::{self, LocalScheduler, Mailbox}; use super::Model; @@ -60,13 +50,13 @@ use super::Model; /// impl DelayedGreeter { /// // Triggers a greeting on the output port after some delay [input port]. /// pub async fn greet_with_delay(&mut self, delay: Duration, context: &Context) { -/// let time = context.time(); +/// let time = context.scheduler.time(); /// let greeting = format!("Hello, this message was scheduled at: {:?}.", time); /// /// if delay.is_zero() { /// self.msg_out.send(greeting).await; /// } else { -/// context.schedule_event(delay, Self::send_msg, greeting).unwrap(); +/// context.scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap(); /// } /// } /// @@ -82,320 +72,21 @@ use super::Model; // https://github.com/rust-lang/rust/issues/78649 pub struct Context { name: String, - sender: Sender, - scheduler_queue: Arc>, - time: SyncCellReader, + + /// Local scheduler. + pub scheduler: LocalScheduler, } impl Context { /// Creates a new local context. - pub(crate) fn new( - name: String, - sender: Sender, - scheduler_queue: Arc>, - time: SyncCellReader, - ) -> Self { - Self { - name, - sender, - scheduler_queue, - time, - } + pub(crate) fn new(name: String, scheduler: LocalScheduler) -> Self { + Self { name, scheduler } } /// Returns the model instance name. pub fn name(&self) -> &str { &self.name } - - /// Returns the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::time::MonotonicTime; - /// - /// fn is_third_millenium(context: &Context) -> bool { - /// let time = context.time(); - /// time >= MonotonicTime::new(978307200, 0).unwrap() - /// && time < MonotonicTime::new(32535216000, 0).unwrap() - /// } - /// ``` - pub fn time(&self) -> MonotonicTime { - self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") - } - - /// Schedules an event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// - /// // A timer. - /// pub struct Timer {} - /// - /// impl Timer { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: Duration, context: &Context) { - /// if context.schedule_event(setting, Self::ring, ()).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Rings [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg"); - /// } - /// } - /// - /// impl Model for Timer {} - /// ``` - pub fn schedule_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = self.sender.clone(); - schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - - Ok(()) - } - - /// Schedules a cancellable event at a future time and returns an action - /// key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time. - /// - /// # Examples - /// - /// ``` - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::simulation::ActionKey; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock that can be cancelled. - /// #[derive(Default)] - /// pub struct CancellableAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// self.cancel(); - /// match context.schedule_keyed_event(setting, Self::ring, ()) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels the current alarm, if any [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Rings the alarm [private input port]. - /// fn ring(&mut self) { - /// println!("Brringggg!"); - /// } - /// } - /// - /// impl Model for CancellableAlarmClock {} - /// ``` - pub fn schedule_keyed_event( - &self, - deadline: impl Deadline, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let sender = self.sender.clone(); - let event_key = - schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue); - - Ok(event_key) - } - - /// Schedules a periodically recurring event at a future time. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock beeping at 1Hz. - /// pub struct BeepingAlarmClock {} - /// - /// impl BeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// if context.schedule_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ).is_err() { - /// println!("The alarm clock can only be set for a future time"); - /// } - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for BeepingAlarmClock {} - /// ``` - pub fn schedule_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result<(), SchedulingError> - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = self.sender.clone(); - schedule_periodic_event_at_unchecked( - time, - period, - func, - arg, - sender, - &self.scheduler_queue, - ); - - Ok(()) - } - - /// Schedules a cancellable, periodically recurring event at a future time - /// and returns an action key. - /// - /// An error is returned if the specified deadline is not in the future of - /// the current simulation time or if the specified period is null. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use asynchronix::model::{Context, Model}; - /// use asynchronix::simulation::ActionKey; - /// use asynchronix::time::MonotonicTime; - /// - /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or - /// // stopped after it sets off. - /// #[derive(Default)] - /// pub struct CancellableBeepingAlarmClock { - /// event_key: Option, - /// } - /// - /// impl CancellableBeepingAlarmClock { - /// // Sets an alarm [input port]. - /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { - /// self.cancel(); - /// match context.schedule_keyed_periodic_event( - /// setting, - /// Duration::from_secs(1), // 1Hz = 1/1s - /// Self::beep, - /// () - /// ) { - /// Ok(event_key) => self.event_key = Some(event_key), - /// Err(_) => println!("The alarm clock can only be set for a future time"), - /// }; - /// } - /// - /// // Cancels or stops the alarm [input port]. - /// pub fn cancel(&mut self) { - /// self.event_key.take().map(|k| k.cancel()); - /// } - /// - /// // Emits a single beep [private input port]. - /// fn beep(&mut self) { - /// println!("Beep!"); - /// } - /// } - /// - /// impl Model for CancellableBeepingAlarmClock {} - /// ``` - pub fn schedule_keyed_periodic_event( - &self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - ) -> Result - where - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let sender = self.sender.clone(); - let event_key = schedule_periodic_keyed_event_at_unchecked( - time, - period, - func, - arg, - sender, - &self.scheduler_queue, - ); - - Ok(event_key) - } } impl fmt::Debug for Context { @@ -501,8 +192,7 @@ impl<'a, M: Model> SetupContext<'a, M> { model, mailbox, submodel_name, - self.context.scheduler_queue.clone(), - self.context.time.clone(), + self.context.scheduler.scheduler.clone(), self.executor, ); } diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index b960bf8..0be241a 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -557,6 +557,7 @@ mod tests { use crate::channel::Receiver; use crate::model::Context; + use crate::simulation::{Address, LocalScheduler, Scheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -616,9 +617,10 @@ mod tests { SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); let dummy_context = Context::new( String::new(), - dummy_address, - dummy_priority_queue, - dummy_time, + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), ); block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); } @@ -671,9 +673,10 @@ mod tests { SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); let dummy_context = Context::new( String::new(), - dummy_address, - dummy_priority_queue, - dummy_time, + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), ); block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index 95a07aa..b545b66 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -440,6 +440,7 @@ mod tests { use crate::channel::Receiver; use crate::model::Context; + use crate::simulation::{Address, LocalScheduler, Scheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; @@ -499,9 +500,10 @@ mod tests { SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); let dummy_context = Context::new( String::new(), - dummy_address, - dummy_priority_queue, - dummy_time, + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), ); block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); } @@ -554,9 +556,10 @@ mod tests { SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); let dummy_context = Context::new( String::new(), - dummy_address, - dummy_priority_queue, - dummy_time, + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), ); block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); diff --git a/asynchronix/src/simulation.rs b/asynchronix/src/simulation.rs index 53e3732..0fc81ec 100644 --- a/asynchronix/src/simulation.rs +++ b/asynchronix/src/simulation.rs @@ -127,12 +127,12 @@ mod scheduler; mod sim_init; pub use mailbox::{Address, Mailbox}; +pub use scheduler::{ + Action, ActionKey, AutoActionKey, Deadline, LocalScheduler, Scheduler, SchedulingError, +}; pub(crate) use scheduler::{ - schedule_event_at_unchecked, schedule_keyed_event_at_unchecked, - schedule_periodic_event_at_unchecked, schedule_periodic_keyed_event_at_unchecked, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, SchedulerQueue, }; -pub use scheduler::{Action, ActionKey, AutoActionKey, Deadline, SchedulingError}; pub use sim_init::SimInit; use std::error::Error; @@ -149,7 +149,7 @@ use crate::ports::{InputFn, ReplierFn}; use crate::time::{Clock, MonotonicTime, TearableAtomicTime}; use crate::util::seq_futures::SeqFuture; use crate::util::slot; -use crate::util::sync_cell::{SyncCell, SyncCellReader}; +use crate::util::sync_cell::SyncCell; /// Simulation environment. /// @@ -161,10 +161,10 @@ use crate::util::sync_cell::{SyncCell, SyncCellReader}; /// A [`Simulation`] object also manages an event scheduling queue and /// simulation time. The scheduling queue can be accessed from the simulation /// itself, but also from models via the optional -/// [`&Context`](crate::model::Context) argument of input and replier port methods. -/// Likewise, simulation time can be accessed with the [`Simulation::time()`] -/// method, or from models with the [`Context::time()`](crate::model::Context::time) -/// method. +/// [`&Context`](crate::model::Context) argument of input and replier port +/// methods. Likewise, simulation time can be accessed with the +/// [`Simulation::time()`] method, or from models with the +/// [`LocalScheduler::time()`](crate::simulation::LocalScheduler::time) method. /// /// Events and queries can be scheduled immediately, *i.e.* for the current /// simulation time, using [`process_event()`](Simulation::process_event) and @@ -173,7 +173,7 @@ use crate::util::sync_cell::{SyncCell, SyncCellReader}; /// completed. In the case of queries, the response is returned. /// /// Events can also be scheduled at a future simulation time using one of the -/// [`schedule_*()`](Simulation::schedule_event) method. These methods queue an +/// [`schedule_*()`](Scheduler::schedule_event) method. These methods queue an /// event without blocking. /// /// Finally, the [`Simulation`] instance manages simulation time. A call to @@ -257,192 +257,9 @@ impl Simulation { Ok(()) } - /// Schedules an action at a future time. - /// - /// An error is returned if the specified time is not in the future of the - /// current simulation time. - /// - /// If multiple actions send events at the same simulation time to the same - /// model, these events are guaranteed to be processed according to the - /// scheduling order of the actions. - pub fn schedule( - &mut self, - deadline: impl Deadline, - action: Action, - ) -> Result<(), SchedulingError> { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - - let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); - - // The channel ID is set to the same value for all actions. This - // ensures that the relative scheduling order of all source events is - // preserved, which is important if some of them target the same models. - // The value 0 was chosen as it prevents collisions with channel IDs as - // the latter are always non-zero. - scheduler_queue.insert((time, 0), action); - - Ok(()) - } - - /// Schedules an event at a future time. - /// - /// An error is returned if the specified time is not in the future of the - /// current simulation time. - /// - /// Events scheduled for the same time and targeting the same model are - /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`Context::schedule_event`](crate::model::Context::schedule_event). - pub fn schedule_event( - &mut self, - deadline: impl Deadline, - func: F, - arg: T, - address: impl Into>, - ) -> Result<(), SchedulingError> - where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue); - - Ok(()) - } - - /// Schedules a cancellable event at a future time and returns an event key. - /// - /// An error is returned if the specified time is not in the future of the - /// current simulation time. - /// - /// Events scheduled for the same time and targeting the same model are - /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`Context::schedule_keyed_event`](crate::model::Context::schedule_keyed_event). - pub fn schedule_keyed_event( - &mut self, - deadline: impl Deadline, - func: F, - arg: T, - address: impl Into>, - ) -> Result - where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - let event_key = schedule_keyed_event_at_unchecked( - time, - func, - arg, - address.into().0, - &self.scheduler_queue, - ); - - Ok(event_key) - } - - /// Schedules a periodically recurring event at a future time. - /// - /// An error is returned if the specified time is not in the future of the - /// current simulation time or if the specified period is null. - /// - /// Events scheduled for the same time and targeting the same model are - /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`Context::schedule_periodic_event`](crate::model::Context::schedule_periodic_event). - pub fn schedule_periodic_event( - &mut self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - address: impl Into>, - ) -> Result<(), SchedulingError> - where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - schedule_periodic_event_at_unchecked( - time, - period, - func, - arg, - address.into().0, - &self.scheduler_queue, - ); - - Ok(()) - } - - /// Schedules a cancellable, periodically recurring event at a future time - /// and returns an event key. - /// - /// An error is returned if the specified time is not in the future of the - /// current simulation time or if the specified period is null. - /// - /// Events scheduled for the same time and targeting the same model are - /// guaranteed to be processed according to the scheduling order. - /// - /// See also: [`Context::schedule_keyed_periodic_event`](crate::model::Context::schedule_keyed_periodic_event). - pub fn schedule_keyed_periodic_event( - &mut self, - deadline: impl Deadline, - period: Duration, - func: F, - arg: T, - address: impl Into>, - ) -> Result - where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, - { - let now = self.time(); - let time = deadline.into_time(now); - if now >= time { - return Err(SchedulingError::InvalidScheduledTime); - } - if period.is_zero() { - return Err(SchedulingError::NullRepetitionPeriod); - } - let event_key = schedule_periodic_keyed_event_at_unchecked( - time, - period, - func, - arg, - address.into().0, - &self.scheduler_queue, - ); - - Ok(event_key) + /// Returns scheduler. + pub fn scheduler(&self) -> Scheduler { + Scheduler::new(self.scheduler_queue.clone(), self.time.reader()) } /// Processes an action immediately, blocking until completion. @@ -630,6 +447,7 @@ impl Simulation { None => { // Update the simulation time. self.time.write(target_time); + self.clock.synchronize(target_time); return; } // The target time was not reached yet. @@ -667,13 +485,10 @@ pub(crate) fn add_model( mut model: M, mailbox: Mailbox, name: String, - scheduler_queue: Arc>, - time: SyncCellReader, + scheduler: Scheduler, executor: &Executor, ) { - let sender = mailbox.0.sender(); - - let context = Context::new(name, sender, scheduler_queue, time); + let context = Context::new(name, LocalScheduler::new(scheduler, mailbox.address())); let setup_context = SetupContext::new(&mailbox, &context, executor); model.setup(&setup_context); diff --git a/asynchronix/src/simulation/scheduler.rs b/asynchronix/src/simulation/scheduler.rs index f325912..6237dd0 100644 --- a/asynchronix/src/simulation/scheduler.rs +++ b/asynchronix/src/simulation/scheduler.rs @@ -17,8 +17,532 @@ use crate::channel::Sender; use crate::executor::Executor; use crate::model::Model; use crate::ports::InputFn; -use crate::time::MonotonicTime; +use crate::simulation::Address; +use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; +use crate::util::sync_cell::SyncCellReader; + +/// Scheduler. +#[derive(Clone)] +pub struct Scheduler { + scheduler_queue: Arc>, + time: SyncCellReader, +} + +impl Scheduler { + pub(crate) fn new( + scheduler_queue: Arc>, + time: SyncCellReader, + ) -> Self { + Self { + scheduler_queue, + time, + } + } + + /// Returns the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::simulation::Scheduler; + /// use asynchronix::time::MonotonicTime; + /// + /// fn is_third_millenium(scheduler: &Scheduler) -> bool { + /// let time = scheduler.time(); + /// time >= MonotonicTime::new(978307200, 0).unwrap() + /// && time < MonotonicTime::new(32535216000, 0).unwrap() + /// } + /// ``` + pub fn time(&self) -> MonotonicTime { + self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time") + } + + /// Schedules an action at a future time. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// If multiple actions send events at the same simulation time to the same + /// model, these events are guaranteed to be processed according to the + /// scheduling order of the actions. + pub fn schedule(&self, deadline: impl Deadline, action: Action) -> Result<(), SchedulingError> { + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + + // The channel ID is set to the same value for all actions. This + // ensures that the relative scheduling order of all source events is + // preserved, which is important if some of them target the same models. + // The value 0 was chosen as it prevents collisions with channel IDs as + // the latter are always non-zero. + scheduler_queue.insert((time, 0), action); + + Ok(()) + } + + /// Schedules an event at a future time. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`LocalScheduler::schedule_event`](LocalScheduler::schedule_event). + pub fn schedule_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + let sender = address.into().0; + let channel_id = sender.channel_id(); + let action = Action::new(OnceAction::new(process_event(func, arg, sender))); + + scheduler_queue.insert((time, channel_id), action); + + Ok(()) + } + + /// Schedules a cancellable event at a future time and returns an event key. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`LocalScheduler::schedule_keyed_event`](LocalScheduler::schedule_keyed_event). + pub fn schedule_keyed_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + address: impl Into>, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + let event_key = ActionKey::new(); + let sender = address.into().0; + let channel_id = sender.channel_id(); + let action = Action::new(KeyedOnceAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + event_key.clone(), + )); + + scheduler_queue.insert((time, channel_id), action); + + Ok(event_key) + } + + /// Schedules a periodically recurring event at a future time. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time or if the specified period is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`LocalScheduler::schedule_periodic_event`](LocalScheduler::schedule_periodic_event). + pub fn schedule_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result<(), SchedulingError> + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let sender = address.into().0; + let channel_id = sender.channel_id(); + + let action = Action::new(PeriodicAction::new( + || process_event(func, arg, sender), + period, + )); + + scheduler_queue.insert((time, channel_id), action); + + Ok(()) + } + + /// Schedules a cancellable, periodically recurring event at a future time + /// and returns an event key. + /// + /// An error is returned if the specified time is not in the future of the + /// current simulation time or if the specified period is null. + /// + /// Events scheduled for the same time and targeting the same model are + /// guaranteed to be processed according to the scheduling order. + /// + /// See also: [`LocalScheduler::schedule_keyed_periodic_event`](LocalScheduler::schedule_keyed_periodic_event). + pub fn schedule_keyed_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + address: impl Into>, + ) -> Result + where + M: Model, + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let mut scheduler_queue = self.scheduler_queue.lock().unwrap(); + let now = self.time(); + let time = deadline.into_time(now); + if now >= time { + return Err(SchedulingError::InvalidScheduledTime); + } + if period.is_zero() { + return Err(SchedulingError::NullRepetitionPeriod); + } + let event_key = ActionKey::new(); + let sender = address.into().0; + let channel_id = sender.channel_id(); + let action = Action::new(KeyedPeriodicAction::new( + |ek| send_keyed_event(ek, func, arg, sender), + period, + event_key.clone(), + )); + scheduler_queue.insert((time, channel_id), action); + + Ok(event_key) + } +} + +impl fmt::Debug for Scheduler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Scheduler") + .field("time", &self.time()) + .finish_non_exhaustive() + } +} + +/// Local scheduler. +pub struct LocalScheduler { + pub(crate) scheduler: Scheduler, + address: Address, +} + +impl LocalScheduler { + pub(crate) fn new(scheduler: Scheduler, address: Address) -> Self { + Self { scheduler, address } + } + + /// Returns the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::Model; + /// use asynchronix::simulation::LocalScheduler; + /// use asynchronix::time::MonotonicTime; + /// + /// fn is_third_millenium(scheduler: &LocalScheduler) -> bool { + /// let time = scheduler.time(); + /// time >= MonotonicTime::new(978307200, 0).unwrap() + /// && time < MonotonicTime::new(32535216000, 0).unwrap() + /// } + /// ``` + pub fn time(&self) -> MonotonicTime { + self.scheduler.time() + } + + /// Schedules an event at a future time. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// + /// // A timer. + /// pub struct Timer {} + /// + /// impl Timer { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: Duration, context: &Context) { + /// if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Rings [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg"); + /// } + /// } + /// + /// impl Model for Timer {} + /// ``` + pub fn schedule_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + self.scheduler + .schedule_event(deadline, func, arg, &self.address) + } + + /// Schedules a cancellable event at a future time and returns an action + /// key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time. + /// + /// # Examples + /// + /// ``` + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock that can be cancelled. + /// #[derive(Default)] + /// pub struct CancellableAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { + /// self.cancel(); + /// match context.scheduler.schedule_keyed_event(setting, Self::ring, ()) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels the current alarm, if any [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Rings the alarm [private input port]. + /// fn ring(&mut self) { + /// println!("Brringggg!"); + /// } + /// } + /// + /// impl Model for CancellableAlarmClock {} + /// ``` + pub fn schedule_keyed_event( + &self, + deadline: impl Deadline, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S>, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = self + .scheduler + .schedule_keyed_event(deadline, func, arg, &self.address)?; + + Ok(event_key) + } + + /// Schedules a periodically recurring event at a future time. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz. + /// pub struct BeepingAlarmClock {} + /// + /// impl BeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { + /// if context.scheduler.schedule_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ).is_err() { + /// println!("The alarm clock can only be set for a future time"); + /// } + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for BeepingAlarmClock {} + /// ``` + pub fn schedule_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result<(), SchedulingError> + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + self.scheduler + .schedule_periodic_event(deadline, period, func, arg, &self.address) + } + + /// Schedules a cancellable, periodically recurring event at a future time + /// and returns an action key. + /// + /// An error is returned if the specified deadline is not in the future of + /// the current simulation time or if the specified period is null. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use asynchronix::model::{Context, Model}; + /// use asynchronix::simulation::ActionKey; + /// use asynchronix::time::MonotonicTime; + /// + /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or + /// // stopped after it sets off. + /// #[derive(Default)] + /// pub struct CancellableBeepingAlarmClock { + /// event_key: Option, + /// } + /// + /// impl CancellableBeepingAlarmClock { + /// // Sets an alarm [input port]. + /// pub fn set(&mut self, setting: MonotonicTime, context: &Context) { + /// self.cancel(); + /// match context.scheduler.schedule_keyed_periodic_event( + /// setting, + /// Duration::from_secs(1), // 1Hz = 1/1s + /// Self::beep, + /// () + /// ) { + /// Ok(event_key) => self.event_key = Some(event_key), + /// Err(_) => println!("The alarm clock can only be set for a future time"), + /// }; + /// } + /// + /// // Cancels or stops the alarm [input port]. + /// pub fn cancel(&mut self) { + /// self.event_key.take().map(|k| k.cancel()); + /// } + /// + /// // Emits a single beep [private input port]. + /// fn beep(&mut self) { + /// println!("Beep!"); + /// } + /// } + /// + /// impl Model for CancellableBeepingAlarmClock {} + /// ``` + pub fn schedule_keyed_periodic_event( + &self, + deadline: impl Deadline, + period: Duration, + func: F, + arg: T, + ) -> Result + where + F: for<'a> InputFn<'a, M, T, S> + Clone, + T: Send + Clone + 'static, + S: Send + 'static, + { + let event_key = self.scheduler.schedule_keyed_periodic_event( + deadline, + period, + func, + arg, + &self.address, + )?; + + Ok(event_key) + } +} + +impl Clone for LocalScheduler { + fn clone(&self) -> Self { + Self { + scheduler: self.scheduler.clone(), + address: self.address.clone(), + } + } +} + +impl fmt::Debug for LocalScheduler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalScheduler") + .field("time", &self.time()) + .field("address", &self.address) + .finish_non_exhaustive() + } +} /// Shorthand for the scheduler queue type. @@ -214,120 +738,6 @@ pub(crate) trait ActionInner: Send + 'static { fn spawn_and_forget(self: Box, executor: &Executor); } -/// Schedules an event at a future time. -/// -/// This function does not check whether the specified time lies in the future -/// of the current simulation time. -pub(crate) fn schedule_event_at_unchecked( - time: MonotonicTime, - func: F, - arg: T, - sender: Sender, - scheduler_queue: &Mutex, -) where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, -{ - let channel_id = sender.channel_id(); - - let action = Action::new(OnceAction::new(process_event(func, arg, sender))); - - let mut scheduler_queue = scheduler_queue.lock().unwrap(); - scheduler_queue.insert((time, channel_id), action); -} - -/// Schedules an event at a future time, returning an action key. -/// -/// This function does not check whether the specified time lies in the future -/// of the current simulation time. -pub(crate) fn schedule_keyed_event_at_unchecked( - time: MonotonicTime, - func: F, - arg: T, - sender: Sender, - scheduler_queue: &Mutex, -) -> ActionKey -where - M: Model, - F: for<'a> InputFn<'a, M, T, S>, - T: Send + Clone + 'static, - S: Send + 'static, -{ - let event_key = ActionKey::new(); - let channel_id = sender.channel_id(); - let action = Action::new(KeyedOnceAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - event_key.clone(), - )); - - let mut scheduler_queue = scheduler_queue.lock().unwrap(); - scheduler_queue.insert((time, channel_id), action); - - event_key -} - -/// Schedules a periodic event at a future time. -/// -/// This function does not check whether the specified time lies in the future -/// of the current simulation time. -pub(crate) fn schedule_periodic_event_at_unchecked( - time: MonotonicTime, - period: Duration, - func: F, - arg: T, - sender: Sender, - scheduler_queue: &Mutex, -) where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, -{ - let channel_id = sender.channel_id(); - - let action = Action::new(PeriodicAction::new( - || process_event(func, arg, sender), - period, - )); - - let mut scheduler_queue = scheduler_queue.lock().unwrap(); - scheduler_queue.insert((time, channel_id), action); -} - -/// Schedules an event at a future time, returning an action key. -/// -/// This function does not check whether the specified time lies in the future -/// of the current simulation time. -pub(crate) fn schedule_periodic_keyed_event_at_unchecked( - time: MonotonicTime, - period: Duration, - func: F, - arg: T, - sender: Sender, - scheduler_queue: &Mutex, -) -> ActionKey -where - M: Model, - F: for<'a> InputFn<'a, M, T, S> + Clone, - T: Send + Clone + 'static, - S: Send + 'static, -{ - let event_key = ActionKey::new(); - let channel_id = sender.channel_id(); - let action = Action::new(KeyedPeriodicAction::new( - |ek| send_keyed_event(ek, func, arg, sender), - period, - event_key.clone(), - )); - - let mut scheduler_queue = scheduler_queue.lock().unwrap(); - scheduler_queue.insert((time, channel_id), action); - - event_key -} - pin_project! { /// An object that can be converted to a future performing a single /// non-cancellable action. diff --git a/asynchronix/src/simulation/sim_init.rs b/asynchronix/src/simulation/sim_init.rs index f22e1fc..273fa52 100644 --- a/asynchronix/src/simulation/sim_init.rs +++ b/asynchronix/src/simulation/sim_init.rs @@ -8,7 +8,7 @@ use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; -use super::{add_model, Mailbox, SchedulerQueue, Simulation}; +use super::{add_model, Mailbox, Scheduler, SchedulerQueue, Simulation}; /// Builder for a multi-threaded, discrete-event simulation. pub struct SimInit { @@ -58,17 +58,8 @@ impl SimInit { mailbox: Mailbox, name: impl Into, ) -> Self { - let scheduler_queue = self.scheduler_queue.clone(); - let time = self.time.reader(); - - add_model( - model, - mailbox, - name.into(), - scheduler_queue, - time, - &self.executor, - ); + let scheduler = Scheduler::new(self.scheduler_queue.clone(), self.time.reader()); + add_model(model, mailbox, name.into(), scheduler, &self.executor); self } diff --git a/asynchronix/src/time.rs b/asynchronix/src/time.rs index df5a613..456ccfb 100644 --- a/asynchronix/src/time.rs +++ b/asynchronix/src/time.rs @@ -31,7 +31,7 @@ //! //! // Sets an alarm [input port]. //! pub fn set(&mut self, setting: MonotonicTime, context: &Context) { -//! if context.schedule_event(setting, Self::ring, ()).is_err() { +//! if context.scheduler.schedule_event(setting, Self::ring, ()).is_err() { //! println!("The alarm clock can only be set for a future time"); //! } //! } diff --git a/asynchronix/tests/model_scheduling.rs b/asynchronix/tests/model_scheduling.rs index 2a96408..5b70a1c 100644 --- a/asynchronix/tests/model_scheduling.rs +++ b/asynchronix/tests/model_scheduling.rs @@ -16,7 +16,12 @@ fn model_schedule_event() { impl TestModel { fn trigger(&mut self, _: (), context: &Context) { context - .schedule_event(context.time() + Duration::from_secs(2), Self::action, ()) + .scheduler + .schedule_event( + context.scheduler.time() + Duration::from_secs(2), + Self::action, + (), + ) .unwrap(); } async fn action(&mut self) { @@ -53,10 +58,20 @@ fn model_cancel_future_keyed_event() { impl TestModel { fn trigger(&mut self, _: (), context: &Context) { context - .schedule_event(context.time() + Duration::from_secs(1), Self::action1, ()) + .scheduler + .schedule_event( + context.scheduler.time() + Duration::from_secs(1), + Self::action1, + (), + ) .unwrap(); self.key = context - .schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ()) + .scheduler + .schedule_keyed_event( + context.scheduler.time() + Duration::from_secs(2), + Self::action2, + (), + ) .ok(); } async fn action1(&mut self) { @@ -99,10 +114,20 @@ fn model_cancel_same_time_keyed_event() { impl TestModel { fn trigger(&mut self, _: (), context: &Context) { context - .schedule_event(context.time() + Duration::from_secs(2), Self::action1, ()) + .scheduler + .schedule_event( + context.scheduler.time() + Duration::from_secs(2), + Self::action1, + (), + ) .unwrap(); self.key = context - .schedule_keyed_event(context.time() + Duration::from_secs(2), Self::action2, ()) + .scheduler + .schedule_keyed_event( + context.scheduler.time() + Duration::from_secs(2), + Self::action2, + (), + ) .ok(); } async fn action1(&mut self) { @@ -144,8 +169,9 @@ fn model_schedule_periodic_event() { impl TestModel { fn trigger(&mut self, _: (), context: &Context) { context + .scheduler .schedule_periodic_event( - context.time() + Duration::from_secs(2), + context.scheduler.time() + Duration::from_secs(2), Duration::from_secs(3), Self::action, 42, @@ -192,8 +218,9 @@ fn model_cancel_periodic_event() { impl TestModel { fn trigger(&mut self, _: (), context: &Context) { self.key = context + .scheduler .schedule_keyed_periodic_event( - context.time() + Duration::from_secs(2), + context.scheduler.time() + Duration::from_secs(2), Duration::from_secs(3), Self::action, (), diff --git a/asynchronix/tests/simulation_scheduling.rs b/asynchronix/tests/simulation_scheduling.rs index 3076931..e432317 100644 --- a/asynchronix/tests/simulation_scheduling.rs +++ b/asynchronix/tests/simulation_scheduling.rs @@ -48,16 +48,20 @@ fn simulation_schedule_events() { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(t0); + let scheduler = simu.scheduler(); + // Queue 2 events at t0+3s and t0+2s, in reverse order. - simu.schedule_event(Duration::from_secs(3), PassThroughModel::input, (), &addr) + scheduler + .schedule_event(Duration::from_secs(3), PassThroughModel::input, (), &addr) + .unwrap(); + scheduler + .schedule_event( + t0 + Duration::from_secs(2), + PassThroughModel::input, + (), + &addr, + ) .unwrap(); - simu.schedule_event( - t0 + Duration::from_secs(2), - PassThroughModel::input, - (), - &addr, - ) - .unwrap(); // Move to the 1st event at t0+2s. simu.step(); @@ -65,7 +69,8 @@ fn simulation_schedule_events() { assert!(output.next().is_some()); // Schedule another event in 4s (at t0+6s). - simu.schedule_event(Duration::from_secs(4), PassThroughModel::input, (), &addr) + scheduler + .schedule_event(Duration::from_secs(4), PassThroughModel::input, (), &addr) .unwrap(); // Move to the 2nd event at t0+3s. @@ -85,7 +90,9 @@ fn simulation_schedule_keyed_events() { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(t0); - let event_t1 = simu + let scheduler = simu.scheduler(); + + let event_t1 = scheduler .schedule_keyed_event( t0 + Duration::from_secs(1), PassThroughModel::input, @@ -94,11 +101,12 @@ fn simulation_schedule_keyed_events() { ) .unwrap(); - let event_t2_1 = simu + let event_t2_1 = scheduler .schedule_keyed_event(Duration::from_secs(2), PassThroughModel::input, 21, &addr) .unwrap(); - simu.schedule_event(Duration::from_secs(2), PassThroughModel::input, 22, &addr) + scheduler + .schedule_event(Duration::from_secs(2), PassThroughModel::input, 22, &addr) .unwrap(); // Move to the 1st event at t0+1. @@ -124,23 +132,27 @@ fn simulation_schedule_periodic_events() { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(t0); + let scheduler = simu.scheduler(); + // Queue 2 periodic events at t0 + 3s + k*2s. - simu.schedule_periodic_event( - Duration::from_secs(3), - Duration::from_secs(2), - PassThroughModel::input, - 1, - &addr, - ) - .unwrap(); - simu.schedule_periodic_event( - t0 + Duration::from_secs(3), - Duration::from_secs(2), - PassThroughModel::input, - 2, - &addr, - ) - .unwrap(); + scheduler + .schedule_periodic_event( + Duration::from_secs(3), + Duration::from_secs(2), + PassThroughModel::input, + 1, + &addr, + ) + .unwrap(); + scheduler + .schedule_periodic_event( + t0 + Duration::from_secs(3), + Duration::from_secs(2), + PassThroughModel::input, + 2, + &addr, + ) + .unwrap(); // Move to the next events at t0 + 3s + k*2s. for k in 0..10 { @@ -160,16 +172,19 @@ fn simulation_schedule_periodic_keyed_events() { let t0 = MonotonicTime::EPOCH; let (mut simu, addr, mut output) = passthrough_bench(t0); + let scheduler = simu.scheduler(); + // Queue 2 periodic events at t0 + 3s + k*2s. - simu.schedule_periodic_event( - Duration::from_secs(3), - Duration::from_secs(2), - PassThroughModel::input, - 1, - &addr, - ) - .unwrap(); - let event2_key = simu + scheduler + .schedule_periodic_event( + Duration::from_secs(3), + Duration::from_secs(2), + PassThroughModel::input, + 1, + &addr, + ) + .unwrap(); + let event2_key = scheduler .schedule_keyed_periodic_event( t0 + Duration::from_secs(3), Duration::from_secs(2), @@ -279,14 +294,17 @@ fn simulation_system_clock_from_instant() { let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock); + let scheduler = simu.scheduler(); + // Queue a single event at t0 + 0.1s. - simu.schedule_event( - Duration::from_secs_f64(0.1), - TimestampModel::trigger, - (), - &addr, - ) - .unwrap(); + scheduler + .schedule_event( + Duration::from_secs_f64(0.1), + TimestampModel::trigger, + (), + &addr, + ) + .unwrap(); // Check the stamps. for expected_time in [ @@ -333,14 +351,17 @@ fn simulation_system_clock_from_system_time() { let (mut simu, addr, mut stamp) = timestamp_bench(t0, clock); + let scheduler = simu.scheduler(); + // Queue a single event at t0 + 0.1s. - simu.schedule_event( - Duration::from_secs_f64(0.1), - TimestampModel::trigger, - (), - &addr, - ) - .unwrap(); + scheduler + .schedule_event( + Duration::from_secs_f64(0.1), + TimestampModel::trigger, + (), + &addr, + ) + .unwrap(); // Check the stamps. for expected_time in [ @@ -376,24 +397,28 @@ fn simulation_auto_system_clock() { let (mut simu, addr, mut stamp) = timestamp_bench(t0, AutoSystemClock::new()); let instant_t0 = Instant::now(); + let scheduler = simu.scheduler(); + // Queue a periodic event at t0 + 0.2s + k*0.2s. - simu.schedule_periodic_event( - Duration::from_secs_f64(0.2), - Duration::from_secs_f64(0.2), - TimestampModel::trigger, - (), - &addr, - ) - .unwrap(); + scheduler + .schedule_periodic_event( + Duration::from_secs_f64(0.2), + Duration::from_secs_f64(0.2), + TimestampModel::trigger, + (), + &addr, + ) + .unwrap(); // Queue a single event at t0 + 0.3s. - simu.schedule_event( - Duration::from_secs_f64(0.3), - TimestampModel::trigger, - (), - &addr, - ) - .unwrap(); + scheduler + .schedule_event( + Duration::from_secs_f64(0.3), + TimestampModel::trigger, + (), + &addr, + ) + .unwrap(); // Check the stamps. for expected_time in [0.0, 0.2, 0.3, 0.4, 0.6] {