Skip to content

Commit

Permalink
Merge pull request #701 from olegnn/flat_map_fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jun 18, 2020
2 parents a602a91 + 42425f6 commit 5d55fa7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 6 deletions.
7 changes: 4 additions & 3 deletions src/stream/stream/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ where
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
match futures_core::ready!(inner.poll_next(cx)) {
item @ Some(_) => return Poll::Ready(item),
None => this.inner_stream.set(None),
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/stream/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ where
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
match futures_core::ready!(inner.poll_next(cx)) {
item @ Some(_) => return Poll::Ready(item),
None => this.inner_stream.set(None),
}
}

match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
Expand Down
73 changes: 73 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::convert::identity;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -108,3 +110,74 @@ fn merge_works_with_unfused_streams() {
assert_eq!(xs, vec![92, 92]);
});
}

struct S<T>(T);

impl<T: Stream + Unpin> Stream for S<T> {
type Item = T::Item;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe { Pin::new_unchecked(&mut self.0) }.poll_next(ctx)
}
}

struct StrictOnce {
polled: bool,
}

impl Stream for StrictOnce {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
assert!(!self.polled, "Polled after completion!");
self.polled = true;
Poll::Ready(None)
}
}

struct Interchanger {
polled: bool,
}

impl Stream for Interchanger {
type Item = S<Box<dyn Stream<Item = ()> + Unpin>>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if self.polled {
self.polled = false;
ctx.waker().wake_by_ref();
Poll::Pending
} else {
self.polled = true;
Poll::Ready(Some(S(Box::new(StrictOnce { polled: false }))))
}
}
}

#[test]
fn flat_map_doesnt_poll_completed_inner_stream() {
task::block_on(async {
assert_eq!(
Interchanger { polled: false }
.take(2)
.flat_map(identity)
.count()
.await,
0
);
});
}

#[test]
fn flatten_doesnt_poll_completed_inner_stream() {
task::block_on(async {
assert_eq!(
Interchanger { polled: false }
.take(2)
.flatten()
.count()
.await,
0
);
});
}

0 comments on commit 5d55fa7

Please sign in to comment.