Skip to content

Commit

Permalink
[WIP] Re-add disabled tests
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jul 20, 2019
1 parent 2c54502 commit c61a197
Show file tree
Hide file tree
Showing 12 changed files with 1,447 additions and 1,331 deletions.
460 changes: 460 additions & 0 deletions futures/tests/all.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::SinkExt;
use futures::sink::SinkExt;
use futures::executor::{block_on, block_on_stream};
use futures::stream::StreamExt;
use futures::channel::oneshot;
Expand All @@ -17,20 +17,20 @@ fn works() {
let t1 = thread::spawn(move || {
for _ in 0..N+1 {
let (mytx, myrx) = oneshot::channel();
tx = block_on(tx.send(myrx)).unwrap();
block_on(tx.send(myrx)).unwrap();
tx3.send(mytx).unwrap();
}
rx2.recv().unwrap();
for _ in 0..N {
let (mytx, myrx) = oneshot::channel();
tx = block_on(tx.send(myrx)).unwrap();
block_on(tx.send(myrx)).unwrap();
tx3.send(mytx).unwrap();
}
});

let (tx4, rx4) = std_mpsc::channel();
let t2 = thread::spawn(move || {
for item in block_on_stream(rx.map_err(|_| panic!()).buffer_unordered(N)) {
for item in block_on_stream(rx.buffer_unordered(N)) {
tx4.send(item.unwrap()).unwrap();
}
});
Expand Down
37 changes: 19 additions & 18 deletions futures/tests_disabled/eventual.rs → futures/tests/eventual.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use futures::channel::oneshot;
use futures::future::{ok, err};
use futures::executor::ThreadPool;
use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
use futures::task::SpawnExt;
use std::sync::mpsc;
use std::thread;

mod support;
use support::*;

fn run<F: Future + Send + 'static>(future: F) {
let mut tp = ThreadPool::new().unwrap();
tp.spawn(future.map(drop)).unwrap();
}

#[test]
fn join1() {
let (tx, rx) = mpsc::channel();
ok::<i32, i32>(1).join(ok(2))
.map(move |v| tx.send(v).unwrap())
.forget();
run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
}
Expand All @@ -22,7 +23,7 @@ fn join2() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).map(move |v| tx.send(v).unwrap()).forget();
run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
assert!(rx.try_recv().is_err());
c1.send(1).unwrap();
assert!(rx.try_recv().is_err());
Expand All @@ -36,7 +37,7 @@ fn join3() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget();
run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
assert!(rx.try_recv().is_err());
drop(c1);
assert_eq!(rx.recv(), Ok(1));
Expand All @@ -49,7 +50,7 @@ fn join4() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget();
run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
assert!(rx.try_recv().is_err());
drop(c1);
assert!(rx.recv().is_ok());
Expand All @@ -63,7 +64,7 @@ fn join5() {
let (c2, p2) = oneshot::channel::<i32>();
let (c3, p3) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget();
run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
assert!(rx.try_recv().is_err());
c1.send(1).unwrap();
assert!(rx.try_recv().is_err());
Expand All @@ -79,15 +80,15 @@ fn select1() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.select(p2).map(move |v| tx.send(v).unwrap()).forget();
run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
assert!(rx.try_recv().is_err());
c1.send(1).unwrap();
let (v, p2) = rx.recv().unwrap().into_inner();
assert_eq!(v, 1);
assert!(rx.recv().is_err());

let (tx, rx) = mpsc::channel();
p2.map(move |v| tx.send(v).unwrap()).forget();
run(p2.map_ok(move |v| tx.send(v).unwrap()));
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
Expand All @@ -98,15 +99,15 @@ fn select2() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget();
run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
assert!(rx.try_recv().is_err());
drop(c1);
let (v, p2) = rx.recv().unwrap();
assert_eq!(v, 1);
assert!(rx.recv().is_err());

let (tx, rx) = mpsc::channel();
p2.map(move |v| tx.send(v).unwrap()).forget();
run(p2.map_ok(move |v| tx.send(v).unwrap()));
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
Expand All @@ -117,15 +118,15 @@ fn select3() {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
let (tx, rx) = mpsc::channel();
p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget();
run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
assert!(rx.try_recv().is_err());
drop(c1);
let (v, p2) = rx.recv().unwrap();
assert_eq!(v, 1);
assert!(rx.recv().is_err());

let (tx, rx) = mpsc::channel();
p2.map_err(move |_v| tx.send(2).unwrap()).forget();
run(p2.map_err(move |_v| tx.send(2).unwrap()));
drop(c2);
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
Expand All @@ -147,7 +148,7 @@ fn select4() {
let (c2, p2) = oneshot::channel::<i32>();

let tx3 = tx2.clone();
p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget();
run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
tx.send(c1).unwrap();
rx2.recv().unwrap();
drop(c2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::Async::*;
use futures::future;
use futures::stream::FuturesUnordered;
use futures::channel::oneshot;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::task::noop_context;
use std::panic::{self, AssertUnwindSafe};

mod support;
use std::sync::{Arc, Barrier};
use std::thread;

trait AssertSendSync: Send + Sync {}
impl AssertSendSync for FuturesUnordered<()> {}
Expand All @@ -22,22 +23,20 @@ fn basic_usage() {
queue.push(rx2);
queue.push(rx3);

assert!(!queue.poll_next(cx).unwrap().is_ready());
assert!(!queue.poll_next_unpin(cx).is_ready());

tx2.send("hello").unwrap();

assert_eq!(Ready(Some("hello")), queue.poll_next(cx).unwrap());
assert!(!queue.poll_next(cx).unwrap().is_ready());
assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
assert!(!queue.poll_next_unpin(cx).is_ready());

tx1.send("world").unwrap();
tx3.send("world2").unwrap();

assert_eq!(Ready(Some("world")), queue.poll_next(cx).unwrap());
assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap());
assert_eq!(Ready(None), queue.poll_next(cx).unwrap());

Ok::<_, ()>(())
})).unwrap();
assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}

