Skip to content

Commit

Permalink
[WIP] Re-add disabled tests
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jul 3, 2019
1 parent b385c4b commit f8e8506
Show file tree
Hide file tree
Showing 12 changed files with 1,460 additions and 1,332 deletions.
461 changes: 461 additions & 0 deletions futures/tests/all.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::SinkExt;
use futures::sink::SinkExt;
use futures::executor::{block_on, block_on_stream};
use futures::stream::StreamExt;
use futures::stream::{StreamExt, TryStreamExt};
use futures::channel::oneshot;
use futures::channel::mpsc;
use std::sync::mpsc as std_mpsc;
Expand All @@ -17,13 +17,13 @@ fn works() {
let t1 = thread::spawn(move || {
for _ in 0..N+1 {
let (mytx, myrx) = oneshot::channel();
tx = block_on(tx.send(myrx)).unwrap();
block_on(tx.send(myrx)).unwrap();
tx3.send(mytx).unwrap();
}
rx2.recv().unwrap();
for _ in 0..N {
let (mytx, myrx) = oneshot::channel();
tx = block_on(tx.send(myrx)).unwrap();
block_on(tx.send(myrx)).unwrap();
tx3.send(mytx).unwrap();
}
});
Expand Down
65 changes: 46 additions & 19 deletions futures/tests_disabled/eventual.rs → futures/tests/eventual.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
use futures::channel::oneshot;
use futures::future::{ok, err};
use futures::future::{self, ok, Future,FutureExt, TryFutureExt};
use futures::executor::block_on;
use std::sync::mpsc;
use std::thread;

mod support;
use support::*;

use std::fmt;

fn assert_done<T, F, Fut>(actual_fut: F, expected: T)
where
T: PartialEq + fmt::Debug,
F: FnOnce() -> Fut,
Fut: Future<Output = T> + Unpin,
{
let output = block_on(actual_fut());
assert_eq!(output, expected);
}

#[test]
fn join1() {
let (tx, rx) = mpsc::channel();
ok::<i32, i32>(1).join(ok(2))
.map(move |v| tx.send(v).unwrap())
.forget();
future::try_join(ok::<i32, i32>(1), ok(2))
.map(move |v| tx.send(v).unwrap())
;//.forget();
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
}
Expand All @@ -22,7 +30,9 @@ fn join2() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).map(move |v| tx.send(v).unwrap()).forget();
future::try_join(p1, p2)
.map(move |v| tx.send(v).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
c1.send(1).unwrap();
assert!(rx.try_recv().is_err());
Expand All @@ -36,7 +46,9 @@ fn join3() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget();
future::try_join(p1, p2)
.map_err(move |_v| tx.send(1).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
drop(c1);
assert_eq!(rx.recv(), Ok(1));
Expand All @@ -49,7 +61,9 @@ fn join4() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget();
future::try_join(p1, p2)
.map_err(move |v| tx.send(v).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
drop(c1);
assert!(rx.recv().is_ok());
Expand All @@ -63,7 +77,9 @@ fn join5() {
let (c2, p2) = oneshot::channel::<i32>();
let (c3, p3) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget();
future::try_join(future::try_join(p1, p2), p3)
.map(move |v| tx.send(v).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
c1.send(1).unwrap();
assert!(rx.try_recv().is_err());
Expand All @@ -79,15 +95,18 @@ fn select1() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.select(p2).map(move |v| tx.send(v).unwrap()).forget();
future::try_select(p1, p2)
.map(move |v| tx.send(v).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
c1.send(1).unwrap();
let (v, p2) = rx.recv().unwrap().into_inner();
assert_eq!(v, 1);
assert!(rx.recv().is_err());

let (tx, rx) = mpsc::channel();
p2.map(move |v| tx.send(v).unwrap()).forget();
p2.map(move |v| tx.send(v).unwrap())
;//.forget();
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
Expand All @@ -98,15 +117,18 @@ fn select2() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget();
future::try_select(p1, p2)
.map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
drop(c1);
let (v, p2) = rx.recv().unwrap();
assert_eq!(v, 1);
assert!(rx.recv().is_err());

let (tx, rx) = mpsc::channel();
p2.map(move |v| tx.send(v).unwrap()).forget();
p2.map(move |v| tx.send(v).unwrap())
;//.forget();
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
Expand All @@ -117,15 +139,18 @@ fn select3() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget();
future::try_select(p1, p2)
.map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())
;//.forget();
assert!(rx.try_recv().is_err());
drop(c1);
let (v, p2) = rx.recv().unwrap();
assert_eq!(v, 1);
assert!(rx.recv().is_err());

let (tx, rx) = mpsc::channel();
p2.map_err(move |_v| tx.send(2).unwrap()).forget();
p2.map_err(move |_v| tx.send(2).unwrap())
;//.forget();
drop(c2);
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
Expand All @@ -147,7 +172,9 @@ fn select4() {
let (c2, p2) = oneshot::channel::<i32>();

let tx3 = tx2.clone();
p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget();
future::try_select(p1, p2)
.map(move |_| tx3.send(()).unwrap())
;//.forget();
tx.send(c1).unwrap();
rx2.recv().unwrap();
drop(c2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::Async::*;
use futures::future;
use futures::stream::FuturesUnordered;
use futures::channel::oneshot;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::task::noop_context;
use std::panic::{self, AssertUnwindSafe};

mod support;
use std::sync::{Arc, Barrier};
use std::thread;

trait AssertSendSync: Send + Sync {}
impl AssertSendSync for FuturesUnordered<()> {}
Expand All @@ -22,22 +23,20 @@ fn basic_usage() {
queue.push(rx2);
queue.push(rx3);

assert!(!queue.poll_next(cx).unwrap().is_ready());
assert!(!queue.poll_next_unpin(cx).is_ready());

tx2.send("hello").unwrap();

assert_eq!(Ready(Some("hello")), queue.poll_next(cx).unwrap());
assert!(!queue.poll_next(cx).unwrap().is_ready());
assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
assert!(!queue.poll_next_unpin(cx).is_ready());

tx1.send("world").unwrap();
tx3.send("world2").unwrap();

assert_eq!(Ready(Some("world")), queue.poll_next(cx).unwrap());
assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap());
assert_eq!(Ready(None), queue.poll_next(cx).unwrap());

Ok::<_, ()>(())
})).unwrap();
assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}

#[test]
Expand All @@ -52,22 +51,20 @@ fn resolving_errors() {
queue.push(rx2);
queue.push(rx3);

assert!(!queue.poll_next(cx).unwrap().is_ready());
assert!(!queue.poll_next_unpin(cx).is_ready());

drop(tx2);

assert!(queue.poll_next(cx).is_err());
assert!(!queue.poll_next(cx).unwrap().is_ready());
assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
assert!(!queue.poll_next_unpin(cx).is_ready());

drop(tx1);
tx3.send("world2").unwrap();

assert!(queue.poll_next(cx).is_err());
assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap());
assert_eq!(Ready(None), queue.poll_next(cx).unwrap());

Ok::<_, ()>(())
})).unwrap();
assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}

