1
0
forked from ROMEO/nexosim

Simplify task_set & satisfy clippy

This commit is contained in:
Serge Barral 2024-08-07 10:23:10 +02:00
parent b5187ded44
commit b544bcee92
2 changed files with 8 additions and 59 deletions

View File

@ -48,9 +48,9 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
self.shared.outputs.push(None);
// The storage is alway an empty vector so we just book some capacity.
self.shared.storage.as_mut().map(|s| {
let _ = s.try_reserve(self.senders.len());
});
if let Some(storage) = self.shared.storage.as_mut() {
let _ = storage.try_reserve(self.senders.len());
};
line_id
}
@ -83,6 +83,7 @@ impl<T: Clone, R> BroadcasterInner<T, R> {
/// Return a list of futures broadcasting an event or query to multiple
/// addresses.
#[allow(clippy::type_complexity)]
fn futures(
&mut self,
arg: T,

View File

@ -196,60 +196,12 @@ impl TaskSet {
next: AtomicU32::new(SLEEPING),
}));
}
return;
}
// Try to shrink the vector of tasks.
//
// The main issue when shrinking the vector of tasks is that stale
// wakers may still be around and may at any moment be scheduled and
// insert their task index in the list of scheduled tasks. If it cannot
// be guaranteed that this will not happen, then the vector of tasks
// cannot be shrunk further, otherwise the iterator for scheduled tasks
// will later fail when reaching a task with an invalid index.
//
// We follow a 2-steps strategy:
//
// 1) remove all tasks currently in the list of scheduled task and set
// them to `SLEEPING` state in case some of them might have an index
// that will be invalidated when the vector of tasks is shrunk;
//
// 2) attempt to iteratively shrink the vector of tasks by removing
// tasks starting from the back of the vector:
// - If a task is in the `SLEEPING` state, then its `next` pointer is
// changed to an arbitrary value other than`SLEEPING`, but the task
// is not inserted in the list of scheduled tasks; this way, the
// task will be effectively rendered inactive. The task can now be
// removed from the vector.
// - If a task is found in a non-`SLEEPING` state (meaning that there
// was a race and the task was scheduled after step 1) then abandon
// further shrinking and leave this task in the vector; the iterator
// for scheduled tasks mitigates such situation by only yielding
// task indices that are within the expected range.
// Step 1: unscheduled tasks that may be scheduled.
self.discard_scheduled();
// Step 2: attempt to remove tasks starting at the back of the vector.
while self.tasks.len() > len {
// There is at least one task since `len()` was non-zero.
let task = self.tasks.last().unwrap();
// Ordering: Relaxed ordering is sufficient since the task is
// effectively discarded.
if task
.next
.compare_exchange(SLEEPING, EMPTY, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
// The task could not be removed for now so the set of tasks cannot
// be shrunk further.
break;
}
self.tasks.pop();
}
// The vector of tasks is never shrunk as this is a fairly costly
// operation and is not strictly necessary. Typically, inactive tasks
// left at the back of the vector are never waken anyway, and if they
// are, they are filtered out by the task iterator.
}
/// Returns `true` if one or more sub-tasks are currently scheduled.
@ -271,10 +223,6 @@ impl TaskSet {
waker_ref(&self.tasks[idx])
}
pub(crate) fn len(&self) -> usize {
self.task_count
}
}
/// Internals shared between a `TaskSet` and its associated `Task`s.