From 525f708d5537829a76ff58c922257b44d6d0f69d Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Wed, 7 Aug 2024 17:36:51 +0200 Subject: [PATCH] Add tests for filter_map_connect (source & output) --- asynchronix/src/ports/output/broadcaster.rs | 241 ++++++++++++++++++-- asynchronix/src/ports/source/broadcaster.rs | 241 ++++++++++++++++++-- 2 files changed, 432 insertions(+), 50 deletions(-) diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs index f3cc423..9611c8f 100644 --- a/asynchronix/src/ports/output/broadcaster.rs +++ b/asynchronix/src/ports/output/broadcaster.rs @@ -567,59 +567,68 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::model::Context; use crate::simulation::{Address, LocalScheduler, Scheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; - use super::super::sender::{InputSender, ReplierSender}; + use super::super::sender::{ + FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender, + }; use super::*; - use crate::model::Model; + use crate::model::{Context, Model}; - struct Counter { + struct SumModel { inner: Arc, } - impl Counter { + impl SumModel { fn new(counter: Arc) -> Self { Self { inner: counter } } - async fn inc(&mut self, by: usize) { + async fn increment(&mut self, by: usize) { self.inner.fetch_add(by, Ordering::Relaxed); } - async fn fetch_inc(&mut self, by: usize) -> usize { - let res = self.inner.fetch_add(by, Ordering::Relaxed); - res + } + impl Model for SumModel {} + + struct DoubleModel {} + impl DoubleModel { + fn new() -> Self { + Self {} + } + async fn double(&mut self, value: usize) -> usize { + 2 * value } } - impl Model for Counter {} + impl Model for DoubleModel {} #[test] fn broadcast_event_smoke() { const N_RECV: usize = 4; + const MESSAGE: usize = 42; let mut mailboxes = Vec::new(); let mut broadcaster = EventBroadcaster::default(); for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(InputSender::new(Counter::inc, address)); + let sender = Box::new(InputSender::new(SumModel::increment, address)); broadcaster.add(sender); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - block_on(broadcaster.broadcast(1)).unwrap(); + block_on(broadcaster.broadcast(MESSAGE)).unwrap(); }); - let counter = Arc::new(AtomicUsize::new(0)); + let sum = Arc::new(AtomicUsize::new(0)); let th_recv: Vec<_> = mailboxes .into_iter() .map(|mut mailbox| { thread::spawn({ - let mut counter = Counter::new(counter.clone()); + let mut sum_model = SumModel::new(sum.clone()); move || { let dummy_address = Receiver::new(1).sender(); @@ -633,7 +642,7 @@ mod tests { Address(dummy_address), ), ); - block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap(); } }) }) @@ -644,38 +653,116 @@ mod tests { th.join().unwrap(); } - assert_eq!(counter.load(Ordering::Relaxed), N_RECV); + assert_eq!(sum.load(Ordering::Relaxed), N_RECV * MESSAGE); + } + + #[test] + fn broadcast_event_filter_map() { + const N_RECV: usize = 4; + const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers. + + let mut mailboxes = Vec::new(); + let mut broadcaster = EventBroadcaster::default(); + for id in 0..N_RECV { + let mailbox = Receiver::new(10); + let address = mailbox.sender(); + let id_filter_sender = Box::new(FilterMapInputSender::new( + move |x| (x == id || x == BROADCAST_ALL).then_some(x), + SumModel::increment, + address, + )); + + broadcaster.add(id_filter_sender); + mailboxes.push(mailbox); + } + + let th_broadcast = thread::spawn(move || { + block_on(async { + // Send messages reaching only one receiver each. + for id in 0..N_RECV { + broadcaster.broadcast(id).await.unwrap(); + } + + // Broadcast the special value to all receivers. + broadcaster.broadcast(BROADCAST_ALL).await.unwrap(); + + // Send again messages reaching only one receiver each. + for id in 0..N_RECV { + broadcaster.broadcast(id).await.unwrap(); + } + }) + }); + + let sum = Arc::new(AtomicUsize::new(0)); + + // Spawn all models. + let th_recv: Vec<_> = mailboxes + .into_iter() + .map(|mut mailbox| { + thread::spawn({ + let mut sum_model = SumModel::new(sum.clone()); + + move || { + let dummy_address = Receiver::new(1).sender(); + let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); + let dummy_time = + SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_context = Context::new( + String::new(), + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), + ); + block_on(async { + mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + }); + } + }) + }) + .collect(); + + th_broadcast.join().unwrap(); + for th in th_recv { + th.join().unwrap(); + } + + assert_eq!( + sum.load(Ordering::Relaxed), + N_RECV * ((N_RECV - 1) + BROADCAST_ALL) // Twice the sum of all IDs + N_RECV times the special value + ); } #[test] fn broadcast_query_smoke() { const N_RECV: usize = 4; + const MESSAGE: usize = 42; let mut mailboxes = Vec::new(); let mut broadcaster = QueryBroadcaster::default(); for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); + let sender = Box::new(ReplierSender::new(DoubleModel::double, address)); broadcaster.add(sender); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - let iter = block_on(broadcaster.broadcast(1)).unwrap(); + let iter = block_on(broadcaster.broadcast(MESSAGE)).unwrap(); let sum = iter.fold(0, |acc, val| acc + val); - assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)} + sum }); - let counter = Arc::new(AtomicUsize::new(0)); - let th_recv: Vec<_> = mailboxes .into_iter() .map(|mut mailbox| { thread::spawn({ - let mut counter = Counter::new(counter.clone()); + let mut double_model = DoubleModel::new(); move || { let dummy_address = Receiver::new(1).sender(); @@ -689,19 +776,123 @@ mod tests { Address(dummy_address), ), ); - block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) }) .collect(); - th_broadcast.join().unwrap(); + let sum = th_broadcast.join().unwrap(); for th in th_recv { th.join().unwrap(); } - assert_eq!(counter.load(Ordering::Relaxed), N_RECV); + assert_eq!(sum, N_RECV * MESSAGE * 2); + } + + #[test] + fn broadcast_query_filter_map() { + const N_RECV: usize = 4; + const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers. + + let mut mailboxes = Vec::new(); + let mut broadcaster = QueryBroadcaster::default(); + for id in 0..N_RECV { + let mailbox = Receiver::new(10); + let address = mailbox.sender(); + let sender = Box::new(FilterMapReplierSender::new( + move |x| (x == id || x == BROADCAST_ALL).then_some(x), + |x| 3 * x, + DoubleModel::double, + address, + )); + + broadcaster.add(sender); + mailboxes.push(mailbox); + } + + let th_broadcast = thread::spawn(move || { + block_on(async { + let mut sum = 0; + + // Send messages reaching only one receiver each. + for id in 0..N_RECV { + sum += broadcaster + .broadcast(id) + .await + .unwrap() + .fold(0, |acc, val| acc + val); + } + + // Broadcast the special value to all receivers. + sum += broadcaster + .broadcast(BROADCAST_ALL) + .await + .unwrap() + .fold(0, |acc, val| acc + val); + + // Send again messages reaching only one receiver each. + for id in 0..N_RECV { + sum += broadcaster + .broadcast(id) + .await + .unwrap() + .fold(0, |acc, val| acc + val); + } + + sum + }) + }); + + let th_recv: Vec<_> = mailboxes + .into_iter() + .map(|mut mailbox| { + thread::spawn({ + let mut double_model = DoubleModel::new(); + + move || { + let dummy_address = Receiver::new(1).sender(); + let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); + let dummy_time = + SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_context = Context::new( + String::new(), + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), + ); + + block_on(async { + mailbox + .recv(&mut double_model, &dummy_context) + .await + .unwrap(); + mailbox + .recv(&mut double_model, &dummy_context) + .await + .unwrap(); + mailbox + .recv(&mut double_model, &dummy_context) + .await + .unwrap(); + }); + thread::sleep(std::time::Duration::from_millis(100)); + } + }) + }) + .collect(); + + let sum = th_broadcast.join().unwrap(); + for th in th_recv { + th.join().unwrap(); + } + + assert_eq!( + sum, + N_RECV * ((N_RECV - 1) + BROADCAST_ALL) * 2 * 3, // Twice the sum of all IDs + N_RECV times the special value, then doubled and tripled + ); } } diff --git a/asynchronix/src/ports/source/broadcaster.rs b/asynchronix/src/ports/source/broadcaster.rs index bb36054..4291a54 100644 --- a/asynchronix/src/ports/source/broadcaster.rs +++ b/asynchronix/src/ports/source/broadcaster.rs @@ -469,59 +469,68 @@ mod tests { use futures_executor::block_on; use crate::channel::Receiver; - use crate::model::Context; use crate::simulation::{Address, LocalScheduler, Scheduler}; use crate::time::{MonotonicTime, TearableAtomicTime}; use crate::util::priority_queue::PriorityQueue; use crate::util::sync_cell::SyncCell; - use super::super::sender::{InputSender, ReplierSender}; + use super::super::sender::{ + FilterMapInputSender, FilterMapReplierSender, InputSender, ReplierSender, + }; use super::*; - use crate::model::Model; + use crate::model::{Context, Model}; - struct Counter { + struct SumModel { inner: Arc, } - impl Counter { + impl SumModel { fn new(counter: Arc) -> Self { Self { inner: counter } } - async fn inc(&mut self, by: usize) { + async fn increment(&mut self, by: usize) { self.inner.fetch_add(by, Ordering::Relaxed); } - async fn fetch_inc(&mut self, by: usize) -> usize { - let res = self.inner.fetch_add(by, Ordering::Relaxed); - res + } + impl Model for SumModel {} + + struct DoubleModel {} + impl DoubleModel { + fn new() -> Self { + Self {} + } + async fn double(&mut self, value: usize) -> usize { + 2 * value } } - impl Model for Counter {} + impl Model for DoubleModel {} #[test] fn broadcast_event_smoke() { const N_RECV: usize = 4; + const MESSAGE: usize = 42; let mut mailboxes = Vec::new(); let mut broadcaster = EventBroadcaster::default(); for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(InputSender::new(Counter::inc, address)); + let sender = Box::new(InputSender::new(SumModel::increment, address)); broadcaster.add(sender); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - block_on(broadcaster.broadcast(1)).unwrap(); + block_on(broadcaster.broadcast(MESSAGE)).unwrap(); }); - let counter = Arc::new(AtomicUsize::new(0)); + let sum = Arc::new(AtomicUsize::new(0)); let th_recv: Vec<_> = mailboxes .into_iter() .map(|mut mailbox| { thread::spawn({ - let mut counter = Counter::new(counter.clone()); + let mut sum_model = SumModel::new(sum.clone()); move || { let dummy_address = Receiver::new(1).sender(); @@ -535,7 +544,7 @@ mod tests { Address(dummy_address), ), ); - block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut sum_model, &dummy_context)).unwrap(); } }) }) @@ -546,38 +555,116 @@ mod tests { th.join().unwrap(); } - assert_eq!(counter.load(Ordering::Relaxed), N_RECV); + assert_eq!(sum.load(Ordering::Relaxed), N_RECV * MESSAGE); + } + + #[test] + fn broadcast_event_filter_map() { + const N_RECV: usize = 4; + const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers. + + let mut mailboxes = Vec::new(); + let mut broadcaster = EventBroadcaster::default(); + for id in 0..N_RECV { + let mailbox = Receiver::new(10); + let address = mailbox.sender(); + let id_filter_sender = Box::new(FilterMapInputSender::new( + move |x| (x == id || x == BROADCAST_ALL).then_some(x), + SumModel::increment, + address, + )); + + broadcaster.add(id_filter_sender); + mailboxes.push(mailbox); + } + + let th_broadcast = thread::spawn(move || { + block_on(async { + // Send messages reaching only one receiver each. + for id in 0..N_RECV { + broadcaster.broadcast(id).await.unwrap(); + } + + // Broadcast the special value to all receivers. + broadcaster.broadcast(BROADCAST_ALL).await.unwrap(); + + // Send again messages reaching only one receiver each. + for id in 0..N_RECV { + broadcaster.broadcast(id).await.unwrap(); + } + }) + }); + + let sum = Arc::new(AtomicUsize::new(0)); + + // Spawn all models. + let th_recv: Vec<_> = mailboxes + .into_iter() + .map(|mut mailbox| { + thread::spawn({ + let mut sum_model = SumModel::new(sum.clone()); + + move || { + let dummy_address = Receiver::new(1).sender(); + let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); + let dummy_time = + SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_context = Context::new( + String::new(), + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), + ); + block_on(async { + mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + mailbox.recv(&mut sum_model, &dummy_context).await.unwrap(); + }); + } + }) + }) + .collect(); + + th_broadcast.join().unwrap(); + for th in th_recv { + th.join().unwrap(); + } + + assert_eq!( + sum.load(Ordering::Relaxed), + N_RECV * ((N_RECV - 1) + BROADCAST_ALL) // Twice the sum of all IDs + N_RECV times the special value + ); } #[test] fn broadcast_query_smoke() { const N_RECV: usize = 4; + const MESSAGE: usize = 42; let mut mailboxes = Vec::new(); let mut broadcaster = QueryBroadcaster::default(); for _ in 0..N_RECV { let mailbox = Receiver::new(10); let address = mailbox.sender(); - let sender = Box::new(ReplierSender::new(Counter::fetch_inc, address)); + let sender = Box::new(ReplierSender::new(DoubleModel::double, address)); broadcaster.add(sender); mailboxes.push(mailbox); } let th_broadcast = thread::spawn(move || { - let iter = block_on(broadcaster.broadcast(1)).unwrap(); + let iter = block_on(broadcaster.broadcast(MESSAGE)).unwrap(); let sum = iter.fold(0, |acc, val| acc + val); - assert_eq!(sum, N_RECV * (N_RECV - 1) / 2); // sum of {0, 1, 2, ..., (N_RECV - 1)} + sum }); - let counter = Arc::new(AtomicUsize::new(0)); - let th_recv: Vec<_> = mailboxes .into_iter() .map(|mut mailbox| { thread::spawn({ - let mut counter = Counter::new(counter.clone()); + let mut double_model = DoubleModel::new(); move || { let dummy_address = Receiver::new(1).sender(); @@ -591,19 +678,123 @@ mod tests { Address(dummy_address), ), ); - block_on(mailbox.recv(&mut counter, &dummy_context)).unwrap(); + block_on(mailbox.recv(&mut double_model, &dummy_context)).unwrap(); thread::sleep(std::time::Duration::from_millis(100)); } }) }) .collect(); - th_broadcast.join().unwrap(); + let sum = th_broadcast.join().unwrap(); for th in th_recv { th.join().unwrap(); } - assert_eq!(counter.load(Ordering::Relaxed), N_RECV); + assert_eq!(sum, N_RECV * MESSAGE * 2); + } + + #[test] + fn broadcast_query_filter_map() { + const N_RECV: usize = 4; + const BROADCAST_ALL: usize = 42; // special ID signaling that the message must reach all receivers. + + let mut mailboxes = Vec::new(); + let mut broadcaster = QueryBroadcaster::default(); + for id in 0..N_RECV { + let mailbox = Receiver::new(10); + let address = mailbox.sender(); + let sender = Box::new(FilterMapReplierSender::new( + move |x| (x == id || x == BROADCAST_ALL).then_some(x), + |x| 3 * x, + DoubleModel::double, + address, + )); + + broadcaster.add(sender); + mailboxes.push(mailbox); + } + + let th_broadcast = thread::spawn(move || { + block_on(async { + let mut sum = 0; + + // Send messages reaching only one receiver each. + for id in 0..N_RECV { + sum += broadcaster + .broadcast(id) + .await + .unwrap() + .fold(0, |acc, val| acc + val); + } + + // Broadcast the special value to all receivers. + sum += broadcaster + .broadcast(BROADCAST_ALL) + .await + .unwrap() + .fold(0, |acc, val| acc + val); + + // Send again messages reaching only one receiver each. + for id in 0..N_RECV { + sum += broadcaster + .broadcast(id) + .await + .unwrap() + .fold(0, |acc, val| acc + val); + } + + sum + }) + }); + + let th_recv: Vec<_> = mailboxes + .into_iter() + .map(|mut mailbox| { + thread::spawn({ + let mut double_model = DoubleModel::new(); + + move || { + let dummy_address = Receiver::new(1).sender(); + let dummy_priority_queue = Arc::new(Mutex::new(PriorityQueue::new())); + let dummy_time = + SyncCell::new(TearableAtomicTime::new(MonotonicTime::EPOCH)).reader(); + let dummy_context = Context::new( + String::new(), + LocalScheduler::new( + Scheduler::new(dummy_priority_queue, dummy_time), + Address(dummy_address), + ), + ); + + block_on(async { + mailbox + .recv(&mut double_model, &dummy_context) + .await + .unwrap(); + mailbox + .recv(&mut double_model, &dummy_context) + .await + .unwrap(); + mailbox + .recv(&mut double_model, &dummy_context) + .await + .unwrap(); + }); + thread::sleep(std::time::Duration::from_millis(100)); + } + }) + }) + .collect(); + + let sum = th_broadcast.join().unwrap(); + for th in th_recv { + th.join().unwrap(); + } + + assert_eq!( + sum, + N_RECV * ((N_RECV - 1) + BROADCAST_ALL) * 2 * 3, // Twice the sum of all IDs + N_RECV times the special value, then doubled and tripled + ); } }