Skip to content

Commit

Permalink
examples: add futures executor threadpool (#3198)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosb1 authored Dec 22, 2020
1 parent 0b83b3b commit 7d28e4c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 1 deletion.
13 changes: 12 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ async-stream = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
bytes = { git = "https://github.com/tokio-rs/bytes" }
futures = "0.3.0"
futures = { version = "0.3.0", features = ["thread-pool"]}
http = "0.2"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
httparse = "1.0"
time = "0.1"
once_cell = "1.5.2"


[[example]]
name = "chat"
Expand Down Expand Up @@ -66,3 +68,12 @@ path = "udp-codec.rs"
[[example]]
name = "tinyhttp"
path = "tinyhttp.rs"

[[example]]
name = "custom-executor"
path = "custom-executor.rs"


[[example]]
name = "custom-executor-tokio-context"
path = "custom-executor-tokio-context.rs"
32 changes: 32 additions & 0 deletions examples/custom-executor-tokio-context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// This example shows how to use the tokio runtime with any other executor
//
//It takes advantage from RuntimeExt which provides the extension to customize your
//runtime.

use tokio::net::TcpListener;
use tokio::runtime::Builder;
use tokio::sync::oneshot;
use tokio_util::context::RuntimeExt;

fn main() {
let (tx, rx) = oneshot::channel();
let rt1 = Builder::new_multi_thread()
.worker_threads(1)
// no timer!
.build()
.unwrap();
let rt2 = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(rt2.wrap(async move {
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
println!("addr: {:?}", listener.local_addr());
tx.send(()).unwrap();
}));
futures::executor::block_on(rx).unwrap();
}
56 changes: 56 additions & 0 deletions examples/custom-executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// This example shows how to use the tokio runtime with any other executor
//
// The main components are a spawn fn that will wrap futures in a special future
// that will always enter the tokio context on poll. This only spawns one extra thread
// to manage and run the tokio drivers in the background.

use tokio::net::TcpListener;
use tokio::sync::oneshot;

fn main() {
let (tx, rx) = oneshot::channel();

my_custom_runtime::spawn(async move {
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();

println!("addr: {:?}", listener.local_addr());

tx.send(()).unwrap();
});

futures::executor::block_on(rx).unwrap();
}

mod my_custom_runtime {
use once_cell::sync::Lazy;
use std::future::Future;
use tokio_util::context::TokioContext;

pub fn spawn(f: impl Future<Output = ()> + Send + 'static) {
EXECUTOR.spawn(f);
}

struct ThreadPool {
inner: futures::executor::ThreadPool,
rt: tokio::runtime::Runtime,
}

static EXECUTOR: Lazy<ThreadPool> = Lazy::new(|| {
// Spawn tokio runtime on a single background thread
// enabling IO and timers.
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let inner = futures::executor::ThreadPool::builder().create().unwrap();

ThreadPool { inner, rt }
});

impl ThreadPool {
fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
let handle = self.rt.handle().clone();
self.inner.spawn_ok(TokioContext::new(f, handle));
}
}
}

0 comments on commit 7d28e4c

Please sign in to comment.