1
0
forked from ROMEO/nexosim

Add tests for filter_map_connect (source & output)

This commit is contained in:
Serge Barral 2024-08-07 17:36:51 +02:00
parent c4d93f5c31
commit 525f708d55
2 changed files with 432 additions and 50 deletions

View File

@ -567,59 +567,68 @@ mod tests {
use futures_executor::block_on;
use crate::channel::Receiver;
use crate::model::Context;
use crate::simulation::{Address, LocalScheduler, Scheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::super::sender::{InputSender, ReplierSender};
use super::super::sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender,
};
use super::*;
use crate::model::Model;
use crate::model::{Context, Model};
struct Counter {
struct SumModel {
inner: Arc<AtomicUsize>,
}
impl Counter {
impl SumModel {
fn new(counter: Arc<AtomicUsize>) -> Self {
Self { inner: counter }
}
async fn inc(&mut self, by: usize) {
async fn increment(&mut self, by: usize) {
self.inner.fetch_add(by, Ordering::Relaxed);
}
async fn fetch_inc(&mut self, by: usize) -> usize {
let res = self.inner.fetch_add(by, Ordering::Relaxed);
res
}
impl Model for SumModel {}
struct DoubleModel {}
impl DoubleModel {
fn new() -> Self {
Self {}
}
async fn double(&mut self, value: usize) -> usize {
2 * value
}
}
impl Model for Counter {}
impl Model for DoubleModel {}
#[test]
fn broadcast_event_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;
let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address));
let sender = Box::new(InputSender::new(SumModel::increment, address));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(broadcaster.broadcast(1)).unwrap();
block_on(broadcaster.broadcast(MESSAGE)).unwrap();
});
let counter = Arc::new(AtomicUsize::new(0));
let sum = Arc::new(AtomicUsize::new(0));
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut sum_model = SumModel::new(sum.clone());
move || {
let dummy_address = Receiver::new(1).sender();
@ -633,7 +642,7 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap();
}
})
})
@ -644,38 +653,116 @@ mod tests {
th.join().unwrap();
}
assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(sum.load(Ordering::Relaxed), N_RECV * MESSAGE);
}
#[test]
fn broadcast_event_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.
let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
SumModel::increment,
address,
));
broadcaster.add(id_filter_sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(async {
// Send messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}
// Broadcast the special value to all receivers.
broadcaster.broadcast(BROADCAST_ALL).await.unwrap();
// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}
})
});
let sum = Arc::new(AtomicUsize::new(0));
// Spawn all models.
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut sum_model = SumModel::new(sum.clone());
move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(async {
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
});
}
})
})
.collect();
th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}
assert_eq!(
sum.load(Ordering::Relaxed),
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) // Twice the sum of all IDs + N_RECV times the special value
);
}
#[test]
fn broadcast_query_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;
let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
let sender = Box::new(ReplierSender::new(DoubleModel::double, address));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
let iter = block_on(broadcaster.broadcast(1)).unwrap();
let iter = block_on(broadcaster.broadcast(MESSAGE)).unwrap();
let sum = iter.fold(0, |acc, val| acc + val);
assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)}
sum
});
let counter = Arc::new(AtomicUsize::new(0));
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut double_model = DoubleModel::new();
move || {
let dummy_address = Receiver::new(1).sender();
@ -689,19 +776,123 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();
th_broadcast.join().unwrap();
let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}
assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(sum, N_RECV * MESSAGE * 2);
}
#[test]
fn broadcast_query_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.
let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|x| 3 * x,
DoubleModel::double,
address,
));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(async {
let mut sum = 0;
// Send messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}
// Broadcast the special value to all receivers.
sum += broadcaster
.broadcast(BROADCAST_ALL)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}
sum
})
});
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut double_model = DoubleModel::new();
move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(async {
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
});
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();
let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}
assert_eq!(
sum,
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) * 2 * 3, // Twice the sum of all IDs + N_RECV times the special value, then doubled and tripled
);
}
}

View File