#[test]
Expand All @@ -52,22 +51,20 @@ fn resolving_errors() {
queue.push(rx2);
queue.push(rx3);

assert!(!queue.poll_next(cx).unwrap().is_ready());
assert!(!queue.poll_next_unpin(cx).is_ready());

drop(tx2);

assert!(queue.poll_next(cx).is_err());
assert!(!queue.poll_next(cx).unwrap().is_ready());
assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
assert!(!queue.poll_next_unpin(cx).is_ready());

drop(tx1);
tx3.send("world2").unwrap();

assert!(queue.poll_next(cx).is_err());
assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap());
assert_eq!(Ready(None), queue.poll_next(cx).unwrap());

Ok::<_, ()>(())
})).unwrap();
assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}

#[test]
Expand All @@ -82,29 +79,25 @@ fn dropping_ready_queue() {
queue.push(rx2);
queue.push(rx3);

support::noop_waker_lw(|cx| {
assert!(!tx1.poll_cancel(cx).unwrap().is_ready());
assert!(!tx2.poll_cancel(cx).unwrap().is_ready());
assert!(!tx3.poll_cancel(cx).unwrap().is_ready());
{
let cx = &mut noop_context();
assert!(!tx1.poll_cancel(cx).is_ready());
assert!(!tx2.poll_cancel(cx).is_ready());
assert!(!tx3.poll_cancel(cx).is_ready());

drop(queue);

assert!(tx1.poll_cancel(cx).unwrap().is_ready());
assert!(tx2.poll_cancel(cx).unwrap().is_ready());
assert!(tx3.poll_cancel(cx).unwrap().is_ready());
});

Ok::<_, ()>(()).into_future()
})).unwrap();
assert!(tx1.poll_cancel(cx).is_ready());
assert!(tx2.poll_cancel(cx).is_ready());
assert!(tx3.poll_cancel(cx).is_ready());
}
}));
}

#[test]
fn stress() {
const ITER: usize = 300;

use std::sync::{Arc, Barrier};
use std::thread;

for i in 0..ITER {
let n = (i % 10) + 1;

Expand All @@ -129,10 +122,7 @@ fn stress() {

let mut sync = block_on_stream(queue);

let mut rx: Vec<_> = (&mut sync)
.take(n)
.map(|res| res.unwrap())
.collect();
let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();

assert_eq!(rx.len(), n);

Expand All @@ -151,15 +141,11 @@ fn stress() {
fn panicking_future_dropped() {
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
queue.push(future::poll_fn(|_| -> Poll<i32, i32> {
panic!()
}));
queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));

let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next(cx)));
let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
assert!(r.is_err());
assert!(queue.is_empty());
assert_eq!(Ready(None), queue.poll_next(cx).unwrap());

Ok::<_, ()>(())
})).unwrap();
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
}
Loading

0 comments on commit c61a197

Please sign in to comment.