diff --git a/.travis.yml b/.travis.yml index c339685c88..8664450198 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,3 +28,6 @@ jobs: - script: cargo test --features=doctest-readme --all name: "doctest readme" rust: nightly + - script: (cd tracing-futures/test_std_future && cargo test) + name: "futures nightly" + rust: nightly diff --git a/tracing-futures/Cargo.toml b/tracing-futures/Cargo.toml index 264e54d8fa..df5901000a 100644 --- a/tracing-futures/Cargo.toml +++ b/tracing-futures/Cargo.toml @@ -5,10 +5,13 @@ authors = ["Eliza Weisman "] edition = "2018" [features] -default = ["tokio"] +default = ["futures-01", "tokio"] +futures-01 = ["futures"] +std-future = ["pin-utils"] [dependencies] -futures = "0.1" +futures = { version = "0.1", optional = true } +pin-utils = { version = "0.1.0-alpha.4", optional = true } tracing = "0.1" tokio = { version = "0.1", optional = true } tokio-executor = { version = "0.1", optional = true } diff --git a/tracing-futures/src/executor.rs b/tracing-futures/src/executor.rs index 521f09dfc7..3a6a2bd2b4 100644 --- a/tracing-futures/src/executor.rs +++ b/tracing-futures/src/executor.rs @@ -1,15 +1,14 @@ use crate::{Instrument, Instrumented, WithDispatch}; +#[cfg(feature = "futures-01")] use futures::{ future::{ExecuteError, Executor}, Future, }; +#[cfg(feature = "futures-01")] +use tokio::executor::{Executor as TokioExecutor, SpawnError}; +use tokio::runtime::{current_thread, Runtime, TaskExecutor}; -#[cfg(feature = "tokio")] -use tokio::{ - executor::{Executor as TokioExecutor, SpawnError}, - runtime::{current_thread, Runtime, TaskExecutor}, -}; - +#[cfg(feature = "futures-01")] macro_rules! deinstrument_err { ($e:expr) => { $e.map_err(|e| { @@ -20,6 +19,7 @@ macro_rules! deinstrument_err { }; } +#[cfg(feature = "futures-01")] impl Executor for Instrumented where T: Executor>, @@ -31,7 +31,7 @@ where } } -#[cfg(feature = "tokio")] +#[cfg(all(feature = "futures-01", feature = "tokio"))] impl TokioExecutor for Instrumented where T: TokioExecutor, @@ -56,6 +56,7 @@ impl Instrumented { /// /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`, /// instrumenting the spawned future beforehand. + #[cfg(feature = "futures-01")] pub fn spawn(&mut self, future: F) -> &mut Self where F: Future + Send + 'static, @@ -80,6 +81,7 @@ impl Instrumented { /// /// This function panics if the executor is at capacity, if the provided /// future panics, or if called within an asynchronous execution context. + #[cfg(feature = "futures-01")] pub fn block_on(&mut self, future: F) -> Result where F: Send + 'static + Future, @@ -108,6 +110,7 @@ impl Instrumented { /// /// This method simply wraps a call to `current_thread::Runtime::spawn`, /// instrumenting the spawned future beforehand. + #[cfg(feature = "futures-01")] pub fn spawn(&mut self, future: F) -> &mut Self where F: Future + 'static, @@ -141,6 +144,7 @@ impl Instrumented { /// /// This function panics if the executor is at capacity, if the provided /// future panics, or if called within an asynchronous execution context. + #[cfg(feature = "futures-01")] pub fn block_on(&mut self, future: F) -> Result where F: 'static + Future, @@ -165,6 +169,7 @@ impl Instrumented { } } +#[cfg(feature = "futures-01")] impl Executor for WithDispatch where T: Executor>, @@ -176,7 +181,7 @@ where } } -#[cfg(feature = "tokio")] +#[cfg(all(feature = "futures-01", feature = "tokio"))] impl TokioExecutor for WithDispatch where T: TokioExecutor, @@ -202,6 +207,7 @@ impl WithDispatch { /// /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`, /// instrumenting the spawned future beforehand. + #[cfg(feature = "futures-01")] pub fn spawn(&mut self, future: F) -> &mut Self where F: Future + Send + 'static, @@ -227,6 +233,7 @@ impl WithDispatch { /// /// This function panics if the executor is at capacity, if the provided /// future panics, or if called within an asynchronous execution context. + #[cfg(feature = "futures-01")] pub fn block_on(&mut self, future: F) -> Result where F: Send + 'static + Future, @@ -257,6 +264,7 @@ impl WithDispatch { /// /// This method simply wraps a call to `current_thread::Runtime::spawn`, /// instrumenting the spawned future beforehand. + #[cfg(feature = "futures-01")] pub fn spawn(&mut self, future: F) -> &mut Self where F: Future + 'static, @@ -290,6 +298,7 @@ impl WithDispatch { /// /// This function panics if the executor is at capacity, if the provided /// future panics, or if called within an asynchronous execution context. + #[cfg(feature = "futures-01")] pub fn block_on(&mut self, future: F) -> Result where F: 'static + Future, diff --git a/tracing-futures/src/lib.rs b/tracing-futures/src/lib.rs index 48fb80ed54..8ba527d771 100644 --- a/tracing-futures/src/lib.rs +++ b/tracing-futures/src/lib.rs @@ -14,7 +14,10 @@ //! //! [`Instrument`]: trait.Instrument.html //! [`WithSubscriber`]: trait.WithSubscriber.html +#[cfg(feature = "futures-01")] extern crate futures; +#[cfg(feature = "std-future")] +extern crate pin_utils; #[cfg(feature = "tokio")] extern crate tokio; #[cfg(feature = "tokio-executor")] @@ -22,9 +25,16 @@ extern crate tokio_executor; #[cfg_attr(test, macro_use)] extern crate tracing; -use futures::{Future, Poll, Sink, StartSend, Stream}; -use tracing::{dispatcher, Dispatch, Span}; +#[cfg(feature = "std-future")] +use std::{pin::Pin, task::Context}; +#[cfg(feature = "futures-01")] +use futures::{Sink, StartSend, Stream}; +#[cfg(feature = "futures-01")] +use tracing::dispatcher; +use tracing::{Dispatch, Span}; + +#[cfg(feature = "tokio")] pub mod executor; // TODO: seal? @@ -60,26 +70,48 @@ pub struct WithDispatch { impl Instrument for T {} -impl Future for Instrumented { +#[cfg(feature = "std-future")] +impl Instrumented { + pin_utils::unsafe_pinned!(inner: T); +} + +#[cfg(feature = "std-future")] +impl std::future::Future for Instrumented { + type Output = T::Output; + + fn poll(mut self: Pin<&mut Self>, lw: &mut Context) -> std::task::Poll { + let span = self.as_ref().span.clone(); + let _enter = span.enter(); + self.as_mut().inner().poll(lw) + } +} + +#[cfg(feature = "std-future")] +impl Unpin for Instrumented {} + +#[cfg(feature = "futures-01")] +impl futures::Future for Instrumented { type Item = T::Item; type Error = T::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> futures::Poll { let _enter = self.span.enter(); self.inner.poll() } } +#[cfg(feature = "futures-01")] impl Stream for Instrumented { type Item = T::Item; type Error = T::Error; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll(&mut self) -> futures::Poll, Self::Error> { let _enter = self.span.enter(); self.inner.poll() } } +#[cfg(feature = "futures-01")] impl Sink for Instrumented { type SinkItem = T::SinkItem; type SinkError = T::SinkError; @@ -89,7 +121,7 @@ impl Sink for Instrumented { self.inner.start_send(item) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { let _enter = self.span.enter(); self.inner.poll_complete() } @@ -116,17 +148,19 @@ impl Instrumented { impl WithSubscriber for T {} -impl Future for WithDispatch { +#[cfg(feature = "futures-01")] +impl futures::Future for WithDispatch { type Item = T::Item; type Error = T::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> futures::Poll { let inner = &mut self.inner; dispatcher::with_default(&self.dispatch, || inner.poll()) } } impl WithDispatch { + #[cfg(feature = "tokio")] pub(crate) fn with_dispatch(&self, inner: U) -> WithDispatch { WithDispatch { dispatch: self.dispatch.clone(), @@ -154,11 +188,7 @@ pub mod support; #[cfg(test)] mod tests { - extern crate tokio; - use super::{test_support::*, *}; - use futures::{future, stream, task, Async}; - use tracing::{subscriber::with_default, Level}; struct PollN { and_return: Option>, @@ -166,23 +196,6 @@ mod tests { polls: usize, } - impl Future for PollN { - type Item = T; - type Error = E; - fn poll(&mut self) -> Poll { - self.polls += 1; - if self.polls == self.finish_at { - self.and_return - .take() - .expect("polled after ready") - .map(Async::Ready) - } else { - task::current().notify(); - Ok(Async::NotReady) - } - } - } - impl PollN<(), ()> { fn new_ok(finish_at: usize) -> Self { Self { @@ -201,95 +214,123 @@ mod tests { } } - #[test] - fn future_enter_exit_is_reasonable() { - let (subscriber, handle) = subscriber::mock() - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .drop_span(span::mock().named("foo")) - .done() - .run_with_handle(); - with_default(subscriber, || { - PollN::new_ok(2) - .instrument(span!(Level::TRACE, "foo")) - .wait() - .unwrap(); - }); - handle.assert_finished(); - } + #[cfg(feature = "futures-01")] + mod futures_tests { + extern crate tokio; + + use futures::{future, stream, task, Async, Future}; + use tracing::{subscriber::with_default, Level}; + + use super::*; + + impl futures::Future for PollN { + type Item = T; + type Error = E; + fn poll(&mut self) -> futures::Poll { + self.polls += 1; + if self.polls == self.finish_at { + self.and_return + .take() + .expect("polled after ready") + .map(Async::Ready) + } else { + task::current().notify(); + Ok(Async::NotReady) + } + } + } - #[test] - fn future_error_ends_span() { - let (subscriber, handle) = subscriber::mock() - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .drop_span(span::mock().named("foo")) - .done() - .run_with_handle(); - with_default(subscriber, || { - PollN::new_err(2) - .instrument(span!(Level::TRACE, "foo")) - .wait() - .unwrap_err(); - }); - - handle.assert_finished(); - } + #[test] + fn future_enter_exit_is_reasonable() { + let (subscriber, handle) = subscriber::mock() + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .drop_span(span::mock().named("foo")) + .done() + .run_with_handle(); + with_default(subscriber, || { + PollN::new_ok(2) + .instrument(span!(Level::TRACE, "foo")) + .wait() + .unwrap(); + }); + handle.assert_finished(); + } - #[test] - fn stream_enter_exit_is_reasonable() { - let (subscriber, handle) = subscriber::mock() - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .enter(span::mock().named("foo")) - .exit(span::mock().named("foo")) - .drop_span(span::mock().named("foo")) - .run_with_handle(); - with_default(subscriber, || { - stream::iter_ok::<_, ()>(&[1, 2, 3]) - .instrument(span!(Level::TRACE, "foo")) - .for_each(|_| future::ok(())) - .wait() - .unwrap(); - }); - handle.assert_finished(); - } + #[cfg(feature = "futures-01")] + #[test] + fn future_error_ends_span() { + let (subscriber, handle) = subscriber::mock() + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .drop_span(span::mock().named("foo")) + .done() + .run_with_handle(); + with_default(subscriber, || { + PollN::new_err(2) + .instrument(span!(Level::TRACE, "foo")) + .wait() + .unwrap_err(); + }); + + handle.assert_finished(); + } - #[test] - fn span_follows_future_onto_threadpool() { - let (subscriber, handle) = subscriber::mock() - .enter(span::mock().named("a")) - .enter(span::mock().named("b")) - .exit(span::mock().named("b")) - .enter(span::mock().named("b")) - .exit(span::mock().named("b")) - .drop_span(span::mock().named("b")) - .exit(span::mock().named("a")) - .drop_span(span::mock().named("a")) - .done() - .run_with_handle(); - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - with_default(subscriber, || { - span!(Level::TRACE, "a").in_scope(|| { - let future = PollN::new_ok(2) - .instrument(span!(Level::TRACE, "b")) - .map(|_| { - span!(Level::TRACE, "c").in_scope(|| { - // "c" happens _outside_ of the instrumented future's - // span, so we don't expect it. - }) - }); - runtime.block_on(Box::new(future)).unwrap(); - }) - }); - handle.assert_finished(); + #[test] + fn stream_enter_exit_is_reasonable() { + let (subscriber, handle) = subscriber::mock() + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .drop_span(span::mock().named("foo")) + .run_with_handle(); + with_default(subscriber, || { + stream::iter_ok::<_, ()>(&[1, 2, 3]) + .instrument(span!(Level::TRACE, "foo")) + .for_each(|_| future::ok(())) + .wait() + .unwrap(); + }); + handle.assert_finished(); + } + + #[test] + fn span_follows_future_onto_threadpool() { + let (subscriber, handle) = subscriber::mock() + .enter(span::mock().named("a")) + .enter(span::mock().named("b")) + .exit(span::mock().named("b")) + .enter(span::mock().named("b")) + .exit(span::mock().named("b")) + .drop_span(span::mock().named("b")) + .exit(span::mock().named("a")) + .drop_span(span::mock().named("a")) + .done() + .run_with_handle(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + with_default(subscriber, || { + span!(Level::TRACE, "a").in_scope(|| { + let future = PollN::new_ok(2) + .instrument(span!(Level::TRACE, "b")) + .map(|_| { + span!(Level::TRACE, "c").in_scope(|| { + // "c" happens _outside_ of the instrumented future's + // span, so we don't expect it. + }) + }); + runtime.block_on(Box::new(future)).unwrap(); + }) + }); + handle.assert_finished(); + } } } diff --git a/tracing-futures/test_std_future/Cargo.toml b/tracing-futures/test_std_future/Cargo.toml new file mode 100644 index 0000000000..7788cf5da4 --- /dev/null +++ b/tracing-futures/test_std_future/Cargo.toml @@ -0,0 +1,18 @@ +# Note: these tests depend on crates that use nightly features which do not +# run under stable and beta toolchains. +# +# Do not add these tests to the other tracing-futures tests unless the +# minimum Rust version is at least 1.36. +[workspace] + +[package] +name = "test_std_future" +version = "0.1.0" +publish = false +edition = "2018" + +[dependencies] +tokio-test = { git = "https://github.com/tokio-rs/tokio.git" } +tracing = "0.1" +tracing-core = "0.1.2" +tracing-futures = { path = "..", features = ["std-future"] } diff --git a/tracing-futures/test_std_future/tests/test.rs b/tracing-futures/test_std_future/tests/test.rs new file mode 100644 index 0000000000..e02c16fd32 --- /dev/null +++ b/tracing-futures/test_std_future/tests/test.rs @@ -0,0 +1,111 @@ +#[macro_use] +extern crate tracing; +extern crate tracing_core; + +pub use self::support as test_support; +// This has to have the same name as the module in `tracing`. +#[path = "../../../tracing/tests/support/mod.rs"] +pub mod support; + +use std::pin::Pin; +use std::task::Context; + +use support::*; +use tokio_test::task::MockTask; +use tracing::{subscriber::with_default, Level}; +use tracing_futures::Instrument; + +struct PollN { + and_return: Option>, + finish_at: usize, + polls: usize, +} + +impl std::future::Future for PollN +where + T: Unpin, + E: Unpin, +{ + type Output = Result; + fn poll(self: Pin<&mut Self>, lw: &mut Context) -> std::task::Poll { + let this = self.get_mut(); + + this.polls += 1; + if this.polls == this.finish_at { + let value = this.and_return.take().expect("polled after ready"); + + std::task::Poll::Ready(value) + } else { + lw.waker().wake_by_ref(); + std::task::Poll::Pending + } + } +} + +impl PollN<(), ()> { + fn new_ok(finish_at: usize) -> Self { + Self { + and_return: Some(Ok(())), + finish_at, + polls: 0, + } + } + + fn new_err(finish_at: usize) -> Self { + Self { + and_return: Some(Err(())), + finish_at, + polls: 0, + } + } +} + +fn block_on_future(task: &mut MockTask, future: F) -> F::Output +where + F: std::future::Future, +{ + let mut future = Box::pin(future); + + loop { + match task.poll(&mut future) { + std::task::Poll::Ready(v) => break v, + _ => {} + } + } +} + +#[test] +fn enter_exit_is_reasonable() { + let (subscriber, handle) = subscriber::mock() + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .drop_span(span::mock().named("foo")) + .done() + .run_with_handle(); + let mut task = MockTask::new(); + with_default(subscriber, || { + let future = PollN::new_ok(2).instrument(span!(Level::TRACE, "foo")); + block_on_future(&mut task, future).unwrap(); + }); + handle.assert_finished(); +} + +#[test] +fn error_ends_span() { + let (subscriber, handle) = subscriber::mock() + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .enter(span::mock().named("foo")) + .exit(span::mock().named("foo")) + .drop_span(span::mock().named("foo")) + .done() + .run_with_handle(); + let mut task = MockTask::new(); + with_default(subscriber, || { + let future = PollN::new_err(2).instrument(span!(Level::TRACE, "foo")); + block_on_future(&mut task, future).unwrap_err(); + }); + handle.assert_finished(); +}