#[test]
Expand All @@ -82,29 +79,25 @@ fn dropping_ready_queue() {
queue.push(rx2);
queue.push(rx3);

support::noop_waker_lw(|cx| {
assert!(!tx1.poll_cancel(cx).unwrap().is_ready());
assert!(!tx2.poll_cancel(cx).unwrap().is_ready());
assert!(!tx3.poll_cancel(cx).unwrap().is_ready());
{
let cx = &mut noop_context();
assert!(!tx1.poll_cancel(cx).is_ready());
assert!(!tx2.poll_cancel(cx).is_ready());
assert!(!tx3.poll_cancel(cx).is_ready());

drop(queue);

assert!(tx1.poll_cancel(cx).unwrap().is_ready());
assert!(tx2.poll_cancel(cx).unwrap().is_ready());
assert!(tx3.poll_cancel(cx).unwrap().is_ready());
});

Ok::<_, ()>(()).into_future()
})).unwrap();
assert!(tx1.poll_cancel(cx).is_ready());
assert!(tx2.poll_cancel(cx).is_ready());
assert!(tx3.poll_cancel(cx).is_ready());
}
}));
}

#[test]
fn stress() {
const ITER: usize = 300;

use std::sync::{Arc, Barrier};
use std::thread;

for i in 0..ITER {
let n = (i % 10) + 1;

Expand All @@ -129,10 +122,7 @@ fn stress() {

let mut sync = block_on_stream(queue);

let mut rx: Vec<_> = (&mut sync)
.take(n)
.map(|res| res.unwrap())
.collect();
let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();

assert_eq!(rx.len(), n);

Expand All @@ -151,15 +141,11 @@ fn stress() {
fn panicking_future_dropped() {
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
queue.push(future::poll_fn(|_| -> Poll<i32, i32> {
panic!()
}));
queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));

let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next(cx)));
let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
assert!(r.is_err());
assert!(queue.is_empty());
assert_eq!(Ready(None), queue.poll_next(cx).unwrap());

Ok::<_, ()>(())
})).unwrap();
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}
Loading

0 comments on commit f8e8506

Please sign in to comment.