Skip to content

Commit

Permalink
tests: cfg-ignore compat tests if there is no compat feature (rust-la…
Browse files Browse the repository at this point in the history
…ng#2353)

Signed-off-by: Nick Cameron <nrc@ncameron.org>
  • Loading branch information
nrc authored Feb 22, 2021
1 parent f86bb46 commit c9a81c0
Show file tree
Hide file tree
Showing 193 changed files with 1,696 additions and 2,203 deletions.
6 changes: 2 additions & 4 deletions examples/functional/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ fn main() {
// responsible for transmission
pool.spawn_ok(fut_tx_result);

let fut_values = rx
.map(|v| v * 2)
.collect();
let fut_values = rx.map(|v| v * 2).collect();

// Use the executor provided to this async block to wait for the
// future to complete.
Expand All @@ -45,4 +43,4 @@ fn main() {
let values: Vec<i32> = executor::block_on(fut_values);

println!("Values={:?}", values);
}
}
4 changes: 2 additions & 2 deletions examples/imperative/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {
// of the stream to be available.
while let Some(v) = rx.next().await {
pending.push(v * 2);
};
}

pending
};
Expand All @@ -45,4 +45,4 @@ fn main() {
let values: Vec<i32> = executor::block_on(fut_values);

println!("Values={:?}", values);
}
}
15 changes: 3 additions & 12 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use {
futures::{
channel::mpsc::{self, Sender, UnboundedSender},
ready,
stream::{Stream, StreamExt},
sink::Sink,
stream::{Stream, StreamExt},
task::{Context, Poll},
},
futures_test::task::noop_context,
Expand All @@ -25,7 +25,6 @@ fn unbounded_1_tx(b: &mut Bencher) {
// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
for i in 0..1000 {

// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

Expand Down Expand Up @@ -73,7 +72,6 @@ fn unbounded_uncontended(b: &mut Bencher) {
})
}


/// A Stream that continuously sends incrementing number of the queue
struct TestSender {
tx: Sender<u32>,
Expand All @@ -84,9 +82,7 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);

Expand Down Expand Up @@ -123,12 +119,7 @@ fn bounded_100_tx(b: &mut Bencher) {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);

let mut tx: Vec<_> = (0..100).map(|_| {
TestSender {
tx: tx.clone(),
last: 0
}
}).collect();
let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();

for i in 0..10 {
for x in &mut tx {
Expand Down
2 changes: 0 additions & 2 deletions futures-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
//! library is activated, and it is activated by default.
#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
Expand Down
10 changes: 3 additions & 7 deletions futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::stream::StreamExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

Expand All @@ -11,9 +11,7 @@ fn sequence() {
let (tx, rx) = mpsc::channel(1);

let amt = 20;
let t = thread::spawn(move || {
block_on(send_sequence(amt, tx))
});
let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
let list: Vec<_> = block_on(rx.collect());
let mut list = list.into_iter();
for i in (1..=amt).rev() {
Expand All @@ -34,9 +32,7 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
let f = poll_fn(|cx| {
rx.poll_next_unpin(cx)
});
let f = poll_fn(|cx| rx.poll_next_unpin(cx));
assert_eq!(block_on(f), None)
}

Expand Down
28 changes: 13 additions & 15 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use std::time::{Duration, Instant};
fn smoke() {
let (mut sender, receiver) = mpsc::channel(1);

let t = thread::spawn(move || {
while let Ok(()) = block_on(sender.send(42)) {}
});
let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});

// `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
Expand Down Expand Up @@ -166,7 +164,7 @@ fn stress_try_send_as_receiver_closes() {
struct TestRx {
rx: mpsc::Receiver<Arc<()>>,
// The number of times to query `rx` before dropping it.
poll_count: usize
poll_count: usize,
}
struct TestTask {
command_rx: mpsc::Receiver<TestRx>,
Expand All @@ -190,14 +188,11 @@ fn stress_try_send_as_receiver_closes() {
impl Future for TestTask {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the test channel, if one is present.
if let Some(rx) = &mut self.test_rx {
if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
let _ = v.expect("test finished unexpectedly!");
let _ = v.expect("test finished unexpectedly!");
}
self.countdown -= 1;
// Busy-poll until the countdown is finished.
Expand All @@ -209,9 +204,9 @@ fn stress_try_send_as_receiver_closes() {
self.test_rx = Some(rx);
self.countdown = poll_count;
cx.waker().wake_by_ref();
},
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => {},
Poll::Pending => {}
}
if self.countdown == 0 {
// Countdown complete -- drop the Receiver.
Expand Down Expand Up @@ -255,10 +250,14 @@ fn stress_try_send_as_receiver_closes() {
if prev_weak.upgrade().is_none() {
break;
}
assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
assert!(
t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
"item not dropped on iteration {} after \
{} sends ({} successful). spin=({})",
i, attempted_sends, successful_sends, spins
i,
attempted_sends,
successful_sends,
spins
);
spins += 1;
thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
Expand All @@ -273,6 +272,5 @@ fn stress_try_send_as_receiver_closes() {
}
}
drop(cmd_tx);
bg.join()
.expect("background thread join");
bg.join().expect("background thread join");
}
33 changes: 15 additions & 18 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
use futures::future::{FutureExt, poll_fn};
use futures::stream::{Stream, StreamExt};
use futures::future::{poll_fn, FutureExt};
use futures::pin_mut;
use futures::sink::{Sink, SinkExt};
use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures::pin_mut;
use futures_test::task::{new_count_waker, noop_context};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

trait AssertSend: Send {}
Expand Down Expand Up @@ -77,7 +77,7 @@ fn send_shared_recv() {
fn send_recv_threads() {
let (mut tx, rx) = mpsc::channel::<i32>(16);

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
block_on(tx.send(1)).unwrap();
});

Expand Down Expand Up @@ -204,7 +204,7 @@ fn stress_shared_unbounded() {
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::unbounded::<i32>();

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
Expand All @@ -215,7 +215,7 @@ fn stress_shared_unbounded() {
for _ in 0..NTHREADS {
let tx = tx.clone();

thread::spawn(move|| {
thread::spawn(move || {
for _ in 0..AMT {
tx.unbounded_send(1).unwrap();
}
Expand All @@ -233,7 +233,7 @@ fn stress_shared_bounded_hard() {
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::channel::<i32>(0);

let t = thread::spawn(move|| {
let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
Expand Down Expand Up @@ -297,9 +297,9 @@ fn stress_receiver_multi_task_bounded_hard() {
}
Poll::Ready(None) => {
*rx_opt = None;
break
},
Poll::Pending => {},
break;
}
Poll::Pending => {}
}
}
} else {
Expand All @@ -311,7 +311,6 @@ fn stress_receiver_multi_task_bounded_hard() {
th.push(t);
}


for i in 0..AMT {
block_on(tx.send(i)).unwrap();
}
Expand All @@ -328,7 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() {
/// after sender dropped.
#[test]
fn stress_drop_sender() {
fn list() -> impl Stream<Item=i32> {
fn list() -> impl Stream<Item = i32> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
block_on(send_one_two_three(tx));
Expand Down Expand Up @@ -407,9 +406,7 @@ fn stress_poll_ready() {
let mut threads = Vec::new();
for _ in 0..NTHREADS {
let sender = tx.clone();
threads.push(thread::spawn(move || {
block_on(stress_poll_ready_sender(sender, AMT))
}));
threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
}
drop(tx);

Expand All @@ -436,7 +433,7 @@ fn try_send_1() {
for i in 0..N {
loop {
if tx.try_send(i).is_ok() {
break
break;
}
}
}
Expand Down Expand Up @@ -542,8 +539,8 @@ fn is_connected_to() {

#[test]
fn hash_receiver() {
use std::hash::Hasher;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

let mut hasher_a1 = DefaultHasher::new();
let mut hasher_a2 = DefaultHasher::new();
Expand Down
4 changes: 2 additions & 2 deletions futures-channel/tests/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{FutureExt, poll_fn};
use futures::future::{poll_fn, FutureExt};
use futures::task::{Context, Poll};
use futures_test::task::panic_waker_ref;
use std::sync::mpsc;
Expand Down Expand Up @@ -70,7 +70,7 @@ fn close() {
rx.close();
block_on(poll_fn(|cx| {
match rx.poll_unpin(cx) {
Poll::Ready(Err(_)) => {},
Poll::Ready(Err(_)) => {}
_ => panic!(),
};
assert!(tx.poll_canceled(cx).is_ready());
Expand Down
10 changes: 4 additions & 6 deletions futures-core/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,12 @@ pub trait TryFuture: Future + private_try_future::Sealed {
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Future` trait; in the future it won't be
/// needed.
fn try_poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Ok, Self::Error>>;
fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>;
}

impl<F, T, E> TryFuture for F
where F: ?Sized + Future<Output = Result<T, E>>
where
F: ?Sized + Future<Output = Result<T, E>>,
{
type Ok = T;
type Error = E;
Expand All @@ -87,8 +85,8 @@ impl<F, T, E> TryFuture for F

#[cfg(feature = "alloc")]
mod if_alloc {
use alloc::boxed::Box;
use super::*;
use alloc::boxed::Box;

impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> {
fn is_terminated(&self) -> bool {
Expand Down
8 changes: 4 additions & 4 deletions futures-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
//! Core traits and types for asynchronous operations in Rust.
#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
Expand All @@ -17,10 +15,12 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
extern crate alloc;

pub mod future;
#[doc(hidden)] pub use self::future::{Future, FusedFuture, TryFuture};
#[doc(hidden)]
pub use self::future::{FusedFuture, Future, TryFuture};

pub mod stream;
#[doc(hidden)] pub use self::stream::{Stream, FusedStream, TryStream};
#[doc(hidden)]
pub use self::stream::{FusedStream, Stream, TryStream};

#[macro_use]
pub mod task;
Expand Down
Loading

0 comments on commit c9a81c0

Please sign in to comment.