From abab030b4a5f0ece91358cdcf918d62a42616646 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Mon, 17 Oct 2022 12:29:26 +0200 Subject: [PATCH] Restrict visibility of many items to `super` --- asynchronix/src/runtime/executor/pool.rs | 40 +++++++++++----------- asynchronix/src/runtime/executor/queue.rs | 26 +++++++------- asynchronix/src/runtime/executor/worker.rs | 8 ++--- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/asynchronix/src/runtime/executor/pool.rs b/asynchronix/src/runtime/executor/pool.rs index 1b076d8..8b8d278 100644 --- a/asynchronix/src/runtime/executor/pool.rs +++ b/asynchronix/src/runtime/executor/pool.rs @@ -9,10 +9,10 @@ use super::{GlobalQueue, Stealer}; /// A view of the thread pool shared between the executor and all workers. #[derive(Debug)] -pub(crate) struct Pool { - pub(crate) global_queue: GlobalQueue, - pub(crate) executor_id: usize, - pub(crate) executor_unparker: parking::Unparker, +pub(super) struct Pool { + pub(super) global_queue: GlobalQueue, + pub(super) executor_id: usize, + pub(super) executor_unparker: parking::Unparker, registry: PoolRegistry, stealers: Box<[Stealer]>, worker_unparkers: Box<[parking::Unparker]>, @@ -23,7 +23,7 @@ pub(crate) struct Pool { impl Pool { /// Creates a new pool. - pub(crate) fn new( + pub(super) fn new( executor_id: usize, executor_unparker: parking::Unparker, shared_data: impl Iterator, @@ -47,14 +47,14 @@ impl Pool { /// Marks all pool workers as active. /// /// Unparking the worker threads is the responsibility of the caller. - pub(crate) fn set_all_workers_active(&self) { + pub(super) fn set_all_workers_active(&self) { self.registry.set_all_active(); } /// Marks all pool workers as inactive. /// /// Unparking the executor threads is the responsibility of the caller. - pub(crate) fn set_all_workers_inactive(&self) { + pub(super) fn set_all_workers_inactive(&self) { self.registry.set_all_inactive(); } @@ -66,7 +66,7 @@ impl Pool { /// If this was the last active worker, `false` is returned and it is /// guaranteed that all memory operations performed by threads that called /// `activate_worker` will be visible. - pub(crate) fn try_set_worker_inactive(&self, worker_id: usize) -> bool { + pub(super) fn try_set_worker_inactive(&self, worker_id: usize) -> bool { self.registry.try_set_inactive(worker_id) } @@ -78,7 +78,7 @@ impl Pool { /// idle state without observing the tasks scheduled by this caller. If this /// is not tolerable (for instance if this method is called from a /// non-worker thread), use the more expensive `activate_worker`. - pub(crate) fn activate_worker_relaxed(&self) { + pub(super) fn activate_worker_relaxed(&self) { if let Some(worker_id) = self.registry.set_one_active_relaxed() { self.searching_workers.fetch_add(1, Ordering::Relaxed); self.worker_unparkers[worker_id].unpark(); @@ -88,7 +88,7 @@ impl Pool { /// Unparks an idle worker if any is found and mark it as active, or ensure /// that at least the last active worker will observe all memory operations /// performed before this call when calling `try_set_worker_inactive`. - pub(crate) fn activate_worker(&self) { + pub(super) fn activate_worker(&self) { if let Some(worker_id) = self.registry.set_one_active() { self.searching_workers.fetch_add(1, Ordering::Relaxed); self.worker_unparkers[worker_id].unpark(); @@ -99,28 +99,28 @@ impl Pool { /// /// If `true` is returned, it is guaranteed that all operations performed by /// the now-inactive workers become visible in this thread. - pub(crate) fn is_pool_idle(&self) -> bool { + pub(super) fn is_pool_idle(&self) -> bool { self.registry.is_idle() } /// Increments the count of workers actively searching for tasks. - pub(crate) fn begin_worker_search(&self) { + pub(super) fn begin_worker_search(&self) { self.searching_workers.fetch_add(1, Ordering::Relaxed); } /// Decrements the count of workers actively searching for tasks. - pub(crate) fn end_worker_search(&self) { + pub(super) fn end_worker_search(&self) { self.searching_workers.fetch_sub(1, Ordering::Relaxed); } /// Returns the count of workers actively searching for tasks. - pub(crate) fn searching_worker_count(&self) -> usize { + pub(super) fn searching_worker_count(&self) -> usize { self.searching_workers.load(Ordering::Relaxed) } /// Triggers the termination signal and unparks all worker threads so they /// can cleanly terminate. - pub(crate) fn trigger_termination(&self) { + pub(super) fn trigger_termination(&self) { self.terminate_signal.store(true, Ordering::Relaxed); self.registry.set_all_active(); @@ -130,7 +130,7 @@ impl Pool { } /// Returns true if the termination signal was triggered. - pub(crate) fn termination_is_triggered(&self) -> bool { + pub(super) fn termination_is_triggered(&self) -> bool { self.terminate_signal.load(Ordering::Relaxed) } @@ -139,7 +139,7 @@ impl Pool { /// If no panic is currently registered, the panic in argument is /// registered. If a panic was already registered by a worker and was not /// yet processed by the executor, then nothing is done. - pub(crate) fn register_panic(&self, panic: Box) { + pub(super) fn register_panic(&self, panic: Box) { let mut worker_panic = self.worker_panic.lock().unwrap(); if worker_panic.is_none() { *worker_panic = Some(panic); @@ -147,7 +147,7 @@ impl Pool { } /// Takes a worker panic if any is registered. - pub(crate) fn take_panic(&self) -> Option> { + pub(super) fn take_panic(&self) -> Option> { let mut worker_panic = self.worker_panic.lock().unwrap(); worker_panic.take() } @@ -156,7 +156,7 @@ impl Pool { /// workers, starting from a randomly selected active worker. The worker /// which ID is provided in argument (if any) is excluded from the pool of /// candidates. - pub(crate) fn shuffled_stealers<'a>( + pub(super) fn shuffled_stealers<'a>( &'a self, excluded_worker_id: Option, rng: &'_ rng::Rng, @@ -173,7 +173,7 @@ impl Pool { /// An iterator over active workers that yields their associated stealer, /// starting from a randomly selected worker. -pub(crate) struct ShuffledStealers<'a> { +pub(super) struct ShuffledStealers<'a> { stealers: &'a [Stealer], // A bit-rotated bit field of the remaining candidate workers to steal from. // If set, the LSB represents the next candidate. diff --git a/asynchronix/src/runtime/executor/queue.rs b/asynchronix/src/runtime/executor/queue.rs index 6089e9a..0236743 100644 --- a/asynchronix/src/runtime/executor/queue.rs +++ b/asynchronix/src/runtime/executor/queue.rs @@ -12,7 +12,7 @@ use crate::loom_exports::cell::UnsafeCell; use crate::loom_exports::sync::atomic::{AtomicU32, AtomicU64}; use crate::loom_exports::{debug_or_loom_assert, debug_or_loom_assert_eq}; -pub(crate) use buffers::*; +pub(super) use buffers::*; mod buffers; #[cfg(test)] @@ -180,13 +180,13 @@ impl> Drop for Queue { /// Handle for single-threaded FIFO push and pop operations. #[derive(Debug)] -pub(crate) struct Worker> { +pub(super) struct Worker> { queue: Arc>, } impl> Worker { /// Creates a new queue and returns a `Worker` handle. - pub(crate) fn new() -> Self { + pub(super) fn new() -> Self { let queue = Arc::new(Queue { heads: CachePadded::new(AtomicU64::new(0)), tail: CachePadded::new(AtomicU32::new(0)), @@ -201,7 +201,7 @@ impl> Worker { /// /// An arbitrary number of `Stealer` handles can be created, either using /// this method or cloning an existing `Stealer` handle. - pub(crate) fn stealer(&self) -> Stealer { + pub(super) fn stealer(&self) -> Stealer { Stealer { queue: self.queue.clone(), } @@ -212,7 +212,7 @@ impl> Worker { /// /// Note that that the spare capacity may be underestimated due to /// concurrent stealing operations. - pub(crate) fn spare_capacity(&self) -> usize { + pub(super) fn spare_capacity(&self) -> usize { let capacity = >::CAPACITY; let stealer_head = unpack_heads(self.queue.heads.load(Relaxed)).1; let tail = self.queue.tail.load(Relaxed); @@ -230,7 +230,7 @@ impl> Worker { /// /// This will fail if the queue is full, in which case the item is returned /// as the error field. - pub(crate) fn push(&self, item: T) -> Result<(), T> { + pub(super) fn push(&self, item: T) -> Result<(), T> { let stealer_head = unpack_heads(self.queue.heads.load(Acquire)).1; let tail = self.queue.tail.load(Relaxed); @@ -258,7 +258,7 @@ impl> Worker { /// spare capacity to accommodate all iterator items, for instance by /// calling `[Worker::spare_capacity]` beforehand. Otherwise, the iterator /// is dropped while still holding the items in excess. - pub(crate) fn extend>(&self, iter: I) { + pub(super) fn extend>(&self, iter: I) { let stealer_head = unpack_heads(self.queue.heads.load(Acquire)).1; let mut tail = self.queue.tail.load(Relaxed); @@ -284,7 +284,7 @@ impl> Worker { /// Attempts to pop one item from the head of the queue. /// /// This returns None if the queue is empty. - pub(crate) fn pop(&self) -> Option { + pub(super) fn pop(&self) -> Option { let mut heads = self.queue.heads.load(Acquire); let prev_worker_head = loop { @@ -343,7 +343,7 @@ impl> Worker { /// An error is returned in the following cases: /// 1) no item was stolen, either because the queue is empty or `N` is 0, /// 2) a concurrent stealing operation is ongoing. - pub(crate) fn drain(&self, count_fn: C) -> Result, StealError> + pub(super) fn drain(&self, count_fn: C) -> Result, StealError> where C: FnMut(usize) -> usize, { @@ -373,7 +373,7 @@ unsafe impl> Send for Worker {} /// This iterator is created by [`Worker::drain`]. See its documentation for /// more. #[derive(Debug)] -pub(crate) struct Drain<'a, T, B: Buffer> { +pub(super) struct Drain<'a, T, B: Buffer> { queue: &'a Queue, head: u32, from_head: u32, @@ -448,7 +448,7 @@ unsafe impl<'a, T: Send, B: Buffer> Sync for Drain<'a, T, B> {} /// Handle for multi-threaded stealing operations. #[derive(Debug)] -pub(crate) struct Stealer> { +pub(super) struct Stealer> { queue: Arc>, } @@ -474,7 +474,7 @@ impl> Stealer { /// an error as long as one element could be returned directly. This can /// occur if the destination queue is full, if the source queue has only one /// item or if `N` is 1. - pub(crate) fn steal_and_pop( + pub(super) fn steal_and_pop( &self, dest: &Worker, count_fn: C, @@ -557,7 +557,7 @@ unsafe impl> Sync for Stealer {} /// Error returned when stealing is unsuccessful. #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum StealError { +pub(super) enum StealError { /// No item was stolen. Empty, /// Another concurrent stealing operation is ongoing. diff --git a/asynchronix/src/runtime/executor/worker.rs b/asynchronix/src/runtime/executor/worker.rs index b989b02..c45dc29 100644 --- a/asynchronix/src/runtime/executor/worker.rs +++ b/asynchronix/src/runtime/executor/worker.rs @@ -8,14 +8,14 @@ use super::LocalQueue; /// A local worker with access to global executor resources. pub(crate) struct Worker { - pub(crate) local_queue: LocalQueue, - pub(crate) fast_slot: Cell>, - pub(crate) pool: Arc, + pub(super) local_queue: LocalQueue, + pub(super) fast_slot: Cell>, + pub(super) pool: Arc, } impl Worker { /// Creates a new worker. - pub(crate) fn new(local_queue: LocalQueue, pool: Arc) -> Self { + pub(super) fn new(local_queue: LocalQueue, pool: Arc) -> Self { Self { local_queue, fast_slot: Cell::new(None),