Skip to content

Commit

Permalink
futures: support std::futures::Future with !Unpin futures
Browse files Browse the repository at this point in the history
- Thank you Eliza Weisman (hawkw) for the solution using the pin-utils crate
- Added feature gates for the futures 0.1, tokio, and std::future implementations
  • Loading branch information
mbilker committed Jul 8, 2019
1 parent 63d8516 commit b720bd9
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 23 deletions.
7 changes: 5 additions & 2 deletions tracing-futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ authors = ["Eliza Weisman <eliza@buoyant.io>"]
edition = "2018"

[features]
default = ["tokio"]
default = ["futures-01", "tokio"]
futures-01 = ["futures"]
std-future = ["pin-utils"]

[dependencies]
futures = "0.1"
futures = { version = "0.1", optional = true }
pin-utils = { version = "0.1.0-alpha.4", optional = true }
tracing = "0.1"
tokio = { version = "0.1", optional = true }
tokio-executor = { version = "0.1", optional = true }
Expand Down
29 changes: 19 additions & 10 deletions tracing-futures/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use crate::{Instrument, Instrumented, WithDispatch};
#[cfg(feature = "futures-01")]
use futures::{
future::{ExecuteError, Executor},
Future,
};

#[cfg(feature = "tokio")]
use tokio::{
executor::{Executor as TokioExecutor, SpawnError},
runtime::{current_thread, Runtime, TaskExecutor},
future::{ExecuteError, Executor},
Future,
};
#[cfg(feature = "futures-01")]
use tokio::executor::{Executor as TokioExecutor, SpawnError};
use tokio::runtime::{current_thread, Runtime, TaskExecutor};

