From 54bcee74bd8e120f7808a0a77775a9a124a41be1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Aug 2022 09:36:48 -0700 Subject: [PATCH] Add example failing test --- futures/tests/stream.rs | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 6781a102d2..d0b3ef9e45 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -1,5 +1,9 @@ +use std::cell::Cell; use std::iter; +use std::pin::Pin; +use std::rc::Rc; use std::sync::Arc; +use std::task::Context; use futures::channel::mpsc; use futures::executor::block_on; @@ -9,6 +13,7 @@ use futures::sink::SinkExt; use futures::stream::{self, StreamExt}; use futures::task::Poll; use futures::{ready, FutureExt}; +use futures_core::Stream; use futures_test::task::noop_context; #[test] @@ -436,3 +441,38 @@ fn ready_chunks() { assert_eq!(s.next().await.unwrap(), vec![4]); }); } + +struct SlowStream { + times_should_poll: usize, + times_polled: Rc>, +} +impl Stream for SlowStream { + type Item = usize; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.times_polled.set(self.times_polled.get() + 1); + if self.times_polled.get() % 2 == 0 { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if self.times_polled.get() == self.times_should_poll { + return Poll::Ready(None); + } + Poll::Ready(Some(self.times_polled.get())) + } +} + +#[test] +fn select_with_strategy_doesnt_terminate_early() { + let times_should_poll = 10; + let count = Rc::new(Cell::new(0)); + let b = stream::iter([10, 20]); + + let selected = stream::select_with_strategy( + SlowStream { times_should_poll, times_polled: count.clone() }, + b, + |_: &mut ()| stream::PollNext::Left, + ); + block_on(selected.for_each(|v| async move { println!("{}", v) })); + assert_eq!(count.get(), times_should_poll); +}