From 9a4055eb79af9ffc81a7a07b0bdbffaf1982936e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 14 Nov 2024 10:04:07 -0500 Subject: [PATCH] Add example for using a separate threadpool for CPU bound work --- datafusion-examples/examples/thread_pools.rs | 167 ++++ datafusion/common-runtime/Cargo.toml | 4 +- .../common-runtime/src/dedicated_executor.rs | 917 ++++++++++++++++++ datafusion/common-runtime/src/lib.rs | 1 + 4 files changed, 1088 insertions(+), 1 deletion(-) create mode 100644 datafusion-examples/examples/thread_pools.rs create mode 100644 datafusion/common-runtime/src/dedicated_executor.rs diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs new file mode 100644 index 0000000000000..b680733209c67 --- /dev/null +++ b/datafusion-examples/examples/thread_pools.rs @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::util::pretty::pretty_format_batches; +use datafusion::error::Result; +use datafusion::prelude::*; +use futures::stream::StreamExt; +use datafusion::execution::SendableRecordBatchStream; +//! This example shows how to use a separate thread pool (tokio [`Runtime`])) to +//! run the CPU intensive parts of DataFusion plans. +//! +//! Running DataFusion plans that perform I/O, such as reading parquet files +//! directly from remote object storage (e.g. AWS S3) without care will result +//! in running CPU intensive jobs on the same thread pool, which can lead to the +//! issues described in the [Architecture section] such as throttled bandwidth +//! due to congestion control and increased latencies for processing network +//! messages. + + +/// Normally, you don't need to worry about the details of the tokio runtime, +/// but for this example it is important to understand how the [`Runtime`]s work. +/// +/// There is a "current" runtime that is installed in a thread local variable +/// that is used by the `tokio::spawn` function. +/// +/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it as +/// as the "current" runtime (on which any `async` futures, streams and tasks +/// are run). +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new() + // enabling URL table means we can select directly from + // paths in SQL queries. + .enable_url_table(); + let sql = format!( + "SELECT * FROM '{}/alltypes_plain.parquet'", + datafusion::test_util::parquet_test_data() + ); + + // Run the same query on the same runtime. Note that calling `await` here + // will effectively run the future (in this case the `async` function) on + // the current runtime. + same_runtime(&ctx, &sql).await?; + + // Run the same query on a different runtime. Note that we are still calling + // `await` here, so the the `async` function still runs on the current runtime. + // We use the `DedicatedExecutor` to run the query on a different runtime. + different_runtime(ctx, sql).await?; + Ok(()) +} + +/// Run queries directly on the current tokio `Runtime` +/// +/// This is now most examples in DataFusion are written and works well for +/// development and local query processing. +async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { + // Calling .sql is an async function as it may also do network + // I/O, for example to contact a remote catalog or do an object store LIST + let df = ctx.sql(sql).await?; + + // While many examples call `collect` or `show()`, those methods buffers the + // results. internally DataFusion generates output a RecordBatch at a time + + // Calling `execute_stream` on a DataFrame returns a + // `SendableRecordBatchStream`. Depending on the plan, this may also do + // network I/O, for example to begin reading a parquet file from a remote + // object store as well. It is also possible that this function call spawns + // tasks that begin doing CPU intensive work as well + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` drives the plan, producing new `RecordBatch`es using the + // current runtime (and typically also the current thread). + // + // Perhaps somewhat non obvious, calling the `next()` function often will + // result in other tasks being spawned on the current runtime (e.g. for + // `RepartitionExec` to read data from each of its input partitions in + // parallel). + // + // Executing the plan like this results in all CPU intensive work + // running on same (default) Runtime. + while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + Ok(()) +} + +/// Demonstrates how to run queries on a **different** runtime than the current one +/// +/// This is typically how you should run DataFusion queries from a network +/// server or when processing data from a remote object store. +async fn different_runtime(ctx: SessionContext, sql: String) -> Result<()> { + // First, we need a new runtime, which we can create with the tokio builder + // however, since we are already in the context of another runtime + // (installed by #[tokio::main]) we create a new thread for the runtime + tokio::task::spawn_blocking(move || { + std::thread::spawn(move || thread_entry(ctx, sql.to_string())) + .join() + .expect("thread did not panic") + }) + .await + .expect("task did not panic") +} + +/// This is the entry point of thread that we started our second runtime on +fn thread_entry(ctx: SessionContext, sql: String) -> Result<()> { + let runtime = tokio::runtime::Builder::new_multi_thread() + // only enable the time driver (not the I/O driver), meaning this + // runtime will not be able to perform network I/O + .enable_time() + .build()?; + + // Now we can run the actual code we want on a different runtime + runtime.block_on(async move { different_runtime_inner(ctx, sql).await }) +} + +/// this is run on a new, separate runtime +async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> { + // Setup execution as before + let df = ctx.sql(&sql).await?; + + let mut stream = df.execute_stream().await?; + + //XXX Note at this point, calling next() will be run on our new threadpool. However, this will also spawn the catalog and object store requests on the same threadpool as well! + + // While this will mean we don't interfere with handling of other network requests, it will mean tht the network requests that happen as part of query processing will still be running on the same threadpool + + // TODO show this working + + // To avoid this, all IO access, both catalog and data (e.g. object_store) must be spawned on to their own runtime, like this: + // TODO.... + + // + // care is required to avoid calling `next()` (aka polling) from the default IO thread (even if planning / execution is run on that other thread) + // Best practice is to do all of DataFusion planning / execution on a separate pool. Note that some care is required for remote catalogs such as iceberg that + // themselves do network IO + // TODO figure out how to cause an erorr due to io thread + + while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + + // + // You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you. + + // Note that special care must be taken when running Datafusion plans that do async io to transfer the work to their own thread pools. + + // Using separate Runtime will avoid other requests being messed up but it won't really help requests made from DataSources such as + // reading parquet files from object_store. + // + // Thus this runtime also disables IO so that we are sure there is no IO work being done on it. + + Ok(()) +} diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index a21c72cd9f839..8c3081fb6e073 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -38,6 +38,8 @@ path = "src/lib.rs" [dependencies] log = { workspace = true } tokio = { workspace = true } +parking_lot = { workspace = true } +futures = { workspace = true } [dev-dependencies] -tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] } +tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time", "net"] } diff --git a/datafusion/common-runtime/src/dedicated_executor.rs b/datafusion/common-runtime/src/dedicated_executor.rs new file mode 100644 index 0000000000000..a54f9183881f4 --- /dev/null +++ b/datafusion/common-runtime/src/dedicated_executor.rs @@ -0,0 +1,917 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime. +//! +//! Originally from [InfluxDB 3.0] +//! +//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, TryFutureExt, +}; +use log::{info, warn}; +use parking_lot::RwLock; +use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc, time::Duration}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; + +impl From for DedicatedExecutorBuilder { + fn from(value: Builder) -> Self { + Self::new_from_builder(value) + } +} + +/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such +/// as DataFusion `ExecutionPlans`. +/// +/// See [`DedicatedExecutorBuilder`] for creating a new instance. +/// +/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound +/// tasks on the same threadpool by running futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio [`Executor`]. +/// +/// TODO add note about `io_thread` +/// +/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio +/// runtime will be maked for io, via [`register_io_runtime`] by all threads +/// spawned by the executor. Any I/O done by threads in this +/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the I/O +/// runtime. +/// +/// ## TODO examples +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it. Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decreases the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. +/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// +/// +/// # Trouble Shooting: +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread! +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed"` +/// +/// If you try to use this structure from an async context you see something like +/// thread 'test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed it means This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +/// +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc>, +} + +impl DedicatedExecutor { + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// # Notes + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it. + /// + /// All spawned tasks are added to the tokio executor immediately and + /// compete for the threadpool's resources. + pub fn spawn(&self, task: T) -> impl Future> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let handle = { + let state = self.state.read(); + state.handle.clone() + }; + + let Some(handle) = handle else { + return futures::future::err(JobError::WorkerGone).boxed(); + }; + + // use JoinSet implement "cancel on drop" + let mut join_set = JoinSet::new(); + join_set.spawn_on(task, &handle); + async move { + join_set + .join_next() + .await + .expect("just spawned task") + .map_err(|e| match e.try_into_panic() { + Ok(e) => { + let s = if let Some(s) = e.downcast_ref::() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "unknown internal error".to_string() + }; + + JobError::Panic { msg: s } + } + Err(_) => JobError::WorkerGone, + }) + } + .boxed() + } + + /// signals shutdown of this executor and any Clones + pub fn shutdown(&self) { + // hang up the channel which will cause the dedicated thread + // to quit + let mut state = self.state.write(); + state.handle = None; + state.start_shutdown.notify_one(); + } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all clones of this + /// `DedicatedExecutor` as well. + /// + /// Only the first all to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + /// + /// # Panic / Drop + /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call + /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see + /// . + pub async fn join(&self) { + self.shutdown(); + + // get handle mutex is held + let handle = { + let state = self.state.read(); + state.completed_shutdown.clone() + }; + + // wait for completion while not holding the mutex to avoid + // deadlocks + handle.await.expect("Thread died?") + } + + /// Registers `handle` as the IO runtime for this thread + /// + /// This sets a thread-local variable + /// + /// See [`spawn_io`](Self::spawn_io) for more details + pub fn register_io_runtime(handle: Option) { + IO_RUNTIME.set(handle) + } + + /// Registers the "current" `handle` as the IO runtime for this thread + /// + /// This is useful for testing purposes. + /// + /// # Panics if no current handle is available (aka not running in a tokio + /// runtime) + pub fn register_current_runtime_for_io() { + Self::register_io_runtime(Some(Handle::current())) + } + + /// Runs `fut` on the runtime registered by [`register_io_runtime`] if any, + /// otherwise awaits on the current thread + /// + /// # Panic + /// Needs a IO runtime [registered](register_io_runtime). + pub async fn spawn_io(fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect( + "No IO runtime registered. If you hit this panic, it likely \ + means a DataFusion plan or other CPU bound work is running on the \ + a tokio threadpool used for IO. Try spawning the work using \ + `DedicatedExecutor::spawn` or for tests `DedicatedExecutor::register_current_runtime_for_io`", + ); + DropGuard(h.spawn(fut)).await + } +} + +thread_local! { + /// Tokio runtime `Handle` for doing network (I/O) operations, see [`spawn_io`] + pub static IO_RUNTIME: RefCell> = const { RefCell::new(None) }; +} + +struct DropGuard(JoinHandle); +impl Drop for DropGuard { + fn drop(&mut self) { + self.0.abort() + } +} + +impl Future for DropGuard { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(match std::task::ready!(self.0.poll_unpin(cx)) { + Ok(v) => v, + Err(e) if e.is_cancelled() => panic!("IO runtime was shut down"), + Err(e) => std::panic::resume_unwind(e.into_panic()), + }) + } +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor. +/// +/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for +/// [`start_shutdown`](Self::start_shutdown) and signals the completion via +/// [`completed_shutdown`](Self::completed_shutdown) (for which is owns the sender side). +struct State { + /// Runtime handle. + /// + /// This is `None` when the executor is shutting down. + handle: Option, + + /// If notified, the executor tokio runtime will begin to shutdown. + /// + /// We could implement this by checking `handle.is_none()` in regular intervals but requires regular wake-ups and + /// locking of the state. Just using a proper async signal is nicer. + start_shutdown: Arc, + + /// Receiver side indicating that shutdown is complete. + completed_shutdown: Shared>>>, + + /// The inner thread that can be used to join during drop. + thread: Option>, +} + +/// IMPORTANT: Implement `Drop` for [`State`], NOT for [`DedicatedExecutor`], +/// because the executor can be cloned and clones share their inner state. +impl Drop for State { + fn drop(&mut self) { + if self.handle.is_some() { + warn!("DedicatedExecutor dropped without calling shutdown()"); + self.handle = None; + self.start_shutdown.notify_one(); + } + + // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575 + if !std::thread::panicking() + && self.completed_shutdown.clone().now_or_never().is_none() + { + warn!("DedicatedExecutor dropped without waiting for worker termination",); + } + + // join thread but don't care about the results + self.thread.take().expect("not dropped yet").join().ok(); + } +} + +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5); + +/// Potential error returned when polling [`DedicatedExecutor::spawn`]. +#[derive(Debug)] +pub enum JobError { + WorkerGone, + Panic { msg: String }, +} + +impl Display for JobError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JobError::WorkerGone => { + write!(f, "Worker thread gone, executor was likely shut down") + } + JobError::Panic { msg } => write!(f, "Panic: {}", msg), + } + } +} + +impl std::error::Error for JobError {} + +/// Builder for [`DedicatedExecutor`] +pub struct DedicatedExecutorBuilder { + /// Name given to all execution threads. Defaults to "DedicatedExecutor" + name: String, + /// Builder for tokio runtime. Defaults to multi-threaded builder + runtime_builder: Builder, +} + +impl DedicatedExecutorBuilder { + /// Create a new `DedicatedExecutorBuilder` with default values + /// + /// Note that by default this `D`edicatedExecutor` will not be able to perform + /// I/O. + pub fn new() -> Self { + Self { + name: String::from("DedicatedExecutor"), + runtime_builder: Builder::new_multi_thread(), + } + } + + /// Create a new `DedicatedExecutorBuilder` from a pre-existing tokio + /// runtime [`Builder`]. + /// + /// This method permits customizing the tokio [`Executor`] used for the + /// [`DedicatedExecutor`] + pub fn new_from_builder(runtime_builder: Builder) -> Self { + Self { + name: String::from("DedicatedExecutor"), + runtime_builder, + } + } + + /// Set the name of the dedicated executor (appear in the names of each thread). + /// + /// Defaults to "DedicatedExecutor" + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = name.into(); + self + } + + /// Set the number of worker threads. Defaults to the tokio default (the + /// number of virtual CPUs) + pub fn with_worker_threads(mut self, num_threads: usize) -> Self { + self.runtime_builder.worker_threads(num_threads); + self + } + + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// executor that is separate from the thread pool created via + /// `[tokio::main]` or similar. + /// + /// Note: If [`DedicatedExecutorBuilder::build`] is called from an existing + /// tokio runtime, it will assume that the existing runtime should be used + /// for I/O. + /// + /// See the documentation on [`DedicatedExecutor`] for more details. + fn build(self) -> DedicatedExecutor { + let Self { + name, + runtime_builder, + } = self; + + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_captured = Arc::clone(¬ify_shutdown); + + let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel(); + let (tx_handle, rx_handle) = std::sync::mpsc::channel(); + + let io_handle = Handle::try_current().ok(); + let thread = std::thread::Builder::new() + .name(format!("{name} driver")) + .spawn(move || { + // also register the IO runtime for the current thread, since it might be used as well (esp. for the + // current thread RT) + DedicatedExecutor::register_io_runtime(io_handle.clone()); + + info!("Creating DedicatedExecutor",); + + let mut runtime_builder = runtime_builder; + let runtime = runtime_builder + .on_thread_start(move || { + DedicatedExecutor::register_io_runtime(io_handle.clone()) + }) + .build() + .expect("Creating tokio runtime"); + + runtime.block_on(async move { + // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread + // (i.e .the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is + // started right after the constructor finishes and the new runtime calls + // `notify_shutdown_captured.notified().await`. + // + // Tokio provides an API for that by calling `enable` on the `notified` future (this requires + // pinning though). + let shutdown = notify_shutdown_captured.notified(); + let mut shutdown = std::pin::pin!(shutdown); + shutdown.as_mut().enable(); + + if tx_handle.send(Handle::current()).is_err() { + return; + } + shutdown.await; + }); + + runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); + + // send shutdown "done" signal + tx_shutdown.send(()).ok(); + }) + .expect("executor setup"); + + let handle = rx_handle.recv().expect("driver started"); + + let state = State { + handle: Some(handle), + start_shutdown: notify_shutdown, + completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(), + thread: Some(thread), + }; + + DedicatedExecutor { + state: Arc::new(RwLock::new(state)), + } + } +} + +#[cfg(test)] +#[allow(unused_qualifications)] +mod tests { + use super::*; + use std::{ + panic::panic_any, + sync::{Arc, Barrier}, + time::Duration, + }; + use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier}; + + /// Wait for the barrier and then return `result` + async fn do_work(result: usize, barrier: Arc) -> usize { + barrier.wait(); + result + } + + /// Wait for the barrier and then return `result` + async fn do_work_async(result: usize, barrier: Arc) -> usize { + barrier.wait().await; + result + } + + fn exec() -> DedicatedExecutor { + exec_with_threads(1) + } + + fn exec2() -> DedicatedExecutor { + exec_with_threads(2) + } + + fn exec_with_threads(threads: usize) -> DedicatedExecutor { + let mut runtime_builder = Builder::new_multi_thread(); + runtime_builder.worker_threads(threads); + runtime_builder.enable_all(); + + DedicatedExecutorBuilder::from(runtime_builder) + .with_name("Test DedicatedExecutor") + .build() + } + + async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) { + let io_runtime_id = std::thread::current().id(); + dedicated + .spawn(async move { + let dedicated_id = std::thread::current().id(); + let spawned = + DedicatedExecutor::spawn_io( + async move { std::thread::current().id() }, + ) + .await; + + assert_ne!(dedicated_id, spawned); + assert_eq!(io_runtime_id, spawned); + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn basic() { + let barrier = Arc::new(Barrier::new(2)); + + let exec = exec(); + let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // Note the dedicated task will never complete if it runs on + // the main tokio thread (as this test is not using the + // 'multithreaded' version of the executor and the call to + // barrier.wait actually blocks the tokio thread) + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn basic_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + // Run task on clone should work fine + let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn drop_empty_exec() { + exec(); + } + + #[tokio::test] + async fn drop_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + + drop(exec.clone()); + + let task = exec.spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + #[should_panic(expected = "foo")] + async fn just_panic() { + struct S(DedicatedExecutor); + + impl Drop for S { + fn drop(&mut self) { + self.0.join().now_or_never(); + } + } + + let exec = exec(); + let _s = S(exec); + + // this must not lead to a double-panic and SIGILL + panic!("foo") + } + + #[tokio::test] + async fn multi_task() { + let barrier = Arc::new(Barrier::new(3)); + + // make an executor with two threads + let exec = exec2(); + let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); + let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // block main thread until completion of other two tasks + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task1.await.unwrap(), 11); + assert_eq!(dedicated_task2.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn tokio_spawn() { + let exec = exec2(); + + // spawn a task that spawns to other tasks and ensure they run on the dedicated + // executor + let dedicated_task = exec.spawn(async move { + // spawn separate tasks + let t1 = tokio::task::spawn(async { 25usize }); + t1.await.unwrap() + }); + + // Validate the inner task ran to completion (aka it did not panic) + assert_eq!(dedicated_task.await.unwrap(), 25); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_str() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("At the disco, on the dedicated task scheduler"); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Panic: At the disco, on the dedicated task scheduler", + ); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_string() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("{} {}", 1, 2); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: 1 2",); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_other() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic_any(1) + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: unknown internal error",); + + exec.join().await; + } + + #[tokio::test] + async fn executor_shutdown_while_task_running() { + let barrier_1 = Arc::new(Barrier::new(2)); + let captured_1 = Arc::clone(&barrier_1); + let barrier_2 = Arc::new(Barrier::new(2)); + let captured_2 = Arc::clone(&barrier_2); + + let exec = exec(); + let dedicated_task = exec.spawn(async move { + captured_1.wait(); + do_work(42, captured_2).await + }); + barrier_1.wait(); + + exec.shutdown(); + // block main thread until completion of the outstanding task + barrier_2.wait(); + + // task should complete successfully + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_shutdown() { + let exec = exec(); + + // Simulate trying to submit tasks once executor has shutdown + exec.shutdown(); + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_clone_shutdown() { + let exec = exec(); + + // shutdown the clone (but not the exec) + exec.clone().join().await; + + // Simulate trying to submit tasks once executor has shutdown + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_join() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + } + + #[tokio::test] + async fn executor_join2() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + exec.join().await; + } + + #[tokio::test] + #[allow(clippy::redundant_clone)] + async fn executor_clone_join() { + let exec = exec(); + // test it doesn't hang + exec.clone().join().await; + exec.clone().join().await; + exec.join().await; + } + + #[tokio::test] + async fn drop_receiver() { + // create empty executor + let exec = exec(); + + // create first blocked task + let barrier1_pre = Arc::new(AsyncBarrier::new(2)); + let barrier1_pre_captured = Arc::clone(&barrier1_pre); + let barrier1_post = Arc::new(AsyncBarrier::new(2)); + let barrier1_post_captured = Arc::clone(&barrier1_post); + let dedicated_task1 = exec.spawn(async move { + barrier1_pre_captured.wait().await; + do_work_async(11, barrier1_post_captured).await + }); + barrier1_pre.wait().await; + + // create second blocked task + let barrier2_pre = Arc::new(AsyncBarrier::new(2)); + let barrier2_pre_captured = Arc::clone(&barrier2_pre); + let barrier2_post = Arc::new(AsyncBarrier::new(2)); + let barrier2_post_captured = Arc::clone(&barrier2_post); + let dedicated_task2 = exec.spawn(async move { + barrier2_pre_captured.wait().await; + do_work_async(22, barrier2_post_captured).await + }); + barrier2_pre.wait().await; + + // cancel task + drop(dedicated_task1); + + // cancelation might take a short while + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier1_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + // unblock other task + barrier2_post.wait().await; + assert_eq!(dedicated_task2.await.unwrap(), 22); + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier2_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + exec.join().await; + } + + #[tokio::test] + async fn test_io_runtime_multi_thread() { + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + runtime_builder.worker_threads(1); + + let dedicated = DedicatedExecutorBuilder::from(runtime_builder) + .with_name("Test DedicatedExecutor") + .build(); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_io_runtime_current_thread() { + let runtime_builder = tokio::runtime::Builder::new_current_thread(); + + let dedicated = DedicatedExecutorBuilder::new_from_builder(runtime_builder) + .with_name("Test DedicatedExecutor") + .build(); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_that_default_executor_prevents_io() { + let exec = DedicatedExecutorBuilder::new().build(); + + let io_disabled = exec + .spawn(async move { + // the only way (I've found) to test if IO is enabled is to use it and observer if tokio panics + TcpListener::bind("127.0.0.1:0") + .catch_unwind() + .await + .is_err() + }) + .await + .unwrap(); + + assert!(io_disabled) + } + + #[tokio::test] + async fn test_happy_path() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let io_thread_id = rt_io + .spawn(async move { std::thread::current().id() }) + .await + .unwrap(); + let parent_thread_id = std::thread::current().id(); + assert_ne!(io_thread_id, parent_thread_id); + + DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone())); + + let measured_thread_id = + DedicatedExecutor::spawn_io(async move { std::thread::current().id() }).await; + assert_eq!(measured_thread_id, io_thread_id); + + rt_io.shutdown_background(); + } + + #[tokio::test] + #[should_panic(expected = "IO runtime registered")] + async fn test_panic_if_no_runtime_registered() { + DedicatedExecutor::spawn_io(futures::future::ready(())).await; + } + + #[tokio::test] + #[should_panic(expected = "IO runtime was shut down")] + async fn test_io_runtime_down() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone())); + + tokio::task::spawn_blocking(move || { + rt_io.shutdown_timeout(Duration::from_secs(1)); + }) + .await + .unwrap(); + + DedicatedExecutor::spawn_io(futures::future::ready(())).await; + } +} diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index 51cb988ea06a3..8e104c27cdeb0 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -19,5 +19,6 @@ #![deny(clippy::clone_on_ref_ptr)] pub mod common; +pub mod dedicated_executor; pub use common::SpawnedTask;