Skip to content

Commit

Permalink
Add FuturesUnordered::clear (#2415)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev authored and taiki-e committed May 10, 2021
1 parent 1c5a021 commit 0924ecb
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 20 deletions.
30 changes: 22 additions & 8 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl<Fut> FuturesUnordered<Fut> {
(task, len)
}

/// Releases the task. It destorys the future inside and either drops
/// Releases the task. It destroys the future inside and either drops
/// the `Arc<Task>` or transfers ownership to the ready to run queue.
/// The task this method is called on must have been unlinked before.
fn release_task(&mut self, task: Arc<Task<Fut>>) {
Expand Down Expand Up @@ -553,19 +553,33 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
}
}

impl<Fut> FuturesUnordered<Fut> {
/// Clears the set, removing all futures.
pub fn clear(&mut self) {
self.clear_head_all();

// we just cleared all the tasks, and we have &mut self, so this is safe.
unsafe { self.ready_to_run_queue.clear() };

self.is_terminated.store(false, Relaxed);
}

fn clear_head_all(&mut self) {
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
}
}
}

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
unsafe {
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = self.unlink(head);
self.release_task(task);
}
}
self.clear_head_all();

// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand Down
37 changes: 25 additions & 12 deletions futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,38 @@ impl<Fut> ReadyToRunQueue<Fut> {
pub(super) fn stub(&self) -> *const Task<Fut> {
&*self.stub
}

// Clear the queue of tasks.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. This method just pulls out
// tasks and drops their refcounts.
//
// # Safety
//
// - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
// - The caller **must** guarantee unique access to `self`
pub(crate) unsafe fn clear(&self) {
loop {
// SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
}
}

impl<Fut> Drop for ReadyToRunQueue<Fut> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner<Fut>` we need to clear out
// the ready to run queue of tasks if there's anything left in there.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. All tasks should have had
// their futures dropped already by the `FuturesUnordered` destructor
// above, so we're just pulling out tasks and dropping their refcounts.

// All tasks have had their futures dropped already by the `FuturesUnordered`
// destructor above, and we have &mut self, so this is safe.
unsafe {
loop {
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
self.clear();
}
}
}
22 changes: 22 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,25 @@ fn polled_only_once_at_most_per_iteration() {
let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
}

#[test]
fn clear() {
let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);

assert_eq!(block_on(tasks.next()), Some(1));
assert!(!tasks.is_empty());

tasks.clear();
assert!(tasks.is_empty());

tasks.push(future::ready(3));
assert!(!tasks.is_empty());

tasks.clear();
assert!(tasks.is_empty());

assert_eq!(block_on(tasks.next()), None);
assert!(tasks.is_terminated());
tasks.clear();
assert!(!tasks.is_terminated());
}

0 comments on commit 0924ecb

Please sign in to comment.