Skip to content

Commit

Permalink
Add woken state
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Jan 26, 2022
1 parent 6d0ccde commit 69f60cc
Showing 1 changed file with 61 additions and 29 deletions.
90 changes: 61 additions & 29 deletions futures-util/src/stream/stream/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const WAKING_STREAM: u8 = 0b10000;
/// The base stream and inner streams are being woken at the moment.
const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;

/// The stream was waked and will be polled.
const WOKEN: u8 = 0b100000;

/// Determines what needs to be polled, and is stream being polled at the
/// moment or not.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -74,7 +77,7 @@ impl SharedPollState {
})
.ok()
.map(|value| {
let bomb = PollStateBomb::new(self, |state| state.stop_polling(NEED_TO_POLL_ALL));
let bomb = PollStateBomb::new(self, SharedPollState::reset);

(value, bomb)
})
Expand All @@ -86,28 +89,53 @@ impl SharedPollState {
to_poll: u8,
waking: u8,
) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
let value = self.state.fetch_or(to_poll | waking, Ordering::SeqCst);
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
if value & (WOKEN | POLLING) == NONE {
Some(value | to_poll | waking)
} else {
Some(value | to_poll)
}
})
.unwrap();
let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));

(value, bomb)
}

/// Sets current state to `!POLLING` allowing to use wakers and `!WALING_ALL` as
/// - Wakers called during the `POLLING` phase won't propagate their calls
/// - `POLLING` phase can't start if some of the wakers are active
///
/// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
fn stop_polling(&self, to_poll: u8) -> u8 {
/// Sets current state to
/// - `!POLLING` allowing to use wakers
/// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
/// or `will_be_woken` flag supplied
/// - `!WAKING_ALL` as
/// * Wakers called during the `POLLING` phase won't propagate their calls
/// * `POLLING` phase can't start if some of the wakers are active
/// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
fn stop_polling(&self, mut to_poll: u8, will_be_woken: bool) -> u8 {
self.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
Some((value | to_poll) & !POLLING & !WAKING_ALL)
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
value &= NEED_TO_POLL_ALL;
if value != NONE || will_be_woken {
to_poll |= WOKEN;
}
to_poll |= value;

Some(to_poll & !POLLING & !WAKING_ALL)
})
.unwrap()
}

/// Toggles state to non-waking, allowing to start polling.
fn stop_waking(&self, waking: u8) -> u8 {
self.state.fetch_and(!waking, Ordering::SeqCst)
self.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| Some(value & !waking | WOKEN))
.unwrap()
}

/// Resets current state allowing to poll the stream and wake up wakers.
fn reset(&self) -> u8 {
self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
}
}

Expand Down Expand Up @@ -182,30 +210,30 @@ impl ArcWake for InnerWaker {
fn wake_by_ref(self_arc: &Arc<Self>) {
let (poll_state_value, state_bomb) = self_arc.start_waking();

// Only call waker if stream isn't being polled because of safety reasons
// Only call waker if the stream
// - isn't woken already
// - isn't being polled at the moment because of safety reasons
//
// Waker will be called at the end of polling if state was changed
if poll_state_value & POLLING == NONE {
if poll_state_value & (WOKEN | POLLING) == NONE {
// Safety: now state is not `POLLING`
let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };

if let Some(inner_waker) = waker_opt.clone() {
// Stop waking to allow polling stream
let poll_state_value = state_bomb.fire().unwrap();

// Here we want to optimize the case when two wakers are called at the same time
//
// In this case the best strategy will be to propagate only the latest waker's awake,
// and then poll both entities in a single `poll_next` call

// Check if this is the latest waker being waking
if poll_state_value & WAKING_ALL == self_arc.waking_state() {
// Check if the stream isn't woken yet
if poll_state_value & WOKEN == NONE {
// Wake up inner waker
inner_waker.wake();
}
}
} else {
// At the end of polling state will become `!WAKING_ALL`
// At the end of polling state will become `!WAKING_ALL` if stream is being polled
//
// And it's already non-waking if it's woken

state_bomb.deactivate();
}
}
Expand Down Expand Up @@ -422,20 +450,24 @@ where

// We didn't have any `poll_next` panic, so it's time to deactivate the bomb
state_bomb.deactivate();

let mut force_wake =
// we need to poll the stream and didn't reach the limit yet
need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
// or we need to poll inner streams again
|| need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;

// Stop polling and swap the latest state
poll_state_value = this.poll_state.stop_polling(need_to_poll_next);
poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
// If state was changed during `POLLING` phase, need to manually call a waker
force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;

let is_done = *this.is_stream_done && this.inner_streams.is_empty();

if next_item.is_some() || is_done {
Poll::Ready(next_item)
} else {
// We need to call the waker if state was changed during the polling phase
if poll_state_value & NEED_TO_POLL_ALL != NONE
// or we need to poll the stream and didn't reach the limit yet
|| need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
// or we need to poll inner streams again
|| need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE
{
if force_wake {
cx.waker().wake_by_ref();
}

Expand Down

0 comments on commit 69f60cc

Please sign in to comment.