Skip to content

Commit

Permalink
Actually fix the issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Aug 16, 2022
1 parent 54bcee7 commit 3b8ad5c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
15 changes: 10 additions & 5 deletions futures-util/src/stream/select_with_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl InternalState {
(InternalState::Start, PollNext::Right) => {
*self = InternalState::RightFinished;
}
(InternalState::LeftFinished, PollNext::Left)
| (InternalState::RightFinished, PollNext::Right) => {
(InternalState::LeftFinished, PollNext::Right)
| (InternalState::RightFinished, PollNext::Left) => {
*self = InternalState::BothFinished;
}
_ => {}
Expand Down Expand Up @@ -229,18 +229,23 @@ where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
match poll_side(select, side, cx) {
let first_done = match poll_side(select, side, cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => {
select.internal_state.finish(side);
true
}
Poll::Pending => (),
Poll::Pending => false,
};
let other = side.other();
match poll_side(select, other, cx) {
Poll::Ready(None) => {
select.internal_state.finish(other);
Poll::Ready(None)
if first_done {
Poll::Ready(None)
} else {
Poll::Pending
}
}
a => a,
}
Expand Down
26 changes: 14 additions & 12 deletions futures/tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl Stream for SlowStream {
cx.waker().wake_by_ref();
return Poll::Pending;
}
if self.times_polled.get() == self.times_should_poll {
if self.times_polled.get() >= self.times_should_poll {
return Poll::Ready(None);
}
Poll::Ready(Some(self.times_polled.get()))
Expand All @@ -464,15 +464,17 @@ impl Stream for SlowStream {

#[test]
fn select_with_strategy_doesnt_terminate_early() {
let times_should_poll = 10;
let count = Rc::new(Cell::new(0));
let b = stream::iter([10, 20]);

let selected = stream::select_with_strategy(
SlowStream { times_should_poll, times_polled: count.clone() },
b,
|_: &mut ()| stream::PollNext::Left,
);
block_on(selected.for_each(|v| async move { println!("{}", v) }));
assert_eq!(count.get(), times_should_poll);
for side in [stream::PollNext::Left, stream::PollNext::Right] {
let times_should_poll = 10;
let count = Rc::new(Cell::new(0));
let b = stream::iter([10, 20]);

let mut selected = stream::select_with_strategy(
SlowStream { times_should_poll, times_polled: count.clone() },
b,
|_: &mut ()| side,
);
block_on(async move { while let Some(_) = selected.next().await {} });
assert_eq!(count.get(), times_should_poll + 1);
}
}

0 comments on commit 3b8ad5c

Please sign in to comment.