From 0924ecb1b993585f917fb674c6626381d7fec7cd Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 10 May 2021 09:13:20 -0400 Subject: [PATCH] Add FuturesUnordered::clear (#2415) --- .../src/stream/futures_unordered/mod.rs | 30 +++++++++++---- .../futures_unordered/ready_to_run_queue.rs | 37 +++++++++++++------ futures/tests/stream_futures_unordered.rs | 22 +++++++++++ 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 89ce113d64..a25fbe03ef 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -236,7 +236,7 @@ impl FuturesUnordered { (task, len) } - /// Releases the task. It destorys the future inside and either drops + /// Releases the task. It destroys the future inside and either drops /// the `Arc` or transfers ownership to the ready to run queue. /// The task this method is called on must have been unlinked before. fn release_task(&mut self, task: Arc>) { @@ -553,19 +553,33 @@ impl Debug for FuturesUnordered { } } +impl FuturesUnordered { + /// Clears the set, removing all futures. + pub fn clear(&mut self) { + self.clear_head_all(); + + // we just cleared all the tasks, and we have &mut self, so this is safe. + unsafe { self.ready_to_run_queue.clear() }; + + self.is_terminated.store(false, Relaxed); + } + + fn clear_head_all(&mut self) { + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + } + } +} + impl Drop for FuturesUnordered { fn drop(&mut self) { // When a `FuturesUnordered` is dropped we want to drop all futures // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. - unsafe { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = self.unlink(head); - self.release_task(task); - } - } + self.clear_head_all(); // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 3b34dc6e27..5ef6cde83d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -85,25 +85,38 @@ impl ReadyToRunQueue { pub(super) fn stub(&self) -> *const Task { &*self.stub } + + // Clear the queue of tasks. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. This method just pulls out + // tasks and drops their refcounts. + // + // # Safety + // + // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) + // - The caller **must** guarantee unique access to `self` + pub(crate) unsafe fn clear(&self) { + loop { + // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } + } } impl Drop for ReadyToRunQueue { fn drop(&mut self) { // Once we're in the destructor for `Inner` we need to clear out // the ready to run queue of tasks if there's anything left in there. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. All tasks should have had - // their futures dropped already by the `FuturesUnordered` destructor - // above, so we're just pulling out tasks and dropping their refcounts. + + // All tasks have had their futures dropped already by the `FuturesUnordered` + // destructor above, and we have &mut self, so this is safe. unsafe { - loop { - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } + self.clear(); } } } diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 3a5d41853d..4b9afccaf9 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -345,3 +345,25 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); } + +#[test] +fn clear() { + let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(future::ready(3)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +}