From 7d28e4cdbb63b0ad19e66d1b077690cbebf71353 Mon Sep 17 00:00:00 2001 From: Carlos B Date: Tue, 22 Dec 2020 20:08:43 +0100 Subject: [PATCH] examples: add futures executor threadpool (#3198) --- examples/Cargo.toml | 13 +++++- examples/custom-executor-tokio-context.rs | 32 +++++++++++++ examples/custom-executor.rs | 56 +++++++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 examples/custom-executor-tokio-context.rs create mode 100644 examples/custom-executor.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 2c1f7f9257c..233c83d037c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -15,13 +15,15 @@ async-stream = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] } bytes = { git = "https://github.com/tokio-rs/bytes" } -futures = "0.3.0" +futures = { version = "0.3.0", features = ["thread-pool"]} http = "0.2" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" httparse = "1.0" time = "0.1" +once_cell = "1.5.2" + [[example]] name = "chat" @@ -66,3 +68,12 @@ path = "udp-codec.rs" [[example]] name = "tinyhttp" path = "tinyhttp.rs" + +[[example]] +name = "custom-executor" +path = "custom-executor.rs" + + +[[example]] +name = "custom-executor-tokio-context" +path = "custom-executor-tokio-context.rs" diff --git a/examples/custom-executor-tokio-context.rs b/examples/custom-executor-tokio-context.rs new file mode 100644 index 00000000000..ae1cd2df2d5 --- /dev/null +++ b/examples/custom-executor-tokio-context.rs @@ -0,0 +1,32 @@ +// This example shows how to use the tokio runtime with any other executor +// +//It takes advantage from RuntimeExt which provides the extension to customize your +//runtime. + +use tokio::net::TcpListener; +use tokio::runtime::Builder; +use tokio::sync::oneshot; +use tokio_util::context::RuntimeExt; + +fn main() { + let (tx, rx) = oneshot::channel(); + let rt1 = Builder::new_multi_thread() + .worker_threads(1) + // no timer! + .build() + .unwrap(); + let rt2 = Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + // Without the `HandleExt.wrap()` there would be a panic because there is + // no timer running, since it would be referencing runtime r1. + let _ = rt1.block_on(rt2.wrap(async move { + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + println!("addr: {:?}", listener.local_addr()); + tx.send(()).unwrap(); + })); + futures::executor::block_on(rx).unwrap(); +} diff --git a/examples/custom-executor.rs b/examples/custom-executor.rs new file mode 100644 index 00000000000..48fdaf46b84 --- /dev/null +++ b/examples/custom-executor.rs @@ -0,0 +1,56 @@ +// This example shows how to use the tokio runtime with any other executor +// +// The main components are a spawn fn that will wrap futures in a special future +// that will always enter the tokio context on poll. This only spawns one extra thread +// to manage and run the tokio drivers in the background. + +use tokio::net::TcpListener; +use tokio::sync::oneshot; + +fn main() { + let (tx, rx) = oneshot::channel(); + + my_custom_runtime::spawn(async move { + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + + println!("addr: {:?}", listener.local_addr()); + + tx.send(()).unwrap(); + }); + + futures::executor::block_on(rx).unwrap(); +} + +mod my_custom_runtime { + use once_cell::sync::Lazy; + use std::future::Future; + use tokio_util::context::TokioContext; + + pub fn spawn(f: impl Future + Send + 'static) { + EXECUTOR.spawn(f); + } + + struct ThreadPool { + inner: futures::executor::ThreadPool, + rt: tokio::runtime::Runtime, + } + + static EXECUTOR: Lazy = Lazy::new(|| { + // Spawn tokio runtime on a single background thread + // enabling IO and timers. + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + let inner = futures::executor::ThreadPool::builder().create().unwrap(); + + ThreadPool { inner, rt } + }); + + impl ThreadPool { + fn spawn(&self, f: impl Future + Send + 'static) { + let handle = self.rt.handle().clone(); + self.inner.spawn_ok(TokioContext::new(f, handle)); + } + } +}