Skip to content

Commit

Permalink
util: Add TokioContext future (#2791) (#2958)
Browse files Browse the repository at this point in the history
Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
Co-authored-by: Blas Rodriguez Irizar <rodrigblas@gmail.com>
  • Loading branch information
LucioFranco and blasrodri authored Oct 14, 2020
1 parent b69d4a6 commit 85d8029
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 9 deletions.
2 changes: 2 additions & 0 deletions tokio-macros/src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::unnecessary_lazy_evaluations)]

use proc_macro::TokenStream;
use quote::quote;
use std::num::NonZeroUsize;
Expand Down
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ full = ["codec", "udp", "compat"]
compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
rt = ["tokio/rt-core"]

[dependencies]
tokio = { version = "0.2.5", path = "../tokio" }
Expand Down
10 changes: 10 additions & 0 deletions tokio-util/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ macro_rules! cfg_compat {
}
}

macro_rules! cfg_rt {
($($item:item)*) => {
$(
#[cfg(feature = "rt")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
$item
)*
}
}

macro_rules! cfg_udp {
($($item:item)*) => {
$(
Expand Down
78 changes: 78 additions & 0 deletions tokio-util/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! Tokio context aware futures utilities.
//!
//! This module includes utilities around integrating tokio with other runtimes
//! by allowing the context to be attached to futures. This allows spawning
//! futures on other executors while still using tokio to drive them. This
//! can be useful if you need to use a tokio based library in an executor/runtime
//! that does not provide a tokio context.
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;

pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
#[pin]
inner: F,
handle: Handle,
}
}

impl<F: Future> Future for TokioContext<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let handle = me.handle;
let fut = me.inner;

handle.enter(|| fut.poll(cx))
}
}

/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
/// use tokio_util::context::HandleExt;
/// use tokio::time::{delay_for, Duration};
///
/// let mut rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
///
/// let rt2 = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .build().unwrap();
///
/// let fut = delay_for(Duration::from_millis(2));
///
/// rt.block_on(
/// rt2
/// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
}

impl HandleExt for Handle {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
TokioContext {
inner: fut,
handle: self.clone(),
}
}
}
4 changes: 4 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ cfg_udp! {
cfg_compat! {
pub mod compat;
}

cfg_rt! {
pub mod context;
}
29 changes: 29 additions & 0 deletions tokio-util/tests/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "rt")]

use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;

#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
.build()
.unwrap();
let rt2 = Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.unwrap();

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
}
4 changes: 3 additions & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
clippy::needless_doctest_main
clippy::needless_doctest_main,
clippy::match_like_matches_macro,
clippy::stable_sort_primitive
)]
#![warn(
missing_debug_implementations,
Expand Down
3 changes: 2 additions & 1 deletion tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(clippy::blacklisted_name)]
#![allow(clippy::blacklisted_name, clippy::stable_sort_primitive)]

use tokio::sync::{mpsc, oneshot};
use tokio::task;
use tokio_test::{assert_ok, assert_pending, assert_ready};
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(clippy::needless_range_loop)]
#![allow(clippy::needless_range_loop, clippy::stable_sort_primitive)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

Expand Down
10 changes: 6 additions & 4 deletions tokio/tests/stream_stream_map.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::stable_sort_primitive)]

use tokio::stream::{self, pending, Stream, StreamExt, StreamMap};
use tokio::sync::mpsc;
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
Expand Down Expand Up @@ -213,17 +215,17 @@ fn new_capacity_zero() {
let map = StreamMap::<&str, stream::Pending<()>>::new();
assert_eq!(0, map.capacity());

let keys = map.keys().collect::<Vec<_>>();
assert!(keys.is_empty());
let mut keys = map.keys();
assert!(keys.next().is_none());
}

#[test]
fn with_capacity() {
let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
assert!(10 <= map.capacity());

let keys = map.keys().collect::<Vec<_>>();
assert!(keys.is_empty());
let mut keys = map.keys();
assert!(keys.next().is_none());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/sync_broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(clippy::cognitive_complexity)]
#![allow(clippy::cognitive_complexity, clippy::match_like_matches_macro)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/time_delay_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(clippy::blacklisted_name)]
#![allow(clippy::blacklisted_name, clippy::stable_sort_primitive)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

Expand Down

0 comments on commit 85d8029

Please sign in to comment.