@ -469,59 +469,68 @@ mod tests {
use futures_executor::block_on;
use crate::channel::Receiver;
use crate::model::Context;
use crate::simulation::{Address, LocalScheduler, Scheduler};
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCell;
use super::super::sender::{InputSender, ReplierSender};
use super::super::sender::{
FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender,
};
use super::*;
use crate::model::Model;
use crate::model::{Context, Model};
struct Counter {
struct SumModel {
inner: Arc<AtomicUsize>,
}
impl Counter {
impl SumModel {
fn new(counter: Arc<AtomicUsize>) -> Self {
Self { inner: counter }
}
async fn inc(&mut self, by: usize) {
async fn increment(&mut self, by: usize) {
self.inner.fetch_add(by, Ordering::Relaxed);
}
async fn fetch_inc(&mut self, by: usize) -> usize {
let res = self.inner.fetch_add(by, Ordering::Relaxed);
res
}
impl Model for SumModel {}
struct DoubleModel {}
impl DoubleModel {
fn new() -> Self {
Self {}
}
async fn double(&mut self, value: usize) -> usize {
2 * value
}
}
impl Model for Counter {}
impl Model for DoubleModel {}
#[test]
fn broadcast_event_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;
let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(InputSender::new(Counter::inc, address));
let sender = Box::new(InputSender::new(SumModel::increment, address));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(broadcaster.broadcast(1)).unwrap();
block_on(broadcaster.broadcast(MESSAGE)).unwrap();
});
let counter = Arc::new(AtomicUsize::new(0));
let sum = Arc::new(AtomicUsize::new(0));
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut sum_model = SumModel::new(sum.clone());
move || {
let dummy_address = Receiver::new(1).sender();
@ -535,7 +544,7 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap();
}
})
})
@ -546,38 +555,116 @@ mod tests {
th.join().unwrap();
}
assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(sum.load(Ordering::Relaxed), N_RECV * MESSAGE);
}
#[test]
fn broadcast_event_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.
let mut mailboxes = Vec::new();
let mut broadcaster = EventBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let id_filter_sender = Box::new(FilterMapInputSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
SumModel::increment,
address,
));
broadcaster.add(id_filter_sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(async {
// Send messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}
// Broadcast the special value to all receivers.
broadcaster.broadcast(BROADCAST_ALL).await.unwrap();
// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
broadcaster.broadcast(id).await.unwrap();
}
})
});
let sum = Arc::new(AtomicUsize::new(0));
// Spawn all models.
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut sum_model = SumModel::new(sum.clone());
move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(async {
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
mailbox.recv(&mut sum_model, &dummy_context).await.unwrap();
});
}
})
})
.collect();
th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}
assert_eq!(
sum.load(Ordering::Relaxed),
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) // Twice the sum of all IDs + N_RECV times the special value
);
}
#[test]
fn broadcast_query_smoke() {
const N_RECV: usize = 4;
const MESSAGE: usize = 42;
let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for _ in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address));
let sender = Box::new(ReplierSender::new(DoubleModel::double, address));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
let iter = block_on(broadcaster.broadcast(1)).unwrap();
let iter = block_on(broadcaster.broadcast(MESSAGE)).unwrap();
let sum = iter.fold(0, |acc, val| acc + val);
assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)}
sum
});
let counter = Arc::new(AtomicUsize::new(0));
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut counter = Counter::new(counter.clone());
let mut double_model = DoubleModel::new();
move || {
let dummy_address = Receiver::new(1).sender();
@ -591,19 +678,123 @@ mod tests {
Address(dummy_address),
),
);
block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap();
block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();
th_broadcast.join().unwrap();
let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}
assert_eq!(counter.load(Ordering::Relaxed), N_RECV);
assert_eq!(sum, N_RECV * MESSAGE * 2);
}
#[test]
fn broadcast_query_filter_map() {
const N_RECV: usize = 4;
const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers.
let mut mailboxes = Vec::new();
let mut broadcaster = QueryBroadcaster::default();
for id in 0..N_RECV {
let mailbox = Receiver::new(10);
let address = mailbox.sender();
let sender = Box::new(FilterMapReplierSender::new(
move |x| (x == id || x == BROADCAST_ALL).then_some(x),
|x| 3 * x,
DoubleModel::double,
address,
));
broadcaster.add(sender);
mailboxes.push(mailbox);
}
let th_broadcast = thread::spawn(move || {
block_on(async {
let mut sum = 0;
// Send messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}
// Broadcast the special value to all receivers.
sum += broadcaster
.broadcast(BROADCAST_ALL)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
// Send again messages reaching only one receiver each.
for id in 0..N_RECV {
sum += broadcaster
.broadcast(id)
.await
.unwrap()
.fold(0, |acc, val| acc + val);
}
sum
})
});
let th_recv: Vec<_> = mailboxes
.into_iter()
.map(|mut mailbox| {
thread::spawn({
let mut double_model = DoubleModel::new();
move || {
let dummy_address = Receiver::new(1).sender();
let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new()));
let dummy_time =
SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader();
let dummy_context = Context::new(
String::new(),
LocalScheduler::new(
Scheduler::new(dummy_priority_queue, dummy_time),
Address(dummy_address),
),
);
block_on(async {
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
mailbox
.recv(&mut double_model, &dummy_context)
.await
.unwrap();
});
thread::sleep(std::time::Duration::from_millis(100));
}
})
})
.collect();
let sum = th_broadcast.join().unwrap();
for th in th_recv {
th.join().unwrap();
}
assert_eq!(
sum,
N_RECV * ((N_RECV - 1) + BROADCAST_ALL) * 2 * 3, // Twice the sum of all IDs + N_RECV times the special value, then doubled and tripled
);
}
}