forked from ROMEO/nexosim
Merge pull request #69 from asynchronics/feature-unirequestor
Add UniRequestor port
This commit is contained in:
commit
d88c527cb7
@ -25,8 +25,8 @@ where
|
||||
|
||||
/// Observable state.
|
||||
///
|
||||
/// This object encapsulates state. Every state change is propagated to the
|
||||
/// output.
|
||||
/// This object encapsulates state. Every state change access is propagated to
|
||||
/// the output.
|
||||
#[derive(Debug)]
|
||||
pub struct ObservableState<S, T>
|
||||
where
|
||||
|
@ -63,6 +63,7 @@ tracing-subscriber = { version= "0.3.18", optional = true }
|
||||
futures-util = "0.3"
|
||||
futures-executor = "0.3"
|
||||
tracing-subscriber = { version= "0.3.18", features=["env-filter"] }
|
||||
nexosim-util = {version = "0.1.0", path = "../nexosim-util"}
|
||||
|
||||
[target.'cfg(nexosim_loom)'.dev-dependencies]
|
||||
loom = "0.7"
|
||||
|
174
nexosim/examples/uni_requestor.rs
Normal file
174
nexosim/examples/uni_requestor.rs
Normal file
@ -0,0 +1,174 @@
|
||||
//! Example: sensor reading data from the environment model.
|
||||
//!
|
||||
//! This example demonstrates in particular:
|
||||
//!
|
||||
//! * cyclical self-scheduling methods,
|
||||
//! * model initialization,
|
||||
//! * simulation monitoring with buffered event sinks,
|
||||
//! * connection with mapping,
|
||||
//! * UniRequestor port.
|
||||
//!
|
||||
//! ```text
|
||||
//! ┌─────────────┐ ┌──────────┐
|
||||
//! │ │ temperature │ │ overheat
|
||||
//! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────►
|
||||
//! │ │ │ │ state
|
||||
//! └─────────────┘ └──────────┘
|
||||
//! ```
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use nexosim_util::observables::ObservableValue;
|
||||
|
||||
use nexosim::model::{Context, InitializedModel, Model};
|
||||
use nexosim::ports::{EventBuffer, Output, UniRequestor};
|
||||
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
|
||||
use nexosim::time::MonotonicTime;
|
||||
|
||||
/// Sensor model
|
||||
pub struct Sensor {
|
||||
/// Temperature [deg C] -- requestor port.
|
||||
pub temp: UniRequestor<(), f64>,
|
||||
|
||||
/// Overheat detection [-] -- output port.
|
||||
pub overheat: Output<bool>,
|
||||
|
||||
/// Temperature threshold [deg C] -- parameter.
|
||||
threshold: f64,
|
||||
|
||||
/// Overheat detection [-] -- observable state.
|
||||
oh: ObservableValue<bool>,
|
||||
}
|
||||
|
||||
impl Sensor {
|
||||
/// Creates new Sensor with overheat threshold set [deg C].
|
||||
pub fn new(threshold: f64, temp: UniRequestor<(), f64>) -> Self {
|
||||
let overheat = Output::new();
|
||||
Self {
|
||||
temp,
|
||||
overheat: overheat.clone(),
|
||||
threshold,
|
||||
oh: ObservableValue::new(overheat),
|
||||
}
|
||||
}
|
||||
|
||||
/// Cyclically scheduled method that reads data from environment and
|
||||
/// avaluates overheat state.
|
||||
pub async fn tick(&mut self) {
|
||||
let temp = self.temp.send(()).await.unwrap();
|
||||
if temp > self.threshold {
|
||||
if !self.oh.get() {
|
||||
self.oh.set(true).await;
|
||||
}
|
||||
} else if *self.oh.get() {
|
||||
self.oh.set(false).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Model for Sensor {
|
||||
/// Propagate state and schedule cyclic method.
|
||||
async fn init(mut self, context: &mut Context<Self>) -> InitializedModel<Self> {
|
||||
self.oh.propagate().await;
|
||||
|
||||
context
|
||||
.schedule_periodic_event(
|
||||
Duration::from_millis(500),
|
||||
Duration::from_millis(500),
|
||||
Self::tick,
|
||||
(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
self.into()
|
||||
}
|
||||
}
|
||||
|
||||
/// Environment model.
|
||||
pub struct Env {
|
||||
/// Temperature [deg F] -- internal state.
|
||||
temp: f64,
|
||||
}
|
||||
|
||||
impl Env {
|
||||
/// Creates new environment model with the temperature [deg F] set.
|
||||
pub fn new(temp: f64) -> Self {
|
||||
Self { temp }
|
||||
}
|
||||
|
||||
/// Sets temperature [deg F].
|
||||
pub async fn set_temp(&mut self, temp: f64) {
|
||||
self.temp = temp;
|
||||
}
|
||||
|
||||
/// Gets temperature [deg F].
|
||||
pub async fn get_temp(&mut self, _: ()) -> f64 {
|
||||
self.temp
|
||||
}
|
||||
}
|
||||
|
||||
impl Model for Env {}
|
||||
|
||||
/// Converts Fahrenheit to Celsius.
|
||||
pub fn fahr_to_cels(t: f64) -> f64 {
|
||||
5.0 * (t - 32.0) / 9.0
|
||||
}
|
||||
|
||||
fn main() -> Result<(), SimulationError> {
|
||||
// ---------------
|
||||
// Bench assembly.
|
||||
// ---------------
|
||||
|
||||
// Mailboxes.
|
||||
let sensor_mbox = Mailbox::new();
|
||||
let env_mbox = Mailbox::new();
|
||||
|
||||
// Connect data line and convert Fahrenheit degrees to Celsius.
|
||||
let temp_req = UniRequestor::with_map(|x| *x, fahr_to_cels, Env::get_temp, &env_mbox);
|
||||
|
||||
// Models.
|
||||
let mut sensor = Sensor::new(100.0, temp_req);
|
||||
let env = Env::new(0.0);
|
||||
|
||||
// Model handles for simulation.
|
||||
let env_addr = env_mbox.address();
|
||||
|
||||
let mut overheat = EventBuffer::new();
|
||||
sensor.overheat.connect_sink(&overheat);
|
||||
|
||||
// Start time (arbitrary since models do not depend on absolute time).
|
||||
let t0 = MonotonicTime::EPOCH;
|
||||
|
||||
// Assembly and initialization.
|
||||
let (mut simu, scheduler) = SimInit::new()
|
||||
.add_model(sensor, sensor_mbox, "sensor")
|
||||
.add_model(env, env_mbox, "env")
|
||||
.init(t0)?;
|
||||
|
||||
// ----------
|
||||
// Simulation.
|
||||
// ----------
|
||||
|
||||
// Check initial conditions.
|
||||
assert_eq!(simu.time(), t0);
|
||||
assert_eq!(overheat.next(), Some(false));
|
||||
assert!(overheat.next().is_none());
|
||||
|
||||
// Change temperature in 2s.
|
||||
scheduler
|
||||
.schedule_event(Duration::from_secs(2), Env::set_temp, 105.0, &env_addr)
|
||||
.unwrap();
|
||||
|
||||
// Change temperature in 4s.
|
||||
scheduler
|
||||
.schedule_event(Duration::from_secs(4), Env::set_temp, 213.0, &env_addr)
|
||||
.unwrap();
|
||||
|
||||
simu.step_until(Duration::from_secs(3))?;
|
||||
assert!(overheat.next().is_none());
|
||||
|
||||
simu.step_until(Duration::from_secs(5))?;
|
||||
assert_eq!(overheat.next(), Some(true));
|
||||
|
||||
Ok(())
|
||||
}
|
@ -86,7 +86,7 @@ mod source;
|
||||
|
||||
pub use input::markers;
|
||||
pub use input::{InputFn, ReplierFn};
|
||||
pub use output::{Output, Requestor};
|
||||
pub use output::{Output, Requestor, UniRequestor};
|
||||
pub use sink::{
|
||||
event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter,
|
||||
};
|
||||
|
@ -10,7 +10,7 @@ use crate::simulation::Address;
|
||||
use crate::util::cached_rw_lock::CachedRwLock;
|
||||
|
||||
use broadcaster::{EventBroadcaster, QueryBroadcaster};
|
||||
use sender::FilterMapReplierSender;
|
||||
use sender::{FilterMapReplierSender, Sender};
|
||||
|
||||
use self::sender::{
|
||||
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
|
||||
@ -300,3 +300,117 @@ impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for Requestor<T, R
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A requestor port with exactly one connection.
|
||||
///
|
||||
/// A `UniRequestor` port is connected to a unique replier port, i.e. to an
|
||||
/// asynchronous model method that returns a value.
|
||||
#[derive(Clone)]
|
||||
pub struct UniRequestor<T: Clone + Send + 'static, R: Send + 'static> {
|
||||
sender: Box<dyn Sender<T, R>>,
|
||||
}
|
||||
|
||||
impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
|
||||
/// Creates a new `UniRequestor` port connected to a replier port of the model
|
||||
/// specified by the address.
|
||||
///
|
||||
/// The replier port must be an asynchronous method of a model of type `M`
|
||||
/// returning a value of type `R` and taking as argument a value of type `T`
|
||||
/// plus, optionally, a context reference.
|
||||
pub fn new<M, F, S>(replier: F, address: impl Into<Address<M>>) -> Self
|
||||
where
|
||||
M: Model,
|
||||
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
|
||||
S: Send + 'static,
|
||||
{
|
||||
let sender = Box::new(ReplierSender::new(replier, address.into().0));
|
||||
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
/// Creates a new `UniRequestor` port connected with auto-conversion to a
|
||||
/// replier port of the model specified by the address.
|
||||
///
|
||||
/// Queries and replies are mapped to other types using the closures
|
||||
/// provided in argument.
|
||||
///
|
||||
/// The replier port must be an asynchronous method of a model of type `M`
|
||||
/// returning a value of the type returned by the reply mapping closure and
|
||||
/// taking as argument a value of the type returned by the query mapping
|
||||
/// closure plus, optionally, a context reference.
|
||||
pub fn with_map<M, C, D, F, U, Q, S>(
|
||||
query_map: C,
|
||||
reply_map: D,
|
||||
replier: F,
|
||||
address: impl Into<Address<M>>,
|
||||
) -> Self
|
||||
where
|
||||
M: Model,
|
||||
C: Fn(&T) -> U + Send + Sync + 'static,
|
||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||
U: Send + 'static,
|
||||
Q: Send + 'static,
|
||||
S: Send + 'static,
|
||||
{
|
||||
let sender = Box::new(MapReplierSender::new(
|
||||
query_map,
|
||||
reply_map,
|
||||
replier,
|
||||
address.into().0,
|
||||
));
|
||||
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
/// Creates a new `UniRequestor` port connected with filtering and
|
||||
/// auto-conversion to a replier port of the model specified by the address.
|
||||
///
|
||||
/// Queries and replies are mapped to other types using the closures
|
||||
/// provided in argument, or ignored if the query closure returns `None`.
|
||||
///
|
||||
/// The replier port must be an asynchronous method of a model of type `M`
|
||||
/// returning a value of the type returned by the reply mapping closure and
|
||||
/// taking as argument a value of the type returned by the query mapping
|
||||
/// closure plus, optionally, a context reference.
|
||||
pub fn with_filter_map<M, C, D, F, U, Q, S>(
|
||||
query_filer_map: C,
|
||||
reply_map: D,
|
||||
replier: F,
|
||||
address: impl Into<Address<M>>,
|
||||
) -> Self
|
||||
where
|
||||
M: Model,
|
||||
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
|
||||
D: Fn(Q) -> R + Send + Sync + 'static,
|
||||
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
|
||||
U: Send + 'static,
|
||||
Q: Send + 'static,
|
||||
S: Send + 'static,
|
||||
{
|
||||
let sender = Box::new(FilterMapReplierSender::new(
|
||||
query_filer_map,
|
||||
reply_map,
|
||||
replier,
|
||||
address.into().0,
|
||||
));
|
||||
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
/// Sends a query to the connected replier port.
|
||||
pub async fn send(&mut self, arg: T) -> Option<R> {
|
||||
if let Some(fut) = self.sender.send_owned(arg) {
|
||||
let output = fut.await.unwrap();
|
||||
Some(output)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for UniRequestor<T, R> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "UniRequestor")
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user