#[cfg(feature = "futures-01")]
macro_rules! deinstrument_err {
($e:expr) => {
$e.map_err(|e| {
Expand All @@ -20,6 +19,7 @@ macro_rules! deinstrument_err {
};
}

#[cfg(feature = "futures-01")]
impl<T, F> Executor<F> for Instrumented<T>
where
T: Executor<Instrumented<F>>,
Expand All @@ -31,7 +31,7 @@ where
}
}

#[cfg(feature = "tokio")]
#[cfg(all(feature = "futures-01", feature = "tokio"))]
impl<T> TokioExecutor for Instrumented<T>
where
T: TokioExecutor,
Expand All @@ -56,6 +56,7 @@ impl Instrumented<Runtime> {
///
/// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
/// instrumenting the spawned future beforehand.
#[cfg(feature = "futures-01")]
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Item = (), Error = ()> + Send + 'static,
Expand All @@ -80,6 +81,7 @@ impl Instrumented<Runtime> {
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
#[cfg(feature = "futures-01")]
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
Expand Down Expand Up @@ -108,6 +110,7 @@ impl Instrumented<current_thread::Runtime> {
///
/// This method simply wraps a call to `current_thread::Runtime::spawn`,
/// instrumenting the spawned future beforehand.
#[cfg(feature = "futures-01")]
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Item = (), Error = ()> + 'static,
Expand Down Expand Up @@ -141,6 +144,7 @@ impl Instrumented<current_thread::Runtime> {
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
#[cfg(feature = "futures-01")]
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: 'static + Future<Item = R, Error = E>,
Expand All @@ -165,6 +169,7 @@ impl Instrumented<current_thread::Runtime> {
}
}

#[cfg(feature = "futures-01")]
impl<T, F> Executor<F> for WithDispatch<T>
where
T: Executor<WithDispatch<F>>,
Expand All @@ -176,7 +181,7 @@ where
}
}

#[cfg(feature = "tokio")]
#[cfg(all(feature = "futures-01", feature = "tokio"))]
impl<T> TokioExecutor for WithDispatch<T>
where
T: TokioExecutor,
Expand All @@ -202,6 +207,7 @@ impl WithDispatch<Runtime> {
///
/// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
/// instrumenting the spawned future beforehand.
#[cfg(feature = "futures-01")]
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Item = (), Error = ()> + Send + 'static,
Expand All @@ -227,6 +233,7 @@ impl WithDispatch<Runtime> {
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
#[cfg(feature = "futures-01")]
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
Expand Down Expand Up @@ -257,6 +264,7 @@ impl WithDispatch<current_thread::Runtime> {
///
/// This method simply wraps a call to `current_thread::Runtime::spawn`,
/// instrumenting the spawned future beforehand.
#[cfg(feature = "futures-01")]
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Item = (), Error = ()> + 'static,
Expand Down Expand Up @@ -290,6 +298,7 @@ impl WithDispatch<current_thread::Runtime> {
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
#[cfg(feature = "futures-01")]
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: 'static + Future<Item = R, Error = E>,
Expand Down
47 changes: 36 additions & 11 deletions tracing-futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,37 @@
//!
//! [`Instrument`]: trait.Instrument.html
//! [`WithSubscriber`]: trait.WithSubscriber.html
#[cfg(feature = "futures-01")]
extern crate futures;
#[cfg(feature = "std-future")]
extern crate pin_utils;
#[cfg(feature = "tokio")]
extern crate tokio;
#[cfg(feature = "tokio-executor")]
extern crate tokio_executor;
#[cfg_attr(test, macro_use)]
extern crate tracing;

#[cfg(feature = "std-future")]
use std::{
pin::Pin,
task::Context,
};

#[cfg(feature = "futures-01")]
use futures::{Sink, StartSend, Stream};
use tracing::{dispatcher, Dispatch, Span};
#[cfg(feature = "futures-01")]
use tracing::dispatcher;
use tracing::{Dispatch, Span};

#[cfg(feature = "tokio")]
pub mod executor;

// TODO: seal?
pub trait Instrument: Sized {
fn instrument(self, span: Span) -> Instrumented<Self> {
Instrumented { inner: self, span }
}

fn boxed_instrument(self, span: Span) -> Instrumented<Pin<Box<Self>>> {
Instrumented { inner: Box::pin(self), span }
}
}

pub trait WithSubscriber: Sized {
Expand Down Expand Up @@ -69,16 +73,26 @@ pub struct WithDispatch<T> {

impl<T: Sized> Instrument for T {}

impl<P: std::future::Future + Unpin> std::future::Future for Instrumented<P> {
type Output = P::Output;
#[cfg(feature = "std-future")]
impl<T: std::future::Future> Instrumented<T> {
pin_utils::unsafe_pinned!(inner: T);
}

fn poll(self: Pin<&mut Self>, lw: &mut Context) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let _enter = this.span.enter();
Pin::new(&mut this.inner).poll(lw)
#[cfg(feature = "std-future")]
impl<T: std::future::Future> std::future::Future for Instrumented<T> {
type Output = T::Output;

fn poll(mut self: Pin<&mut Self>, lw: &mut Context) -> std::task::Poll<Self::Output> {
let span = self.as_ref().span.clone();
let _enter = span.enter();
self.as_mut().inner().poll(lw)
}
}

#[cfg(feature = "std-future")]
impl<T: Unpin> Unpin for Instrumented<T> {}

#[cfg(feature = "futures-01")]
impl<T: futures::Future> futures::Future for Instrumented<T> {
type Item = T::Item;
type Error = T::Error;
Expand All @@ -89,6 +103,7 @@ impl<T: futures::Future> futures::Future for Instrumented<T> {
}
}

#[cfg(feature = "futures-01")]
impl<T: Stream> Stream for Instrumented<T> {
type Item = T::Item;
type Error = T::Error;
Expand All @@ -99,6 +114,7 @@ impl<T: Stream> Stream for Instrumented<T> {
}
}

#[cfg(feature = "futures-01")]
impl<T: Sink> Sink for Instrumented<T> {
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
Expand Down Expand Up @@ -135,6 +151,7 @@ impl<T> Instrumented<T> {

impl<T: Sized> WithSubscriber for T {}

#[cfg(feature = "futures-01")]
impl<T: futures::Future> futures::Future for WithDispatch<T> {
type Item = T::Item;
type Error = T::Error;
Expand All @@ -146,6 +163,7 @@ impl<T: futures::Future> futures::Future for WithDispatch<T> {
}

impl<T> WithDispatch<T> {
#[cfg(feature = "tokio")]
pub(crate) fn with_dispatch<U: Sized>(&self, inner: U) -> WithDispatch<U> {
WithDispatch {
dispatch: self.dispatch.clone(),
Expand Down Expand Up @@ -176,6 +194,8 @@ mod tests {
extern crate tokio;

use super::{test_support::*, *};

#[cfg(feature = "futures-01")]
use futures::{future, stream, task, Async, Future};
use tracing::{subscriber::with_default, Level};

Expand All @@ -185,6 +205,7 @@ mod tests {
polls: usize,
}

#[cfg(feature = "futures-01")]
impl<T, E> futures::Future for PollN<T, E> {
type Item = T;
type Error = E;
Expand Down Expand Up @@ -220,6 +241,7 @@ mod tests {
}
}

#[cfg(feature = "futures-01")]
#[test]
fn future_enter_exit_is_reasonable() {
let (subscriber, handle) = subscriber::mock()
Expand All @@ -239,6 +261,7 @@ mod tests {
handle.assert_finished();
}

#[cfg(feature = "futures-01")]
#[test]
fn future_error_ends_span() {
let (subscriber, handle) = subscriber::mock()
Expand All @@ -259,6 +282,7 @@ mod tests {
handle.assert_finished();
}

#[cfg(feature = "futures-01")]
#[test]
fn stream_enter_exit_is_reasonable() {
let (subscriber, handle) = subscriber::mock()
Expand All @@ -282,6 +306,7 @@ mod tests {
handle.assert_finished();
}

#[cfg(feature = "futures-01")]
#[test]
fn span_follows_future_onto_threadpool() {
let (subscriber, handle) = subscriber::mock()
Expand Down

0 comments on commit b720bd9

Please sign in to comment.