From b3abec4bf22a8d82053e76f363a4039c9ddc1e45 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Thu, 8 Aug 2019 00:13:09 +0900 Subject: [PATCH] Re-add some disabled tests --- .../buffer_unordered.rs | 11 +- futures/{tests_disabled => tests}/eventual.rs | 37 +- .../{tests_disabled => tests}/ready_queue.rs | 84 ++- futures/tests/sink.rs | 491 ++++++++++++++++++ .../stream_catch_unwind.rs | 14 +- futures/tests/stream_select_all.rs | 34 +- futures/tests_disabled/sink.rs | 469 ----------------- futures/tests_disabled/stream_select_all.rs | 35 -- 8 files changed, 588 insertions(+), 587 deletions(-) rename futures/{tests_disabled => tests}/buffer_unordered.rs (86%) rename futures/{tests_disabled => tests}/eventual.rs (74%) rename futures/{tests_disabled => tests}/ready_queue.rs (58%) create mode 100644 futures/tests/sink.rs rename futures/{tests_disabled => tests}/stream_catch_unwind.rs (53%) delete mode 100644 futures/tests_disabled/sink.rs delete mode 100644 futures/tests_disabled/stream_select_all.rs diff --git a/futures/tests_disabled/buffer_unordered.rs b/futures/tests/buffer_unordered.rs similarity index 86% rename from futures/tests_disabled/buffer_unordered.rs rename to futures/tests/buffer_unordered.rs index bcc012fed3..972e555884 100644 --- a/futures/tests_disabled/buffer_unordered.rs +++ b/futures/tests/buffer_unordered.rs @@ -1,8 +1,7 @@ -use futures::SinkExt; +use futures::channel::{oneshot, mpsc}; use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; use futures::stream::StreamExt; -use futures::channel::oneshot; -use futures::channel::mpsc; use std::sync::mpsc as std_mpsc; use std::thread; @@ -17,20 +16,20 @@ fn works() { let t1 = thread::spawn(move || { for _ in 0..N+1 { let (mytx, myrx) = oneshot::channel(); - tx = block_on(tx.send(myrx)).unwrap(); + block_on(tx.send(myrx)).unwrap(); tx3.send(mytx).unwrap(); } rx2.recv().unwrap(); for _ in 0..N { let (mytx, myrx) = oneshot::channel(); - tx = block_on(tx.send(myrx)).unwrap(); + block_on(tx.send(myrx)).unwrap(); tx3.send(mytx).unwrap(); } }); let (tx4, rx4) = std_mpsc::channel(); let t2 = thread::spawn(move || { - for item in block_on_stream(rx.map_err(|_| panic!()).buffer_unordered(N)) { + for item in block_on_stream(rx.buffer_unordered(N)) { tx4.send(item.unwrap()).unwrap(); } }); diff --git a/futures/tests_disabled/eventual.rs b/futures/tests/eventual.rs similarity index 74% rename from futures/tests_disabled/eventual.rs rename to futures/tests/eventual.rs index c68828ec12..12b26aafa5 100644 --- a/futures/tests_disabled/eventual.rs +++ b/futures/tests/eventual.rs @@ -1,18 +1,19 @@ use futures::channel::oneshot; -use futures::future::{ok, err}; +use futures::executor::ThreadPool; +use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; +use futures::task::SpawnExt; use std::sync::mpsc; use std::thread; -mod support; -use support::*; - +fn run(future: F) { + let mut tp = ThreadPool::new().unwrap(); + tp.spawn(future.map(drop)).unwrap(); +} #[test] fn join1() { let (tx, rx) = mpsc::channel(); - ok::(1).join(ok(2)) - .map(move |v| tx.send(v).unwrap()) - .forget(); + run(future::try_join(ok::(1), ok(2)).map_ok(move |v| tx.send(v).unwrap())); assert_eq!(rx.recv(), Ok((1, 2))); assert!(rx.recv().is_err()); } @@ -22,7 +23,7 @@ fn join2() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).map(move |v| tx.send(v).unwrap()).forget(); + run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap())); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); assert!(rx.try_recv().is_err()); @@ -36,7 +37,7 @@ fn join3() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget(); + run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap())); assert!(rx.try_recv().is_err()); drop(c1); assert_eq!(rx.recv(), Ok(1)); @@ -49,7 +50,7 @@ fn join4() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget(); + run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap())); assert!(rx.try_recv().is_err()); drop(c1); assert!(rx.recv().is_ok()); @@ -63,7 +64,7 @@ fn join5() { let (c2, p2) = oneshot::channel::(); let (c3, p3) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget(); + run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap())); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); assert!(rx.try_recv().is_err()); @@ -79,7 +80,7 @@ fn select1() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.select(p2).map(move |v| tx.send(v).unwrap()).forget(); + run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap())); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); let (v, p2) = rx.recv().unwrap().into_inner(); @@ -87,7 +88,7 @@ fn select1() { assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); - p2.map(move |v| tx.send(v).unwrap()).forget(); + run(p2.map_ok(move |v| tx.send(v).unwrap())); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); @@ -98,7 +99,7 @@ fn select2() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget(); + run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())); assert!(rx.try_recv().is_err()); drop(c1); let (v, p2) = rx.recv().unwrap(); @@ -106,7 +107,7 @@ fn select2() { assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); - p2.map(move |v| tx.send(v).unwrap()).forget(); + run(p2.map_ok(move |v| tx.send(v).unwrap())); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); @@ -117,7 +118,7 @@ fn select3() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget(); + run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())); assert!(rx.try_recv().is_err()); drop(c1); let (v, p2) = rx.recv().unwrap(); @@ -125,7 +126,7 @@ fn select3() { assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); - p2.map_err(move |_v| tx.send(2).unwrap()).forget(); + run(p2.map_err(move |_v| tx.send(2).unwrap())); drop(c2); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); @@ -147,7 +148,7 @@ fn select4() { let (c2, p2) = oneshot::channel::(); let tx3 = tx2.clone(); - p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget(); + run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap())); tx.send(c1).unwrap(); rx2.recv().unwrap(); drop(c2); diff --git a/futures/tests_disabled/ready_queue.rs b/futures/tests/ready_queue.rs similarity index 58% rename from futures/tests_disabled/ready_queue.rs rename to futures/tests/ready_queue.rs index c3e07baabf..ae0b7c356d 100644 --- a/futures/tests_disabled/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -1,11 +1,12 @@ +use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; -use futures::Async::*; use futures::future; -use futures::stream::FuturesUnordered; -use futures::channel::oneshot; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; use std::panic::{self, AssertUnwindSafe}; - -mod support; +use std::sync::{Arc, Barrier}; +use std::thread; trait AssertSendSync: Send + Sync {} impl AssertSendSync for FuturesUnordered<()> {} @@ -22,22 +23,20 @@ fn basic_usage() { queue.push(rx2); queue.push(rx3); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert!(!queue.poll_next_unpin(cx).is_ready()); tx2.send("hello").unwrap(); - assert_eq!(Ready(Some("hello")), queue.poll_next(cx).unwrap()); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); tx1.send("world").unwrap(); tx3.send("world2").unwrap(); - assert_eq!(Ready(Some("world")), queue.poll_next(cx).unwrap()); - assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap()); - assert_eq!(Ready(None), queue.poll_next(cx).unwrap()); - - Ok::<_, ()>(()) - })).unwrap(); + assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); } #[test] @@ -52,22 +51,20 @@ fn resolving_errors() { queue.push(rx2); queue.push(rx3); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert!(!queue.poll_next_unpin(cx).is_ready()); drop(tx2); - assert!(queue.poll_next(cx).is_err()); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); drop(tx1); tx3.send("world2").unwrap(); - assert!(queue.poll_next(cx).is_err()); - assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap()); - assert_eq!(Ready(None), queue.poll_next(cx).unwrap()); - - Ok::<_, ()>(()) - })).unwrap(); + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); } #[test] @@ -82,29 +79,25 @@ fn dropping_ready_queue() { queue.push(rx2); queue.push(rx3); - support::noop_waker_lw(|cx| { - assert!(!tx1.poll_cancel(cx).unwrap().is_ready()); - assert!(!tx2.poll_cancel(cx).unwrap().is_ready()); - assert!(!tx3.poll_cancel(cx).unwrap().is_ready()); + { + let cx = &mut noop_context(); + assert!(!tx1.poll_cancel(cx).is_ready()); + assert!(!tx2.poll_cancel(cx).is_ready()); + assert!(!tx3.poll_cancel(cx).is_ready()); drop(queue); - assert!(tx1.poll_cancel(cx).unwrap().is_ready()); - assert!(tx2.poll_cancel(cx).unwrap().is_ready()); - assert!(tx3.poll_cancel(cx).unwrap().is_ready()); - }); - - Ok::<_, ()>(()).into_future() - })).unwrap(); + assert!(tx1.poll_cancel(cx).is_ready()); + assert!(tx2.poll_cancel(cx).is_ready()); + assert!(tx3.poll_cancel(cx).is_ready()); + } + })); } #[test] fn stress() { const ITER: usize = 300; - use std::sync::{Arc, Barrier}; - use std::thread; - for i in 0..ITER { let n = (i % 10) + 1; @@ -129,10 +122,7 @@ fn stress() { let mut sync = block_on_stream(queue); - let mut rx: Vec<_> = (&mut sync) - .take(n) - .map(|res| res.unwrap()) - .collect(); + let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect(); assert_eq!(rx.len(), n); @@ -151,15 +141,11 @@ fn stress() { fn panicking_future_dropped() { block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); - queue.push(future::poll_fn(|_| -> Poll { - panic!() - })); + queue.push(future::poll_fn(|_| -> Poll> { panic!() })); - let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next(cx))); + let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx))); assert!(r.is_err()); assert!(queue.is_empty()); - assert_eq!(Ready(None), queue.poll_next(cx).unwrap()); - - Ok::<_, ()>(()) - })).unwrap(); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); } diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs new file mode 100644 index 0000000000..be5e1453bd --- /dev/null +++ b/futures/tests/sink.rs @@ -0,0 +1,491 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +fn sassert_next(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), + } +} + +fn unwrap(x: Poll>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), + } +} + +#[test] +fn either_sink() { + let mut s = if true { + Vec::::new().left_sink() + } else { + VecDeque::::new().right_sink() + }; + + Pin::new(&mut s).start_send(0).unwrap(); +} + +#[test] +fn vec_sink() { + let mut v = Vec::new(); + Pin::new(&mut v).start_send(0).unwrap(); + Pin::new(&mut v).start_send(1).unwrap(); + assert_eq!(v, vec![0, 1]); + block_on(v.flush()).unwrap(); + assert_eq!(v, vec![0, 1]); +} + +#[test] +fn vecdeque_sink() { + let mut deque = VecDeque::new(); + Pin::new(&mut deque).start_send(2).unwrap(); + Pin::new(&mut deque).start_send(3).unwrap(); + + assert_eq!(deque.pop_front(), Some(2)); + assert_eq!(deque.pop_front(), Some(3)); + assert_eq!(deque.pop_front(), None); +} + +#[test] +fn send() { + let mut v = Vec::new(); + + block_on(v.send(0)).unwrap(); + assert_eq!(v, vec![0]); + + block_on(v.send(1)).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send(2)).unwrap(); + assert_eq!(v, vec![0, 1, 2]); +} + +#[test] +fn send_all() { + let mut v = Vec::new(); + + block_on(v.send_all(&mut stream::iter(vec![0, 1]))).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send_all(&mut stream::iter(vec![2, 3]))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3]); + + block_on(v.send_all(&mut stream::iter(vec![4, 5]))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); +} + +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); + +impl Flag { + fn new() -> Arc { + Arc::new(Self(AtomicBool::new(false))) + } + + fn get(&self) -> bool { + self.0.load(Ordering::SeqCst) + } + + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } +} + +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc) { + arc_self.set(true) + } +} + +fn flag_cx(f: F) -> R +where + F: FnOnce(Arc, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} + +// Sends a value on an i32 channel sink +struct StartSendFut + Unpin, Item: Unpin>(Option, Option); + +impl + Unpin, Item: Unpin> StartSendFut { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) + } +} + +impl + Unpin, Item: Unpin> Future for StartSendFut { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; + } + Poll::Ready(Ok(inner.take().unwrap())) + } +} + +// Test that `start_send` on an `mpsc` channel does indeed block when the +// channel is full +#[test] +fn mpsc_blocking_start_send() { + let (mut tx, mut rx) = mpsc::channel::(0); + + block_on(future::lazy(|_| { + tx.start_send(0).unwrap(); + + flag_cx(|flag, cx| { + let mut task = StartSendFut::new(tx, 1); + + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.get()); + sassert_next(&mut rx, 0); + assert!(flag.get()); + flag.set(false); + unwrap(task.poll_unpin(cx)); + assert!(!flag.get()); + sassert_next(&mut rx, 1); + }) + })); +} + +// test `flush` by using `with` to make the first insertion into a sink block +// until a oneshot is completed +#[test] +fn with_flush() { + let (tx, rx) = oneshot::channel(); + let mut block = rx.boxed(); + let mut sink = Vec::new().with(|elem| { + mem::replace(&mut block, future::ok(()).boxed()) + .map_ok(move |()| elem + 1) + .map_err(|_| -> Never { panic!() }) + }); + + assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(())); + + flag_cx(|flag, cx| { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + tx.send(()).unwrap(); + assert!(flag.get()); + + unwrap(task.poll_unpin(cx)); + + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2]); + }) +} + +// test simple use of with to change data +#[test] +fn with_as_map() { + let mut sink = Vec::new().with(|item| future::ok::(item * 2)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 2, 4]); +} + +// test simple use of with_flat_map +#[test] +fn with_flat_map() { + let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + block_on(sink.send(3)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]); +} + +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush { + data: Vec, + waiting_tasks: Vec, +} + +impl Sink> for ManualFlush { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +impl ManualFlush { + fn new() -> Self { + Self { + data: Vec::new(), + waiting_tasks: Vec::new(), + } + } + + fn force_flush(&mut self) -> Vec { + for task in self.waiting_tasks.drain(..) { + task.wake() + } + mem::replace(&mut self.data, Vec::new()) + } +} + +// test that the `with` sink doesn't require the underlying sink to flush, +// but doesn't claim to be flushed until the underlying sink is +#[test] +fn with_flush_propagate() { + let mut sink = ManualFlush::new().with(|x| future::ok::, ()>(x)); + flag_cx(|flag, cx| { + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(0)).unwrap(); + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(1)).unwrap(); + + { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.get()); + } + assert_eq!(sink.get_mut().force_flush(), vec![0, 1]); + assert!(flag.get()); + unwrap(sink.flush().poll_unpin(cx)); + }) +} + +// test that a buffer is a no-nop around a sink that always accepts sends +#[test] +fn buffer_noop() { + let mut sink = Vec::new().buffer(0); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); + + let mut sink = Vec::new().buffer(1); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); +} + +struct ManualAllow { + data: Vec, + allow: Rc, +} + +struct Allow { + flag: Cell, + tasks: RefCell>, +} + +impl Allow { + fn new() -> Self { + Self { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false + } + } + + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); + } + } +} + +impl Sink for ManualAllow { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.allow.check(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow() -> (ManualAllow, Rc) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) +} + +// test basic buffer functionality, including both filling up to capacity, +// and writing out when the underlying sink is ready +#[test] +fn buffer() { + let (sink, allow) = manual_allow::(); + let sink = sink.buffer(2); + + let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.get()); + allow.start(); + assert!(flag.get()); + unwrap(task.poll_unpin(cx)); + assert_eq!(sink.get_ref().data, vec![0, 1, 2]); + }) +} + +#[test] +fn fanout_smoke() { + let sink1 = Vec::new(); + let sink2 = Vec::new(); + let mut sink = sink1.fanout(sink2); + block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]))).unwrap(); + let (sink1, sink2) = sink.into_inner(); + assert_eq!(sink1, vec![1, 2, 3]); + assert_eq!(sink2, vec![1, 2, 3]); +} + +#[test] +fn fanout_backpressure() { + let (left_send, mut left_recv) = mpsc::channel(0); + let (right_send, mut right_recv) = mpsc::channel(0); + let sink = left_send.fanout(right_send); + + let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(!flag.get()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(left_recv.next()), Some(0)); + assert!(flag.get()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(right_recv.next()), Some(0)); + assert!(flag.get()); + + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(left_recv.next()), Some(2)); + assert!(flag.get()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(right_recv.next()), Some(2)); + assert!(flag.get()); + + unwrap(task.poll_unpin(cx)); + // make sure receivers live until end of test to prevent send errors + drop(left_recv); + drop(right_recv); + }) +} + +#[test] +fn sink_map_err() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx = tx.sink_map_err(|_| ()); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!( + Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), + Err(()) + ); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct ErrIntoTest; + +impl From for ErrIntoTest { + fn from(_: mpsc::SendError) -> Self { + Self + } +} + +#[test] +fn err_into() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx: SinkErrInto, _, ErrIntoTest> = tx.sink_err_into(); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!( + Pin::new(&mut tx.sink_err_into()).start_send(()), + Err(ErrIntoTest) + ); +} diff --git a/futures/tests_disabled/stream_catch_unwind.rs b/futures/tests/stream_catch_unwind.rs similarity index 53% rename from futures/tests_disabled/stream_catch_unwind.rs rename to futures/tests/stream_catch_unwind.rs index 54d7cf4b0c..8b23a0a7ef 100644 --- a/futures/tests_disabled/stream_catch_unwind.rs +++ b/futures/tests/stream_catch_unwind.rs @@ -1,27 +1,27 @@ use futures::executor::block_on_stream; -use futures::stream; +use futures::stream::{self, StreamExt}; #[test] fn panic_in_the_middle_of_the_stream() { - let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]); + let stream = stream::iter(vec![Some(10), None, Some(11)]); // panic on second element let stream_panicking = stream.map(|o| o.unwrap()); let mut iter = block_on_stream(stream_panicking.catch_unwind()); - assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); + assert_eq!(10, iter.next().unwrap().ok().unwrap()); assert!(iter.next().unwrap().is_err()); assert!(iter.next().is_none()); } #[test] fn no_panic() { - let stream = stream::iter_ok::<_, bool>(vec![10, 11, 12]); + let stream = stream::iter(vec![10, 11, 12]); let mut iter = block_on_stream(stream.catch_unwind()); - assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); - assert_eq!(Ok(11), iter.next().unwrap().ok().unwrap()); - assert_eq!(Ok(12), iter.next().unwrap().ok().unwrap()); + assert_eq!(10, iter.next().unwrap().ok().unwrap()); + assert_eq!(11, iter.next().unwrap().ok().unwrap()); + assert_eq!(12, iter.next().unwrap().ok().unwrap()); assert!(iter.next().is_none()); } diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index c6db7084e9..eb711dda0c 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -1,8 +1,7 @@ -#![feature(async_await)] - +use futures::channel::mpsc; use futures::executor::block_on_stream; use futures::future::{self, FutureExt}; -use futures::stream::{self, FusedStream, SelectAll, StreamExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; use futures::task::Poll; use futures_test::task::noop_context; @@ -48,3 +47,32 @@ fn issue_1626() { assert_eq!(s.next(), Some(14)); assert_eq!(s.next(), None); } + +#[test] +fn works_1() { + let (a_tx, a_rx) = mpsc::unbounded::(); + let (b_tx, b_rx) = mpsc::unbounded::(); + let (c_tx, c_rx) = mpsc::unbounded::(); + + let streams = vec![a_rx, b_rx, c_rx]; + + let mut stream = block_on_stream(select_all(streams)); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + c_tx.unbounded_send(42).unwrap(); + assert_eq!(Some(42), stream.next()); + a_tx.unbounded_send(43).unwrap(); + assert_eq!(Some(43), stream.next()); + + drop((a_tx, b_tx, c_tx)); + assert_eq!(None, stream.next()); +} diff --git a/futures/tests_disabled/sink.rs b/futures/tests_disabled/sink.rs deleted file mode 100644 index b4d69e39ca..0000000000 --- a/futures/tests_disabled/sink.rs +++ /dev/null @@ -1,469 +0,0 @@ -use futures::future::ok; -use futures::stream; -use futures::channel::{oneshot, mpsc}; -use futures::task::{self, Wake, Waker}; -use futures::executor::block_on; -use futures::sink::SinkErrInto; -use std::cell::{Cell, RefCell}; -use std::collections::VecDeque; -use std::mem; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicBool}; - -mod support; -use support::*; - -#[test] -fn either_sink() { - let mut s = if true { - Vec::::new().left_sink() - } else { - VecDeque::::new().right_sink() - }; - - s.start_send(0).unwrap(); -} - -#[test] -fn vec_sink() { - let mut v = Vec::new(); - v.start_send(0).unwrap(); - v.start_send(1).unwrap(); - assert_eq!(v, vec![0, 1]); - assert_done(move || v.flush(), Ok(vec![0, 1])); -} - -#[test] -fn vecdeque_sink() { - let mut deque = VecDeque::new(); - deque.start_send(2).unwrap(); - deque.start_send(3).unwrap(); - - assert_eq!(deque.pop_front(), Some(2)); - assert_eq!(deque.pop_front(), Some(3)); - assert_eq!(deque.pop_front(), None); -} - -#[test] -fn send() { - let v = Vec::new(); - - let v = block_on(v.send(0)).unwrap(); - assert_eq!(v, vec![0]); - - let v = block_on(v.send(1)).unwrap(); - assert_eq!(v, vec![0, 1]); - - assert_done(move || v.send(2), - Ok(vec![0, 1, 2])); -} - -#[test] -fn send_all() { - let v = Vec::new(); - - let (v, _) = block_on(v.send_all(stream::iter_ok(vec![0, 1]))).unwrap(); - assert_eq!(v, vec![0, 1]); - - let (v, _) = block_on(v.send_all(stream::iter_ok(vec![2, 3]))).unwrap(); - assert_eq!(v, vec![0, 1, 2, 3]); - - assert_done( - move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v), - Ok(vec![0, 1, 2, 3, 4, 5])); -} - -// An Unpark struct that records unpark events for inspection -struct Flag(pub AtomicBool); - -impl Flag { - fn new() -> Arc { - Arc::new(Flag(AtomicBool::new(false))) - } - - fn get(&self) -> bool { - self.0.load(Ordering::SeqCst) - } - - fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } -} - -impl Wake for Flag { - fn wake(arc_self: &Arc) { - arc_self.set(true) - } -} - -fn flag_cx(f: F) -> R - where F: FnOnce(Arc, &mut Context<'_>) -> R -{ - let flag = Flag::new(); - let map = &mut task::LocalMap::new(); - let waker = Waker::from(flag.clone()); - let exec = &mut support::PanicExec; - - let cx = &mut Context::new(map, &waker, exec); - f(flag, cx) -} - -// Sends a value on an i32 channel sink -struct StartSendFut(Option, Option); - -impl StartSendFut { - fn new(sink: S, item: S::SinkItem) -> StartSendFut { - StartSendFut(Some(sink), Some(item)) - } -} - -impl Future for StartSendFut { - type Item = S; - type Error = S::SinkError; - - fn poll(&mut self, cx: &mut Context<'_>) -> Poll { - { - let inner = self.0.as_mut().unwrap(); - ready!(inner.poll_ready(cx))?; - inner.start_send(self.1.take().unwrap())?; - } - Ok(Poll::Ready(self.0.take().unwrap())) - } -} - -#[test] -// Test that `start_send` on an `mpsc` channel does indeed block when the -// channel is full -fn mpsc_blocking_start_send() { - let (mut tx, mut rx) = mpsc::channel::(0); - - block_on(futures::future::lazy(|_| { - tx.start_send(0).unwrap(); - - flag_cx(|flag, cx| { - let mut task = StartSendFut::new(tx, 1); - - assert!(task.poll(cx).unwrap().is_pending()); - assert!(!flag.get()); - sassert_next(&mut rx, 0); - assert!(flag.get()); - flag.set(false); - assert!(task.poll(cx).unwrap().is_ready()); - assert!(!flag.get()); - sassert_next(&mut rx, 1); - - Ok::<(), ()>(()) - }) - })).unwrap(); -} - -#[test] -// test `flush` by using `with` to make the first insertion into a sink block -// until a oneshot is completed -fn with_flush() { - let (tx, rx) = oneshot::channel(); - let mut block = Box::new(rx) as Box>; - let mut sink = Vec::new().with(|elem| { - mem::replace(&mut block, Box::new(ok(()))) - .map(move |_| elem + 1).map_err(|_| -> Never { panic!() }) - }); - - assert_eq!(sink.start_send(0), Ok(())); - - flag_cx(|flag, cx| { - let mut task = sink.flush(); - assert!(task.poll(cx).unwrap().is_pending()); - tx.send(()).unwrap(); - assert!(flag.get()); - - let sink = match task.poll(cx).unwrap() { - Poll::Ready(sink) => sink, - _ => panic!() - }; - - assert_eq!(block_on(sink.send(1)).unwrap().get_ref(), &[1, 2]); - }) -} - -#[test] -// test simple use of with to change data -fn with_as_map() { - let sink = Vec::new().with(|item| -> Result { - Ok(item * 2) - }); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - let sink = block_on(sink.send(2)).unwrap(); - assert_eq!(sink.get_ref(), &[0, 2, 4]); -} - -#[test] -// test simple use of with_flat_map -fn with_flat_map() { - let sink = Vec::new().with_flat_map(|item| { - stream::iter_ok(vec![item; item]) - }); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - let sink = block_on(sink.send(2)).unwrap(); - let sink = block_on(sink.send(3)).unwrap(); - assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]); -} - -// Immediately accepts all requests to start pushing, but completion is managed -// by manually flushing -struct ManualFlush { - data: Vec, - waiting_tasks: Vec, -} - -impl Sink for ManualFlush { - type SinkItem = Option; // Pass None to flush - type SinkError = (); - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<(), Self::SinkError> { - Ok(Poll::Ready(())) - } - - fn start_send(&mut self, f: Self::SinkItem) -> Result<(), Self::SinkError> { - if let Some(item) = f { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) - } - - fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::SinkError> { - if self.data.is_empty() { - Ok(Poll::Ready(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Ok(Poll::Pending) - } - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::SinkError> { - self.poll_flush(cx) - } -} - -impl ManualFlush { - fn new() -> ManualFlush { - ManualFlush { - data: Vec::new(), - waiting_tasks: Vec::new() - } - } - - fn force_flush(&mut self) -> Vec { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) - } -} - -#[test] -// test that the `with` sink doesn't require the underlying sink to flush, -// but doesn't claim to be flushed until the underlying sink is -fn with_flush_propagate() { - let mut sink = ManualFlush::new().with(|x| -> Result, ()> { Ok(x) }); - flag_cx(|flag, cx| { - assert!(sink.poll_ready(cx).unwrap().is_ready()); - sink.start_send(Some(0)).unwrap(); - assert!(sink.poll_ready(cx).unwrap().is_ready()); - sink.start_send(Some(1)).unwrap(); - - let mut task = sink.flush(); - assert!(task.poll(cx).unwrap().is_pending()); - assert!(!flag.get()); - assert_eq!(task.get_mut().unwrap().get_mut().force_flush(), vec![0, 1]); - assert!(flag.get()); - assert!(task.poll(cx).unwrap().is_ready()); - }) -} - -#[test] -// test that a buffer is a no-nop around a sink that always accepts sends -fn buffer_noop() { - let sink = Vec::new().buffer(0); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - assert_eq!(sink.get_ref(), &[0, 1]); - - let sink = Vec::new().buffer(1); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - assert_eq!(sink.get_ref(), &[0, 1]); -} - -struct ManualAllow { - data: Vec, - allow: Rc, -} - -struct Allow { - flag: Cell, - tasks: RefCell>, -} - -impl Allow { - fn new() -> Allow { - Allow { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } - } -} - -impl Sink for ManualAllow { - type SinkItem = T; - type SinkError = Never; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::SinkError> { - if self.allow.check(cx) { - Ok(Poll::Ready(())) - } else { - Ok(Poll::Pending) - } - } - - fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { - self.data.push(item); - Ok(()) - } - - fn poll_flush(&mut self, _: &mut Context<'_>) -> Poll<(), Self::SinkError> { - Ok(Poll::Ready(())) - } - - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<(), Self::SinkError> { - Ok(Poll::Ready(())) - } -} - -fn manual_allow() -> (ManualAllow, Rc) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) -} - -#[test] -// test basic buffer functionality, including both filling up to capacity, -// and writing out when the underlying sink is ready -fn buffer() { - let (sink, allow) = manual_allow::(); - let sink = sink.buffer(2); - - let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); - let sink = block_on(StartSendFut::new(sink, 1)).unwrap(); - - flag_cx(|flag, cx| { - let mut task = sink.send(2); - assert!(task.poll(cx).unwrap().is_pending()); - assert!(!flag.get()); - allow.start(); - assert!(flag.get()); - match task.poll(cx).unwrap() { - Poll::Ready(sink) => { - assert_eq!(sink.get_ref().data, vec![0, 1, 2]); - } - _ => panic!() - } - }) -} - -#[test] -fn fanout_smoke() { - let sink1 = Vec::new(); - let sink2 = Vec::new(); - let sink = sink1.fanout(sink2); - let stream = futures::stream::iter_ok(vec![1,2,3]); - let (sink, _) = block_on(sink.send_all(stream)).unwrap(); - let (sink1, sink2) = sink.into_inner(); - assert_eq!(sink1, vec![1,2,3]); - assert_eq!(sink2, vec![1,2,3]); -} - -#[test] -fn fanout_backpressure() { - let (left_send, left_recv) = mpsc::channel(0); - let (right_send, right_recv) = mpsc::channel(0); - let sink = left_send.fanout(right_send); - - let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); - - flag_cx(|flag, cx| { - let mut task = sink.send(2); - assert!(!flag.get()); - assert!(task.poll(cx).unwrap().is_pending()); - let (item, left_recv) = block_on(left_recv.next()).unwrap(); - assert_eq!(item, Some(0)); - assert!(flag.get()); - assert!(task.poll(cx).unwrap().is_pending()); - let (item, right_recv) = block_on(right_recv.next()).unwrap(); - assert_eq!(item, Some(0)); - assert!(flag.get()); - assert!(task.poll(cx).unwrap().is_ready()); - // make sure receivers live until end of test to prevent send errors - drop(left_recv); - drop(right_recv); - }) -} - -#[test] -fn map_err() { - panic_waker_cx(|cx| { - let (tx, _rx) = mpsc::channel(1); - let mut tx = tx.sink_map_err(|_| ()); - assert_eq!(tx.start_send(()), Ok(())); - assert_eq!(tx.poll_flush(cx), Ok(Poll::Ready(()))); - }); - - let tx = mpsc::channel(0).0; - assert_eq!(tx.sink_map_err(|_| ()).start_send(()), Err(())); -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -struct FromErrTest; - -impl From for FromErrTest { - fn from(_: mpsc::SendError) -> FromErrTest { - FromErrTest - } -} - -#[test] -fn from_err() { - panic_waker_cx(|cx| { - let (tx, _rx) = mpsc::channel(1); - let mut tx: SinkErrInto, FromErrTest> = tx.sink_err_into(); - assert_eq!(tx.start_send(()), Ok(())); - assert_eq!(tx.poll_flush(cx), Ok(Poll::Ready(()))); - }); - - let tx = mpsc::channel(0).0; - assert_eq!(tx.sink_err_into().start_send(()), Err(FromErrTest)); -} diff --git a/futures/tests_disabled/stream_select_all.rs b/futures/tests_disabled/stream_select_all.rs deleted file mode 100644 index b728492b41..0000000000 --- a/futures/tests_disabled/stream_select_all.rs +++ /dev/null @@ -1,35 +0,0 @@ -use futures::executor::block_on_stream; -use futures::channel::mpsc; -use futures::stream::select_all; -use std::mem; - -mod support; - -#[test] -fn works_1() { - let (a_tx, a_rx) = mpsc::unbounded::(); - let (b_tx, b_rx) = mpsc::unbounded::(); - let (c_tx, c_rx) = mpsc::unbounded::(); - - let streams = vec![a_rx, b_rx, c_rx]; - - let mut stream = block_on_stream(select_all(streams)); - - b_tx.unbounded_send(99).unwrap(); - a_tx.unbounded_send(33).unwrap(); - assert_eq!(Some(Ok(33)), stream.next()); - assert_eq!(Some(Ok(99)), stream.next()); - - b_tx.unbounded_send(99).unwrap(); - a_tx.unbounded_send(33).unwrap(); - assert_eq!(Some(Ok(33)), stream.next()); - assert_eq!(Some(Ok(99)), stream.next()); - - c_tx.unbounded_send(42).unwrap(); - assert_eq!(Some(Ok(42)), stream.next()); - a_tx.unbounded_send(43).unwrap(); - assert_eq!(Some(Ok(43)), stream.next()); - - mem::drop((a_tx, b_tx, c_tx)); - assert_eq!(None, stream.next()); -}