Skip to content

Commit

Permalink
Poll both in case if ther's nothing to poll (poll was invoked indepen…
Browse files Browse the repository at this point in the history
…dently from wakers) + update benchmark
  • Loading branch information
olegnn committed Jun 12, 2020
1 parent 03a3814 commit 1324ea9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
17 changes: 10 additions & 7 deletions futures-util/benches/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use std::collections::VecDeque;
use std::thread;

#[bench]
fn inner_oneshots(b: &mut Bencher) {
const STREAM_COUNT: usize = 1_000;
const STREAM_ITEM_COUNT: usize = 10;
fn oneshot_streams(b: &mut Bencher) {
const STREAM_COUNT: usize = 10_000;
const STREAM_ITEM_COUNT: usize = 1;

b.iter(|| {
let mut txs = VecDeque::with_capacity(STREAM_COUNT);
Expand All @@ -27,10 +27,10 @@ fn inner_oneshots(b: &mut Bencher) {
}

thread::spawn(move || {
let mut m = 1;
let mut last = 1;
while let Some(tx) = txs.pop_front() {
let _ = tx.send(stream::iter(m..m + STREAM_ITEM_COUNT));
m += STREAM_ITEM_COUNT;
let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
last += STREAM_ITEM_COUNT;
}
});

Expand All @@ -52,11 +52,14 @@ fn inner_oneshots(b: &mut Bencher) {
loop {
match flatten.poll_next_unpin(cx) {
Poll::Ready(None) => break,
Poll::Ready(Some(_)) => count += 1,
Poll::Ready(Some(_)) => {
count += 1;
}
_ => {}
}
}
assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);

Poll::Ready(())
}))
});
Expand Down
21 changes: 13 additions & 8 deletions futures-util/src/stream/stream/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const NEED_TO_POLL_FUTURES: u8 = 1;
const NEED_TO_POLL_STREAM: u8 = 0b10;

/// Indicates that it needs to poll something.
const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;
const NEED_TO_POLL_BOTH: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;

/// Indicates that current stream is polled at the moment.
const POLLING: u8 = 0b100;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl SharedPollState {
/// with non-`POLLING` state, and returns disjunction result.
fn end_polling(&self, mut to_poll: u8) -> u8 {
to_poll |= self.state.swap(!POLLING & !WOKEN, Ordering::SeqCst);
self.state.fetch_and(to_poll & !POLLING & !WOKEN, Ordering::SeqCst);
self.state.swap(to_poll & !POLLING & !WOKEN, Ordering::SeqCst);
to_poll
}
}
Expand Down Expand Up @@ -101,8 +101,8 @@ impl ArcWake for PollWaker {
// Only call waker if stream isn't being polled because it will be called
// at the end of polling if state was changed.
if poll_state_value & (POLLING | WOKEN) == NONE {
self_arc.poll_state.set_or(WOKEN);
if let Some(Some(inner_waker)) = unsafe { self_arc.inner_waker.get().as_ref() } {
self_arc.poll_state.set_or(WOKEN);
inner_waker.wake_by_ref();
}
}
Expand Down Expand Up @@ -248,7 +248,12 @@ where
let mut this = self.project();

let mut poll_state_value = this.poll_state.begin_polling();
let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL == NEED_TO_POLL && !stream_will_be_woken;

if poll_state_value & NEED_TO_POLL_BOTH == NONE {
poll_state_value = NEED_TO_POLL_STREAM | if this.futures.is_empty() { NONE } else { NEED_TO_POLL_FUTURES };
}

let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL_BOTH == NEED_TO_POLL_BOTH && !stream_will_be_woken;

if poll_state_value & NEED_TO_POLL_STREAM != NONE {
if !stream_will_be_woken {
Expand Down Expand Up @@ -323,18 +328,18 @@ where

let is_done = *this.is_stream_done && this.futures.is_empty();

if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL != NONE
if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL_BOTH != NONE
&& (polling_with_two_wakers
|| poll_state_value & NEED_TO_POLL_FUTURES != NONE && !futures_will_be_woken
|| poll_state_value & NEED_TO_POLL_STREAM != NONE && !stream_will_be_woken)
{
ctx.waker().wake_by_ref();
}

if next_item.is_none() && !is_done {
Poll::Pending
} else {
if next_item.is_some() || is_done {
Poll::Ready(next_item)
} else {
Poll::Pending
}
}
}
Expand Down

0 comments on commit 1324ea9

Please sign in to comment.