diff --git a/asynchronix/src/ports/output/broadcaster.rs b/asynchronix/src/ports/output/broadcaster.rs
index 07027d2..4300392 100644
--- a/asynchronix/src/ports/output/broadcaster.rs
+++ b/asynchronix/src/ports/output/broadcaster.rs
@@ -48,9 +48,9 @@ impl BroadcasterInner {
         self.shared.outputs.push(None);
 
         // The storage is alway an empty vector so we just book some capacity.
-        self.shared.storage.as_mut().map(|s| {
-            let _ = s.try_reserve(self.senders.len());
-        });
+        if let Some(storage) = self.shared.storage.as_mut() {
+            let _ = storage.try_reserve(self.senders.len());
+        };
 
         line_id
     }
@@ -83,6 +83,7 @@ impl BroadcasterInner {
 
     /// Return a list of futures broadcasting an event or query to multiple
     /// addresses.
+    #[allow(clippy::type_complexity)]
     fn futures(
         &mut self,
         arg: T,
diff --git a/asynchronix/src/util/task_set.rs b/asynchronix/src/util/task_set.rs
index e1145e8..3ad9b4f 100644
--- a/asynchronix/src/util/task_set.rs
+++ b/asynchronix/src/util/task_set.rs
@@ -196,60 +196,12 @@ impl TaskSet {
                     next: AtomicU32::new(SLEEPING),
                 }));
             }
-
-            return;
         }
 
-        // Try to shrink the vector of tasks.
-        //
-        // The main issue when shrinking the vector of tasks is that stale
-        // wakers may still be around and may at any moment be scheduled and
-        // insert their task index in the list of scheduled tasks. If it cannot
-        // be guaranteed that this will not happen, then the vector of tasks
-        // cannot be shrunk further, otherwise the iterator for scheduled tasks
-        // will later fail when reaching a task with an invalid index.
-        //
-        // We follow a 2-steps strategy:
-        //
-        // 1) remove all tasks currently in the list of scheduled task and set
-        //    them to `SLEEPING` state in case some of them might have an index
-        //    that will be invalidated when the vector of tasks is shrunk;
-        //
-        // 2) attempt to iteratively shrink the vector of tasks by removing
-        //    tasks starting from the back of the vector:
-        //    - If a task is in the `SLEEPING` state, then its `next` pointer is
-        //      changed to an arbitrary value other than`SLEEPING`, but the task
-        //      is not inserted in the list of scheduled tasks; this way, the
-        //      task will be effectively rendered inactive. The task can now be
-        //      removed from the vector.
-        //    - If a task is found in a non-`SLEEPING` state (meaning that there
-        //      was a race and the task was scheduled after step 1) then abandon
-        //      further shrinking and leave this task in the vector; the iterator
-        //      for scheduled tasks mitigates such situation by only yielding
-        //      task indices that are within the expected range.
-
-        // Step 1: unscheduled tasks that may be scheduled.
-        self.discard_scheduled();
-
-        // Step 2: attempt to remove tasks starting at the back of the vector.
-        while self.tasks.len() > len {
-            // There is at least one task since `len()` was non-zero.
-            let task = self.tasks.last().unwrap();
-
-            // Ordering: Relaxed ordering is sufficient since the task is
-            // effectively discarded.
-            if task
-                .next
-                .compare_exchange(SLEEPING, EMPTY, Ordering::Relaxed, Ordering::Relaxed)
-                .is_err()
-            {
-                // The task could not be removed for now so the set of tasks cannot
-                // be shrunk further.
-                break;
-            }
-
-            self.tasks.pop();
-        }
+        // The vector of tasks is never shrunk as this is a fairly costly
+        // operation and is not strictly necessary. Typically, inactive tasks
+        // left at the back of the vector are never waken anyway, and if they
+        // are, they are filtered out by the task iterator.
     }
 
     /// Returns `true` if one or more sub-tasks are currently scheduled.
@@ -271,10 +223,6 @@ impl TaskSet {
 
         waker_ref(&self.tasks[idx])
     }
-
-    pub(crate) fn len(&self) -> usize {
-        self.task_count
-    }
 }
 
 /// Internals shared between a `TaskSet` and its associated `Task`s.
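Note on the task_set.rs hunk: the new comment relies on the scheduled-task iterator ignoring indices that fall outside the range of live tasks, which is what makes it safe to leave discarded tasks at the back of the vector. The following is only an illustrative sketch of that bounds-check idea, not the crate's actual iterator; the names `ScheduledIndices`, `indices` and `task_count` are invented for the example.

// Illustrative sketch only (hypothetical names): an iterator over scheduled
// task indices that tolerates stale entries left behind by the wakers of
// tasks that were logically discarded.
struct ScheduledIndices {
    indices: Vec<usize>, // raw scheduled indices, possibly stale
    task_count: usize,   // number of currently live tasks
}

impl Iterator for ScheduledIndices {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Pop raw indices until one falls within the live range; an index at
        // or past `task_count` can only come from a stale waker and is skipped.
        while let Some(idx) = self.indices.pop() {
            if idx < self.task_count {
                return Some(idx);
            }
        }
        None
    }
}

The real iterator in task_set.rs is lock-free and built on the atomic `next` list; the sketch only shows the range filter that the removed shrinking logic no longer needs to guarantee.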