Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed flat_map and flatten #701

Merged
merged 11 commits into from
Jun 18, 2020
7 changes: 4 additions & 3 deletions src/stream/stream/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ where
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
match futures_core::ready!(inner.poll_next(cx)) {
item @ Some(_) => return Poll::Ready(item),
None => this.inner_stream.set(None),
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/stream/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ where
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
match futures_core::ready!(inner.poll_next(cx)) {
item @ Some(_) => return Poll::Ready(item),
None => this.inner_stream.set(None),
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
Expand Down
73 changes: 73 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::convert::identity;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -108,3 +110,74 @@ fn merge_works_with_unfused_streams() {
assert_eq!(xs, vec![92, 92]);
});
}

struct S<T>(T);

impl<T: Stream + Unpin> Stream for S<T> {
type Item = T::Item;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe { Pin::new_unchecked(&mut self.0) }.poll_next(ctx)
}
}

struct StrictOnce {
polled: bool,
}

impl Stream for StrictOnce {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
assert!(!self.polled, "Polled after completion!");
self.polled = true;
Poll::Ready(None)
}
}

struct Interchanger {
polled: bool,
}

impl Stream for Interchanger {
type Item = S<Box<dyn Stream<Item = ()> + Unpin>>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if self.polled {
self.polled = false;
ctx.waker().wake_by_ref();
Poll::Pending
} else {
self.polled = true;
Poll::Ready(Some(S(Box::new(StrictOnce { polled: false }))))
}
}
}

#[test]
fn flat_map_doesnt_poll_completed_inner_stream() {
task::block_on(async {
assert_eq!(
Interchanger { polled: false }
.take(2)
.flat_map(identity)
.count()
.await,
0
);
});
}

#[test]
fn flatten_doesnt_poll_completed_inner_stream() {
task::block_on(async {
assert_eq!(
Interchanger { polled: false }
.take(2)
.flatten()
.count()
.await,
0
);
});
}