From c24f4b1e3207825e048f723e7ed8995e0484b489 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 22 Nov 2024 11:58:46 -0500 Subject: [PATCH] Add example for using a separate threadpool for CPU bound work --- datafusion-examples/examples/thread_pools.rs | 206 ++++ datafusion/common-runtime/Cargo.toml | 7 +- .../common-runtime/src/dedicated_executor.rs | 929 +++++++++++++++++ datafusion/common-runtime/src/lib.rs | 2 + datafusion/physical-plan/Cargo.toml | 4 + .../physical-plan/src/cross_rt_stream.rs | 408 ++++++++ .../physical-plan/src/dedicated_executor.rs | 956 ++++++++++++++++++ .../physical-plan/src/io_object_store.rs | 124 +++ datafusion/physical-plan/src/lib.rs | 5 + 9 files changed, 2640 insertions(+), 1 deletion(-) create mode 100644 datafusion-examples/examples/thread_pools.rs create mode 100644 datafusion/common-runtime/src/dedicated_executor.rs create mode 100644 datafusion/physical-plan/src/cross_rt_stream.rs create mode 100644 datafusion/physical-plan/src/dedicated_executor.rs create mode 100644 datafusion/physical-plan/src/io_object_store.rs diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs new file mode 100644 index 0000000000000..df17c9e95c3cb --- /dev/null +++ b/datafusion-examples/examples/thread_pools.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example shows how to use a separate thread pool (tokio [`Runtime`]) to +//! run the CPU intensive parts of DataFusion plans. +//! +//! Running DataFusion plans that perform I/O, such as reading parquet files +//! directly from remote object storage (e.g. AWS S3), without care will result +//! in CPU intensive work running on the same thread pool as the I/O, which can +//! lead to the issues described in the [Architecture section] such as throttled +//! bandwidth due to congestion control and increased latencies for processing +//! network messages. + +use arrow::util::pretty::pretty_format_batches; +use datafusion::error::Result; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::DedicatedExecutor; +use datafusion::prelude::*; +use futures::stream::StreamExt; +use object_store::http::HttpBuilder; +use object_store::ObjectStore; +use std::sync::Arc; +use url::Url; + +/// Normally, you don't need to worry about the details of the tokio runtime, +/// but for this example it is important to understand how the [`Runtime`]s work. +/// +/// There is a "current" runtime that is installed in a thread local variable +/// that is used by the `tokio::spawn` function. +/// +/// The `#[tokio::main]` macro actually creates a [`Runtime`] and installs it +/// as the "current" runtime (on which any `async` futures, streams and tasks +/// are run).
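+///
+/// For example, `#[tokio::main]` (roughly) expands to code like the following
+/// sketch (simplified from what the macro actually generates):
+///
+/// ```text
+/// fn main() {
+///     tokio::runtime::Builder::new_multi_thread()
+///         .enable_all()
+///         .build()
+///         .unwrap()
+///         .block_on(async { /* body of the async main */ })
+/// }
+/// ```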
+#[tokio::main] +async fn main() -> Result<()> { + // The first two examples only do local file IO. Enable the URL table so we + // can select directly from filenames in SQL. + let ctx = SessionContext::new().enable_url_table(); + let sql = format!( + "SELECT * FROM '{}/alltypes_plain.parquet'", + datafusion::test_util::parquet_test_data() + ); + + // Run the same query on the same runtime. Note that calling `await` here + // will effectively run the future (in this case the `async` function) on + // the current runtime. + same_runtime(&ctx, &sql).await?; + + // Run the same query on a different runtime. Note that we are still calling + // `await` here, so the `async` function still runs on the current runtime. + // We use the `DedicatedExecutor` to run the query on a different runtime. + different_runtime_basic(ctx, sql).await?; + + // Run the same query on a different runtime including remote IO + different_runtime_advanced().await?; + + Ok(()) +} + +/// Run queries directly on the current tokio `Runtime` +/// +/// This is how most examples in DataFusion are written and works well for +/// development and local query processing. +async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> { + // Calling .sql is an async function as it may also do network + // I/O, for example to contact a remote catalog or do an object store LIST + let df = ctx.sql(sql).await?; + + // While many examples call `collect` or `show()`, those methods buffer the + // results. Internally, DataFusion generates output one RecordBatch at a time + + // Calling `execute_stream` on a DataFrame returns a + // `SendableRecordBatchStream`. Depending on the plan, this may also do + // network I/O, for example to begin reading a parquet file from a remote + // object store. It is also possible that this function call spawns + // tasks that begin doing CPU intensive work + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` drives the plan, producing new `RecordBatch`es using the + // current runtime (and typically also the current thread). + // + // Perhaps somewhat non-obviously, calling the `next()` function often will + // result in other tasks being spawned on the current runtime (e.g. for + // `RepartitionExec` to read data from each of its input partitions in + // parallel). + // + // Executing the plan like this results in all CPU intensive work + // running on the same (default) Runtime. + while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + Ok(()) +} + +/// Demonstrates how to run queries on a **different** runtime than the current one +/// +/// See [`different_runtime_advanced`] to see how you should run DataFusion +/// queries from a network server or when processing data from a remote object +/// store.
+async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> { + // First, we need a new runtime, which we could create with the tokio builder; + // however, since we are already in the context of another runtime + // (installed by #[tokio::main]), the DedicatedExecutor creates a new thread + // to host the new runtime + let dedicated_executor = DedicatedExecutor::builder().build(); + + // Now, we can simply run the query on the new runtime + dedicated_executor + .spawn(async move { + // this runs on the different threadpool + let df = ctx.sql(&sql).await?; + let mut stream: SendableRecordBatchStream = df.execute_stream().await?; + + // Calling `next()` to drive the plan on the different threadpool + while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + Ok(()) as Result<()> + }) + // even though we are `await`ing here on the "current" pool, internally + // the DedicatedExecutor runs the work on the separate threadpool + // and the `await` simply completes when the work is done + .await??; + + // When done with a DedicatedExecutor, it should be shut down cleanly to give + // any outstanding tasks a chance to clean up + dedicated_executor.join().await; + + Ok(()) +} + +/// Demonstrates how to run queries on a different runtime than the current one +/// and how to handle IO operations. +/// + +async fn different_runtime_advanced() -> Result<()> { + // In this example, we will configure access to a remote object store + // over the network during the plan + + let ctx = SessionContext::new().enable_url_table(); + + // setup http object store + let base_url = Url::parse("https://github.com").unwrap(); + let http_store: Arc<dyn ObjectStore> = + Arc::new(HttpBuilder::new().with_url(base_url.clone()).build()?); + + let dedicated_executor = DedicatedExecutor::builder().build(); + + // By default, the object store will use the current runtime for IO operations; + // if we use a dedicated executor to run the plan, the eventual object store requests will also use the + // dedicated executor's runtime + // + // To avoid this, we can wrap the object store to run on the "IO" runtime + // + // (if we don't do this, the example fails with an error like: + // + // ctx.register_object_store(&base_url, http_store); + // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.) + + let http_store = DedicatedExecutor::wrap_object_store(http_store); + + ctx.register_object_store(&base_url, http_store); + + // Plan (and execute) the query on the dedicated runtime + let mut stream = dedicated_executor + .spawn(async move { + // Plan / execute the query + let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; + let df = ctx + .sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5")) + .await?; + let stream: SendableRecordBatchStream = df.execute_stream().await?; + + Ok(stream) as Result<SendableRecordBatchStream> + }).await??; + + // We have now planned the query on the dedicated runtime, but we still need to + // drive the stream (aka call `next()`) to get the results. + + // as mentioned above, calling `next()` (including indirectly by using + // FlightDataEncoder to convert the results to flight to send it over the + // network), will *still* result in the CPU work (and a bunch of spawned + // tasks) being done on the runtime calling next() (aka the current runtime) + // and not on the dedicated runtime.
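+
+    // To drive the stream on the dedicated runtime, we can wrap it with the
+    // `DedicatedExecutor::wrap_stream` function that is added later in this
+    // patch, which transfers each batch back across runtimes. A sketch of
+    // that usage:
+    //
+    //   let mut stream = dedicated_executor.wrap_stream(stream);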
+    while let Some(batch) = stream.next().await { + println!("{}", pretty_format_batches(&[batch?]).unwrap()); + } + + Ok(()) +} diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index a21c72cd9f839..f182f0adc87f6 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -36,8 +36,13 @@ name = "datafusion_common_runtime" path = "src/lib.rs" [dependencies] +async-trait = { workspace = true } +object_store = { workspace = true } log = { workspace = true } tokio = { workspace = true } +parking_lot = { workspace = true } +futures = { workspace = true } +datafusion-common = { workspace = true } [dev-dependencies] -tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] } +tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time", "net"] } diff --git a/datafusion/common-runtime/src/dedicated_executor.rs b/datafusion/common-runtime/src/dedicated_executor.rs new file mode 100644 index 0000000000000..e90bba513d58a --- /dev/null +++ b/datafusion/common-runtime/src/dedicated_executor.rs @@ -0,0 +1,929 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime. +//! +//! Originally from [InfluxDB 3.0] +//! +//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +use datafusion_common::DataFusionError; +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, TryFutureExt, +}; +use log::{info, warn}; +use parking_lot::RwLock; +use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc, time::Duration}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; + +impl From<Builder> for DedicatedExecutorBuilder { + fn from(value: Builder) -> Self { + Self::new_from_builder(value) + } +} + +/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such +/// as DataFusion `ExecutionPlans`. +/// +/// See [`DedicatedExecutorBuilder`] for creating a new instance. +/// +/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound +/// tasks on the same threadpool by running futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio [`Executor`]. +/// +/// TODO add note about `io_thread` +/// +/// TODO: things we use in InfluxData +/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we can wrap DedicatedExecutors like IOxDedicatedExecutors +/// 2.
Some sort of hook to install tokio metrics +/// +/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio +/// runtime will be marked for IO, via [`register_io_runtime`], by all threads +/// spawned by the executor. Any I/O done by threads in this +/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the I/O +/// runtime. +/// +/// ## TODO examples +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it. Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decrease the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. +/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// +/// +/// # Trouble Shooting: +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!" +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed" +/// +/// If you try to use this structure from an async context you will see something like: +/// +/// thread 'test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed. This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +/// +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc<RwLock<State>>, +} + +impl DedicatedExecutor { + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// # Notes + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need to ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it. + /// + /// All spawned tasks are added to the tokio executor immediately and + /// compete for the threadpool's resources.
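+    ///
+    /// # Example (sketch)
+    ///
+    /// A minimal sketch of typical usage, assuming a parent runtime already
+    /// exists (e.g. one created by `#[tokio::main]`; not compiled as a doctest):
+    ///
+    /// ```text
+    /// let exec = DedicatedExecutorBuilder::new().build();
+    /// // the future runs on the dedicated pool; `await` only waits for the result
+    /// let answer = exec.spawn(async move { 6 * 7 }).await.unwrap();
+    /// assert_eq!(answer, 42);
+    /// exec.join().await;
+    /// ```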
+    pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let handle = { + let state = self.state.read(); + state.handle.clone() + }; + + let Some(handle) = handle else { + return futures::future::err(JobError::WorkerGone).boxed(); + }; + + // use JoinSet to implement "cancel on drop" + let mut join_set = JoinSet::new(); + join_set.spawn_on(task, &handle); + async move { + join_set + .join_next() + .await + .expect("just spawned task") + .map_err(|e| match e.try_into_panic() { + Ok(e) => { + let s = if let Some(s) = e.downcast_ref::<String>() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "unknown internal error".to_string() + }; + + JobError::Panic { msg: s } + } + Err(_) => JobError::WorkerGone, + }) + } + .boxed() + } + + /// signals shutdown of this executor and any Clones + pub fn shutdown(&self) { + // hang up the channel which will cause the dedicated thread + // to quit + let mut state = self.state.write(); + state.handle = None; + state.start_shutdown.notify_one(); + } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all clones of this + /// `DedicatedExecutor` as well. + /// + /// Only the first call to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + /// + /// # Panic / Drop + /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call + /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see + /// . + pub async fn join(&self) { + self.shutdown(); + + // get the handle while the mutex is held + let handle = { + let state = self.state.read(); + state.completed_shutdown.clone() + }; + + // wait for completion while not holding the mutex to avoid + // deadlocks + handle.await.expect("Thread died?") + } + + /// Registers `handle` as the IO runtime for this thread + /// + /// This sets a thread-local variable + /// + /// See [`spawn_io`](Self::spawn_io) for more details + pub fn register_io_runtime(handle: Option<Handle>) { + IO_RUNTIME.set(handle) + } + + /// Registers the "current" `handle` as the IO runtime for this thread + /// + /// This is useful for testing purposes. + /// + /// # Panics if no current handle is available (aka not running in a tokio + /// runtime) + pub fn register_current_runtime_for_io() { + Self::register_io_runtime(Some(Handle::current())) + } + + /// Runs `fut` on the runtime registered by [`register_io_runtime`] if any, + /// otherwise awaits on the current thread + /// + /// # Panic + /// Needs an IO runtime [registered](register_io_runtime). + pub async fn spawn_io<Fut>(fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect( + "No IO runtime registered. If you hit this panic, it likely \ means a DataFusion plan or other CPU bound work is running on a \ tokio threadpool used for IO. Try spawning the work using \ `DedicatedExecutor::spawn` or for tests `DedicatedExecutor::register_current_runtime_for_io`", + ); + DropGuard(h.spawn(fut)).await + } +} + +thread_local!
{ + /// Tokio runtime `Handle` for doing network (I/O) operations, see [`spawn_io`] + pub static IO_RUNTIME: RefCell<Option<Handle>> = const { RefCell::new(None) }; +} + +struct DropGuard<T>(JoinHandle<T>); +impl<T> Drop for DropGuard<T> { + fn drop(&mut self) { + self.0.abort() + } +} + +impl<T> Future for DropGuard<T> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(match std::task::ready!(self.0.poll_unpin(cx)) { + Ok(v) => v, + Err(e) if e.is_cancelled() => panic!("IO runtime was shut down"), + Err(e) => std::panic::resume_unwind(e.into_panic()), + }) + } +} + +/// Runs futures (and any `tasks` that are `tokio::task::spawned` by +/// them) on a separate tokio Executor. +/// +/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for +/// [`start_shutdown`](Self::start_shutdown) and signals the completion via +/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side). +struct State { + /// Runtime handle. + /// + /// This is `None` when the executor is shutting down. + handle: Option<Handle>, + + /// If notified, the executor tokio runtime will begin to shutdown. + /// + /// We could implement this by checking `handle.is_none()` in regular intervals, but that would require regular wake-ups and + /// locking of the state. Just using a proper async signal is nicer. + start_shutdown: Arc<Notify>, + + /// Receiver side indicating that shutdown is complete. + completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>, + + /// The inner thread that can be used to join during drop. + thread: Option<std::thread::JoinHandle<()>>, +} + +/// IMPORTANT: Implement `Drop` for [`State`], NOT for [`DedicatedExecutor`], +/// because the executor can be cloned and clones share their inner state. +impl Drop for State { + fn drop(&mut self) { + if self.handle.is_some() { + warn!("DedicatedExecutor dropped without calling shutdown()"); + self.handle = None; + self.start_shutdown.notify_one(); + } + + // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575 + if !std::thread::panicking() + && self.completed_shutdown.clone().now_or_never().is_none() + { + warn!("DedicatedExecutor dropped without waiting for worker termination",); + } + + // join thread but don't care about the results + self.thread.take().expect("not dropped yet").join().ok(); + } +} + +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5); + +/// Potential error returned when polling [`DedicatedExecutor::spawn`]. +#[derive(Debug)] +pub enum JobError { + WorkerGone, + Panic { msg: String }, +} + +impl Display for JobError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JobError::WorkerGone => { + write!(f, "Worker thread gone, executor was likely shut down") + } + JobError::Panic { msg } => write!(f, "Panic: {}", msg), + } + } +} + +impl std::error::Error for JobError {} + +/// Builder for [`DedicatedExecutor`] +pub struct DedicatedExecutorBuilder { + /// Name given to all execution threads. Defaults to "DedicatedExecutor" + name: String, + /// Builder for tokio runtime. Defaults to multi-threaded builder + runtime_builder: Builder, +} + +impl From<JobError> for DataFusionError { + fn from(value: JobError) -> Self { + DataFusionError::External(Box::new(value)) + .context("JobError from DedicatedExecutor") + } +} + +impl DedicatedExecutorBuilder { + /// Create a new `DedicatedExecutorBuilder` with default values + /// + /// Note that by default this `DedicatedExecutor` will not be able to + /// perform network I/O.
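+    ///
+    /// If the dedicated runtime itself needs timers or its own I/O driver
+    /// enabled, build from a customized tokio [`Builder`] instead. A sketch
+    /// (assuming you want all tokio drivers enabled on the new runtime):
+    ///
+    /// ```text
+    /// let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
+    /// runtime_builder.enable_all();
+    /// let exec = DedicatedExecutorBuilder::new_from_builder(runtime_builder)
+    ///     .build();
+    /// ```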
+    pub fn new() -> Self { + Self { + name: String::from("DedicatedExecutor"), + runtime_builder: Builder::new_multi_thread(), + } + } + + /// Create a new `DedicatedExecutorBuilder` from a pre-existing tokio + /// runtime [`Builder`]. + /// + /// This method permits customizing the tokio [`Executor`] used for the + /// [`DedicatedExecutor`] + pub fn new_from_builder(runtime_builder: Builder) -> Self { + Self { + name: String::from("DedicatedExecutor"), + runtime_builder, + } + } + + /// Set the name of the dedicated executor (appears in the name of each thread). + /// + /// Defaults to "DedicatedExecutor" + pub fn with_name(mut self, name: impl Into<String>) -> Self { + self.name = name.into(); + self + } + + /// Set the number of worker threads. Defaults to the tokio default (the + /// number of virtual CPUs) + pub fn with_worker_threads(mut self, num_threads: usize) -> Self { + self.runtime_builder.worker_threads(num_threads); + self + } + + /// Creates a new `DedicatedExecutor` with a dedicated tokio + /// executor that is separate from the thread pool created via + /// `[tokio::main]` or similar. + /// + /// Note: If [`DedicatedExecutorBuilder::build`] is called from an existing + /// tokio runtime, it will assume that the existing runtime should be used + /// for I/O. + /// + /// See the documentation on [`DedicatedExecutor`] for more details. + pub fn build(self) -> DedicatedExecutor { + let Self { + name, + runtime_builder, + } = self; + + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_captured = Arc::clone(&notify_shutdown); + + let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel(); + let (tx_handle, rx_handle) = std::sync::mpsc::channel(); + + let io_handle = Handle::try_current().ok(); + let thread = std::thread::Builder::new() + .name(format!("{name} driver")) + .spawn(move || { + // also register the IO runtime for the current thread, since it might be used as well (esp. for the + // current thread RT) + DedicatedExecutor::register_io_runtime(io_handle.clone()); + + info!("Creating DedicatedExecutor",); + + let mut runtime_builder = runtime_builder; + let runtime = runtime_builder + .on_thread_start(move || { + DedicatedExecutor::register_io_runtime(io_handle.clone()) + }) + .build() + .expect("Creating tokio runtime"); + + runtime.block_on(async move { + // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread + // (i.e. the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is + // started right after the constructor finishes and the new runtime calls + // `notify_shutdown_captured.notified().await`. + // + // Tokio provides an API for that by calling `enable` on the `notified` future (this requires + // pinning though).
+                    let shutdown = notify_shutdown_captured.notified(); + let mut shutdown = std::pin::pin!(shutdown); + shutdown.as_mut().enable(); + + if tx_handle.send(Handle::current()).is_err() { + return; + } + shutdown.await; + }); + + runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); + + // send shutdown "done" signal + tx_shutdown.send(()).ok(); + }) + .expect("executor setup"); + + let handle = rx_handle.recv().expect("driver started"); + + let state = State { + handle: Some(handle), + start_shutdown: notify_shutdown, + completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(), + thread: Some(thread), + }; + + DedicatedExecutor { + state: Arc::new(RwLock::new(state)), + } + } +} + +#[cfg(test)] +#[allow(unused_qualifications)] +mod tests { + use super::*; + use std::{ + panic::panic_any, + sync::{Arc, Barrier}, + time::Duration, + }; + use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier}; + + /// Wait for the barrier and then return `result` + async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize { + barrier.wait(); + result + } + + /// Wait for the barrier and then return `result` + async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize { + barrier.wait().await; + result + } + + fn exec() -> DedicatedExecutor { + exec_with_threads(1) + } + + fn exec2() -> DedicatedExecutor { + exec_with_threads(2) + } + + fn exec_with_threads(threads: usize) -> DedicatedExecutor { + let mut runtime_builder = Builder::new_multi_thread(); + runtime_builder.worker_threads(threads); + runtime_builder.enable_all(); + + DedicatedExecutorBuilder::from(runtime_builder) + .with_name("Test DedicatedExecutor") + .build() + } + + async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) { + let io_runtime_id = std::thread::current().id(); + dedicated + .spawn(async move { + let dedicated_id = std::thread::current().id(); + let spawned = + DedicatedExecutor::spawn_io( + async move { std::thread::current().id() }, + ) + .await; + + assert_ne!(dedicated_id, spawned); + assert_eq!(io_runtime_id, spawned); + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn basic() { + let barrier = Arc::new(Barrier::new(2)); + + let exec = exec(); + let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // Note the dedicated task will never complete if it runs on + // the main tokio thread (as this test is not using the + // 'multithreaded' version of the executor and the call to + // barrier.wait actually blocks the tokio thread) + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn basic_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + // Run task on clone should work fine + let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn drop_empty_exec() { + exec(); + } + + #[tokio::test] + async fn drop_clone() { + let barrier = Arc::new(Barrier::new(2)); + let exec = exec(); + + drop(exec.clone()); + + let task = exec.spawn(do_work(42, Arc::clone(&barrier))); + barrier.wait(); + assert_eq!(task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + #[should_panic(expected = "foo")] + async fn just_panic() { + struct S(DedicatedExecutor); + + impl Drop for S { + fn drop(&mut self) { + self.0.join().now_or_never(); + } + } + + let exec = exec(); + let _s = S(exec); + + // this must not lead to a
double-panic and SIGILL + panic!("foo") + } + + #[tokio::test] + async fn multi_task() { + let barrier = Arc::new(Barrier::new(3)); + + // make an executor with two threads + let exec = exec2(); + let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); + let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); + + // block main thread until completion of other two tasks + barrier.wait(); + + // should be able to get the result + assert_eq!(dedicated_task1.await.unwrap(), 11); + assert_eq!(dedicated_task2.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn tokio_spawn() { + let exec = exec2(); + + // spawn a task that spawns to other tasks and ensure they run on the dedicated + // executor + let dedicated_task = exec.spawn(async move { + // spawn separate tasks + let t1 = tokio::task::spawn(async { 25usize }); + t1.await.unwrap() + }); + + // Validate the inner task ran to completion (aka it did not panic) + assert_eq!(dedicated_task.await.unwrap(), 25); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_str() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("At the disco, on the dedicated task scheduler"); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Panic: At the disco, on the dedicated task scheduler", + ); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_string() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic!("{} {}", 1, 2); + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: 1 2",); + + exec.join().await; + } + + #[tokio::test] + async fn panic_on_executor_other() { + let exec = exec(); + let dedicated_task = exec.spawn(async move { + if true { + panic_any(1) + } else { + 42 + } + }); + + // should not be able to get the result + let err = dedicated_task.await.unwrap_err(); + assert_eq!(err.to_string(), "Panic: unknown internal error",); + + exec.join().await; + } + + #[tokio::test] + async fn executor_shutdown_while_task_running() { + let barrier_1 = Arc::new(Barrier::new(2)); + let captured_1 = Arc::clone(&barrier_1); + let barrier_2 = Arc::new(Barrier::new(2)); + let captured_2 = Arc::clone(&barrier_2); + + let exec = exec(); + let dedicated_task = exec.spawn(async move { + captured_1.wait(); + do_work(42, captured_2).await + }); + barrier_1.wait(); + + exec.shutdown(); + // block main thread until completion of the outstanding task + barrier_2.wait(); + + // task should complete successfully + assert_eq!(dedicated_task.await.unwrap(), 42); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_shutdown() { + let exec = exec(); + + // Simulate trying to submit tasks once executor has shutdown + exec.shutdown(); + let dedicated_task = exec.spawn(async { 11 }); + + // task should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_submit_task_after_clone_shutdown() { + let exec = exec(); + + // shutdown the clone (but not the exec) + exec.clone().join().await; + + // Simulate trying to submit tasks once executor has shutdown + let dedicated_task = exec.spawn(async { 11 }); + + // task 
should complete, but return an error + let err = dedicated_task.await.unwrap_err(); + assert_eq!( + err.to_string(), + "Worker thread gone, executor was likely shut down" + ); + + exec.join().await; + } + + #[tokio::test] + async fn executor_join() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + } + + #[tokio::test] + async fn executor_join2() { + let exec = exec(); + // test it doesn't hang + exec.join().await; + exec.join().await; + } + + #[tokio::test] + #[allow(clippy::redundant_clone)] + async fn executor_clone_join() { + let exec = exec(); + // test it doesn't hang + exec.clone().join().await; + exec.clone().join().await; + exec.join().await; + } + + #[tokio::test] + async fn drop_receiver() { + // create empty executor + let exec = exec(); + + // create first blocked task + let barrier1_pre = Arc::new(AsyncBarrier::new(2)); + let barrier1_pre_captured = Arc::clone(&barrier1_pre); + let barrier1_post = Arc::new(AsyncBarrier::new(2)); + let barrier1_post_captured = Arc::clone(&barrier1_post); + let dedicated_task1 = exec.spawn(async move { + barrier1_pre_captured.wait().await; + do_work_async(11, barrier1_post_captured).await + }); + barrier1_pre.wait().await; + + // create second blocked task + let barrier2_pre = Arc::new(AsyncBarrier::new(2)); + let barrier2_pre_captured = Arc::clone(&barrier2_pre); + let barrier2_post = Arc::new(AsyncBarrier::new(2)); + let barrier2_post_captured = Arc::clone(&barrier2_post); + let dedicated_task2 = exec.spawn(async move { + barrier2_pre_captured.wait().await; + do_work_async(22, barrier2_post_captured).await + }); + barrier2_pre.wait().await; + + // cancel task + drop(dedicated_task1); + + // cancellation might take a short while + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier1_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + // unblock other task + barrier2_post.wait().await; + assert_eq!(dedicated_task2.await.unwrap(), 22); + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if Arc::strong_count(&barrier2_post) == 1 { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await + } + }) + .await + .unwrap(); + + exec.join().await; + } + + #[tokio::test] + async fn test_io_runtime_multi_thread() { + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + runtime_builder.worker_threads(1); + + let dedicated = DedicatedExecutorBuilder::from(runtime_builder) + .with_name("Test DedicatedExecutor") + .build(); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_io_runtime_current_thread() { + let runtime_builder = tokio::runtime::Builder::new_current_thread(); + + let dedicated = DedicatedExecutorBuilder::new_from_builder(runtime_builder) + .with_name("Test DedicatedExecutor") + .build(); + test_io_runtime_multi_thread_impl(dedicated).await; + } + + #[tokio::test] + async fn test_that_default_executor_prevents_io() { + let exec = DedicatedExecutorBuilder::new().build(); + + let io_disabled = exec + .spawn(async move { + // the only way (I've found) to test if IO is enabled is to use it and observe if tokio panics + TcpListener::bind("127.0.0.1:0") + .catch_unwind() + .await + .is_err() + }) + .await + .unwrap(); + + assert!(io_disabled) + } + + #[tokio::test] + async fn test_happy_path() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let io_thread_id = rt_io
.spawn(async move { std::thread::current().id() }) + .await + .unwrap(); + let parent_thread_id = std::thread::current().id(); + assert_ne!(io_thread_id, parent_thread_id); + + DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone())); + + let measured_thread_id = + DedicatedExecutor::spawn_io(async move { std::thread::current().id() }).await; + assert_eq!(measured_thread_id, io_thread_id); + + rt_io.shutdown_background(); + } + + #[tokio::test] + #[should_panic(expected = "IO runtime registered")] + async fn test_panic_if_no_runtime_registered() { + DedicatedExecutor::spawn_io(futures::future::ready(())).await; + } + + #[tokio::test] + #[should_panic(expected = "IO runtime was shut down")] + async fn test_io_runtime_down() { + let rt_io = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone())); + + tokio::task::spawn_blocking(move || { + rt_io.shutdown_timeout(Duration::from_secs(1)); + }) + .await + .unwrap(); + + DedicatedExecutor::spawn_io(futures::future::ready(())).await; + } +} diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index 51cb988ea06a3..5fdee9bbf5050 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -19,5 +19,7 @@ #![deny(clippy::clone_on_ref_ptr)] pub mod common; +pub mod dedicated_executor; pub use common::SpawnedTask; +pub use dedicated_executor::{DedicatedExecutor, DedicatedExecutorBuilder}; diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 64fd0f49a2338..76226c5341b7e 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -66,6 +66,10 @@ parking_lot = { workspace = true } pin-project-lite = "^0.2.7" rand = { workspace = true } tokio = { workspace = true } +# todo figure out if we need to use tokio_stream / could use record batch receiver stream +tokio-stream = {version = "0.1"} +object_store = { workspace = true } + [dev-dependencies] criterion = { version = "0.5", features = ["async_futures"] } diff --git a/datafusion/physical-plan/src/cross_rt_stream.rs b/datafusion/physical-plan/src/cross_rt_stream.rs new file mode 100644 index 0000000000000..94ab35adfc7be --- /dev/null +++ b/datafusion/physical-plan/src/cross_rt_stream.rs @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [CrossRtStream] runs [`Stream`]s in a different tokio runtime. + +//! Tooling to pull [`Stream`]s from one tokio runtime into another. +//! +//! Originally from [InfluxDB 3.0] +//! [InfluxDB 3.0]:https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/iox_query/src/exec/cross_rt_stream.rs#L1 +//! 
+//! This is critical so that CPU heavy loads are not run on the same runtime as IO handling + +// TODO: figure out where to pull this code (not in physical plan...) +// maybe its own crate or maybe in common-runtime ?? + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::dedicated_executor::JobError; +use crate::DedicatedExecutor; +use datafusion_common::DataFusionError; +use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt}; +use tokio::sync::mpsc::{channel, Sender}; +use tokio_stream::wrappers::ReceiverStream; + +/// [`Stream`] that is calculated by one tokio runtime but can safely be pulled +/// from another w/o stalling (esp. when the calculating runtime is +/// CPU-blocked). +/// +/// See XXX in the architecture documentation for more details +pub struct CrossRtStream<T> { + /// Future that drives the underlying stream. + /// + /// This is actually wrapped into [`DedicatedExecutor::spawn`] so it can be safely polled by the receiving runtime. + driver: BoxFuture<'static, ()>, + + /// Flags if the [driver](Self::driver) returned [`Poll::Ready`]. + driver_ready: bool, + + /// Receiving stream. + /// + /// This one can be polled from the receiving runtime. + inner: ReceiverStream<T>, + + /// Signals that [`inner`](Self::inner) finished. + /// + /// Note that we must also drive the [driver](Self::driver) even when the stream finished to allow proper state clean-ups. + inner_done: bool, +} + +impl<T> std::fmt::Debug for CrossRtStream<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CrossRtStream") + .field("driver", &"...") + .field("driver_ready", &self.driver_ready) + .field("inner", &"...") + .field("inner_done", &self.inner_done) + .finish() + } +} + +impl<T> CrossRtStream<T> { + /// Create new stream by producing a future that sends its state to the given [`Sender`]. + /// + /// This is an internal method. `f` should always be wrapped into [`DedicatedExecutor::spawn`] (except for testing purposes). + fn new_with_tx<F, Fut>(f: F) -> Self + where + F: FnOnce(Sender<T>) -> Fut, + Fut: Future<Output = ()> + Send + 'static, + { + let (tx, rx) = channel(1); + let driver = f(tx).boxed(); + Self { + driver, + driver_ready: false, + inner: ReceiverStream::new(rx), + inner_done: false, + } + } +} + +impl<X, E> CrossRtStream<Result<X, E>> +where + X: Send + 'static, + E: Send + 'static, +{ + /// Create new stream based on an existing stream that transports [`Result`]s. + /// + /// Also receives an executor that actually executes the underlying stream as well as a converter that converts + /// [`JobError`] to the error type of the stream (so we can send potential crashes/panics). + pub fn new_with_error_stream<S, C>( + stream: S, + exec: DedicatedExecutor, + converter: C, + ) -> Self + where + S: Stream<Item = Result<X, E>> + Send + 'static, + C: Fn(JobError) -> E + Send + 'static, + { + Self::new_with_tx(|tx| { + // future to be run in the other runtime + let tx_captured = tx.clone(); + let fut = async move { + tokio::pin!(stream); + + while let Some(res) = stream.next().await { + if tx_captured.send(res).await.is_err() { + // receiver gone + return; + } + } + }; + + // future for this runtime (likely the tokio/tonic/web driver) + async move { + if let Err(e) = exec.spawn(fut).await { + let e = converter(e); + + // last message, so we don't care about the receiver side + tx.send(Err(e)).await.ok(); + } + } + }) + } +} + +impl<X> CrossRtStream<Result<X, DataFusionError>> +where + X: Send + 'static, +{ + /// Create new stream based on an existing stream that transports [`Result`]s w/ [`DataFusionError`]s.
+    /// + /// Also receives an executor that actually executes the underlying stream. + pub fn new_with_df_error_stream<S>(stream: S, exec: DedicatedExecutor) -> Self + where + S: Stream<Item = Result<X, DataFusionError>> + Send + 'static, + { + Self::new_with_error_stream(stream, exec, |e| { + DataFusionError::Context( + "Join Error (panic)".to_string(), + Box::new(DataFusionError::External(e.into())), + ) + }) + } +} + +impl<T> Stream for CrossRtStream<T> { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let this = &mut *self; + + if !this.driver_ready { + let res = this.driver.poll_unpin(cx); + + if res.is_ready() { + this.driver_ready = true; + } + } + + if this.inner_done { + if this.driver_ready { + Poll::Ready(None) + } else { + Poll::Pending + } + } else { + match ready!(this.inner.poll_next_unpin(cx)) { + None => { + this.inner_done = true; + if this.driver_ready { + Poll::Ready(None) + } else { + Poll::Pending + } + } + Some(x) => Poll::Ready(Some(x)), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dedicated_executor::JobError; + use crate::DedicatedExecutorBuilder; + use std::sync::OnceLock; + use std::{sync::Arc, time::Duration}; + use tokio::runtime::{Handle, RuntimeFlavor}; + + // Don't create many different runtimes for testing to avoid thread creation/destruction overhead + fn testing_executor() -> DedicatedExecutor { + TESTING_EXECUTOR + .get_or_init(|| { + DedicatedExecutorBuilder::new() + .with_name("cross_rt_stream") + .build() + }) + .clone() + } + static TESTING_EXECUTOR: OnceLock<DedicatedExecutor> = OnceLock::new(); + + #[tokio::test] + async fn test_async_block() { + let exec = testing_executor(); + let barrier1 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier1_captured = Arc::clone(&barrier1); + let barrier2 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier2_captured = Arc::clone(&barrier2); + let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream( + futures::stream::once(async move { + barrier1_captured.wait().await; + barrier2_captured.wait().await; + Ok(1) + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + ensure_pending(&mut f).await; + barrier1.wait().await; + ensure_pending(&mut f).await; + barrier2.wait().await; + + let res = f.await.expect("streamed data"); + assert_eq!(res.unwrap(), 1); + } + + #[tokio::test] + async fn test_sync_block() { + // This would deadlock if the stream payload would run within the same tokio runtime. To prevent any cheating + // (e.g.
via channels), we ensure that the current runtime only has a single thread: + assert_eq!( + RuntimeFlavor::CurrentThread, + Handle::current().runtime_flavor() + ); + + let exec = testing_executor(); + let barrier1 = Arc::new(std::sync::Barrier::new(2)); + let barrier1_captured = Arc::clone(&barrier1); + let barrier2 = Arc::new(std::sync::Barrier::new(2)); + let barrier2_captured = Arc::clone(&barrier2); + let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream( + futures::stream::once(async move { + barrier1_captured.wait(); + barrier2_captured.wait(); + Ok(1) + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + ensure_pending(&mut f).await; + barrier1.wait(); + ensure_pending(&mut f).await; + barrier2.wait(); + + let res = f.await.expect("streamed data"); + assert_eq!(res.unwrap(), 1); + } + + #[tokio::test] + async fn test_panic() { + let exec = testing_executor(); + let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream( + futures::stream::once(async { panic!("foo") }), + exec, + std::convert::identity, + ); + + let e = stream + .next() + .await + .expect("stream not finished") + .unwrap_err(); + assert_eq!(e.to_string(), "Panic: foo"); + + let none = stream.next().await; + assert!(none.is_none()); + } + + #[tokio::test] + async fn test_cancel_future() { + let exec = testing_executor(); + let barrier1 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier1_captured = Arc::clone(&barrier1); + let barrier2 = Arc::new(tokio::sync::Barrier::new(2)); + let barrier2_captured = Arc::clone(&barrier2); + let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream( + futures::stream::once(async move { + barrier1_captured.wait().await; + barrier2_captured.wait().await; + Ok(1) + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + // fire up stream + ensure_pending(&mut f).await; + barrier1.wait().await; + + // cancel + drop(f); + + barrier2.wait().await; + let res = stream.next().await.expect("streamed data"); + assert_eq!(res.unwrap(), 1); + } + + #[tokio::test] + async fn test_cancel_stream() { + let exec = testing_executor(); + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_captured = Arc::clone(&barrier); + let mut stream = CrossRtStream::<Result<u8, JobError>>::new_with_error_stream( + futures::stream::once(async move { + barrier_captured.wait().await; + + // block forever + futures::future::pending::<()>().await; + + // keep barrier Arc alive + drop(barrier_captured); + unreachable!() + }), + exec, + std::convert::identity, + ); + + let mut f = stream.next(); + + // fire up stream + ensure_pending(&mut f).await; + barrier.wait().await; + assert_eq!(Arc::strong_count(&barrier), 2); + + // cancel + drop(f); + drop(stream); + + tokio::time::timeout(Duration::from_secs(5), async { + loop { + if Arc::strong_count(&barrier) == 1 { + return; + } + + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_inner_future_driven_to_completion_after_stream_ready() { + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_captured = Arc::clone(&barrier); + + let mut stream = CrossRtStream::<u8>::new_with_tx(|tx| async move { + tx.send(1).await.ok(); + drop(tx); + barrier_captured.wait().await; + }); + + let handle = tokio::spawn(async move { barrier.wait().await }); + + assert_eq!(stream.next().await, Some(1)); + handle.await.unwrap(); + } + + async fn ensure_pending<F>(f: &mut F) + where + F: Future + Send + Unpin, + { + tokio::select!
{ + _ = tokio::time::sleep(Duration::from_millis(100)) => {} + _ = f => {panic!("not pending")}, + } + } +} diff --git a/datafusion/physical-plan/src/dedicated_executor.rs b/datafusion/physical-plan/src/dedicated_executor.rs new file mode 100644 index 0000000000000..20949a7f1bff3 --- /dev/null +++ b/datafusion/physical-plan/src/dedicated_executor.rs @@ -0,0 +1,956 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [DedicatedExecutor] for running CPU-bound tasks on a separate tokio runtime. +//! +//! Originally from [InfluxDB 3.0] +//! +//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor +use crate::cross_rt_stream::CrossRtStream; +use crate::io_object_store::IoObjectStore; +use crate::stream::RecordBatchStreamAdapter; +use crate::SendableRecordBatchStream; +use datafusion_common::DataFusionError; +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, TryFutureExt, +}; +use log::{info, warn}; +use object_store::ObjectStore; +use parking_lot::RwLock; +use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt::Display, sync::Arc, time::Duration}; +use tokio::runtime::Builder; +use tokio::task::JoinHandle; +use tokio::{ + runtime::Handle, + sync::{oneshot::error::RecvError, Notify}, + task::JoinSet, +}; + +impl From<Builder> for DedicatedExecutorBuilder { + fn from(value: Builder) -> Self { + Self::new_from_builder(value) + } +} + +/// Manages a separate tokio [`Runtime`] (thread pool) for executing tasks such +/// as DataFusion `ExecutionPlans`. +/// +/// See [`DedicatedExecutorBuilder`] for creating a new instance. +/// +/// A `DedicatedExecutor` makes it easier to avoid running IO and CPU bound +/// tasks on the same threadpool by running futures (and any `tasks` that are +/// `tokio::task::spawned` by them) on a separate tokio [`Executor`]. +/// +/// TODO add note about `io_thread` +/// +/// TODO: things we use in InfluxData +/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we can wrap DedicatedExecutors like IOxDedicatedExecutors +/// 2. Some sort of hook to install tokio metrics +/// +/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio +/// runtime will be marked for IO, via [`register_io_runtime`], by all threads +/// spawned by the executor. Any I/O done by threads in this +/// [`DedicatedExecutor`] should use [`spawn_io`], which will run them on the I/O +/// runtime. +/// +/// ## TODO examples +/// +/// # Background +/// +/// Tokio has the notion of the "current" runtime, which runs the current future +/// and any tasks spawned by it.
Typically, this is the runtime created by +/// `tokio::main` and is used for the main application logic and I/O handling +/// +/// For CPU bound work, such as DataFusion plan execution, it is important to +/// run on a separate thread pool to avoid blocking the I/O handling for extended +/// periods of time in order to avoid long poll latencies (which decrease the +/// throughput of small requests under concurrent load). +/// +/// # IO Scheduling +/// +/// I/O, such as network calls, should not be performed on the runtime managed +/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running +/// CPU tasks will not be preempted and can therefore starve servicing of other +/// tasks. This manifests in long poll-latencies, where a task is ready to run +/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as +/// there is no external party waiting on a response, however, for I/O tasks, +/// long poll latencies can prevent timely servicing of IO, which can have a +/// significant detrimental effect. +/// +/// # Details +/// +/// The worker thread priority is set to low so that such tasks do +/// not starve other more important tasks (such as answering health checks) +/// +/// Follows the example from stack overflow and spawns a new +/// thread to install a Tokio runtime "context" +/// +/// +/// # Trouble Shooting: +/// +/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!" +/// +/// This means that IO was attempted on a tokio runtime that was not registered +/// for IO. One solution is to run the task using [DedicatedExecutor::spawn]. +/// +/// ## "Cannot drop a runtime in a context where blocking is not allowed" +/// +/// If you try to use this structure from an async context you will see something like: +/// +/// thread 'test_builder_plan' panicked at 'Cannot +/// drop a runtime in a context where blocking is not allowed. This +/// happens when a runtime is dropped from within an asynchronous +/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 +/// +#[derive(Clone)] +pub struct DedicatedExecutor { + state: Arc<RwLock<State>>, +} + +impl DedicatedExecutor { + /// Create a new builder to create a [`DedicatedExecutor`] + pub fn builder() -> DedicatedExecutorBuilder { + DedicatedExecutorBuilder::new() + } + + /// Runs the specified [`Future`] (and any tasks it spawns) on the thread + /// pool managed by this `DedicatedExecutor`. + /// + /// # Notes + /// + /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when + /// it is dropped. Thus, you need to ensure the returned future lives until it + /// completes (call `await`) or you wish to cancel it. + /// + /// All spawned tasks are added to the tokio executor immediately and + /// compete for the threadpool's resources.
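+    ///
+    /// # Example (sketch)
+    ///
+    /// A minimal sketch of typical usage (assumes a parent runtime exists;
+    /// not compiled as a doctest):
+    ///
+    /// ```text
+    /// let exec = DedicatedExecutor::builder().build();
+    /// let answer = exec.spawn(async move { 6 * 7 }).await.unwrap();
+    /// assert_eq!(answer, 42);
+    /// exec.join().await;
+    /// ```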
+    pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let handle = { + let state = self.state.read(); + state.handle.clone() + }; + + let Some(handle) = handle else { + return futures::future::err(JobError::WorkerGone).boxed(); + }; + + // use JoinSet to implement "cancel on drop" + let mut join_set = JoinSet::new(); + join_set.spawn_on(task, &handle); + async move { + join_set + .join_next() + .await + .expect("just spawned task") + .map_err(|e| match e.try_into_panic() { + Ok(e) => { + let s = if let Some(s) = e.downcast_ref::<String>() { + s.clone() + } else if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else { + "unknown internal error".to_string() + }; + + JobError::Panic { msg: s } + } + Err(_) => JobError::WorkerGone, + }) + } + .boxed() + } + + /// signals shutdown of this executor and any Clones + pub fn shutdown(&self) { + // hang up the channel which will cause the dedicated thread + // to quit + let mut state = self.state.write(); + state.handle = None; + state.start_shutdown.notify_one(); + } + + /// Stops all subsequent task executions, and waits for the worker + /// thread to complete. Note this will shutdown all clones of this + /// `DedicatedExecutor` as well. + /// + /// Only the first call to `join` will actually wait for the + /// executing thread to complete. All other calls to join will + /// complete immediately. + /// + /// # Panic / Drop + /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call + /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see + /// . + pub async fn join(&self) { + self.shutdown(); + + // get the handle while the mutex is held + let handle = { + let state = self.state.read(); + state.completed_shutdown.clone() + }; + + // wait for completion while not holding the mutex to avoid + // deadlocks + handle.await.expect("Thread died?") + } + + /// Returns an ObjectStore instance that will always perform I/O work on the + /// IO_RUNTIME. + pub fn wrap_object_store(object_store: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> { + Arc::new(IoObjectStore::new(object_store)) + } + + /// Returns a SendableRecordBatchStream that will run on this executor's thread pool + pub fn wrap_stream( + &self, + stream: SendableRecordBatchStream, + ) -> SendableRecordBatchStream { + let schema = stream.schema(); + let cross_rt_stream = + CrossRtStream::new_with_df_error_stream(stream, self.clone()); + Box::pin(RecordBatchStreamAdapter::new(schema, cross_rt_stream)) + } + + /// Registers `handle` as the IO runtime for this thread + /// + /// This sets a thread-local variable + /// + /// See [`spawn_io`](Self::spawn_io) for more details + pub fn register_io_runtime(handle: Option<Handle>) { + IO_RUNTIME.set(handle) + } + + /// Registers the "current" `handle` as the IO runtime for this thread + /// + /// This is useful for testing purposes. + /// + /// # Panics if no current handle is available (aka not running in a tokio + /// runtime) + pub fn register_current_runtime_for_io() { + Self::register_io_runtime(Some(Handle::current())) + } + + /// Runs `fut` on the runtime registered by [`register_io_runtime`] if any, + /// otherwise awaits on the current thread + /// + /// # Panic + /// Needs an IO runtime [registered](register_io_runtime). + pub async fn spawn_io<Fut>(fut: Fut) -> Fut::Output + where + Fut: Future + Send + 'static, + Fut::Output: Send, + { + let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect( + "No IO runtime registered.
+    pub async fn spawn_io<Fut>(fut: Fut) -> Fut::Output
+    where
+        Fut: Future + Send + 'static,
+        Fut::Output: Send,
+    {
+        let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect(
+            "No IO runtime registered. If you hit this panic, it likely \
+            means a DataFusion plan or other CPU bound work is running on a \
+            tokio threadpool used for IO. Try spawning the work using \
+            `DedicatedExecutor::spawn` or for tests \
+            `DedicatedExecutor::register_current_runtime_for_io`",
+        );
+        DropGuard(h.spawn(fut)).await
+    }
+}
+
+thread_local! {
+    /// Tokio runtime `Handle` for doing network (I/O) operations, see
+    /// [`DedicatedExecutor::spawn_io`]
+    pub static IO_RUNTIME: RefCell<Option<Handle>> = const { RefCell::new(None) };
+}
+
+struct DropGuard<T>(JoinHandle<T>);
+impl<T> Drop for DropGuard<T> {
+    fn drop(&mut self) {
+        self.0.abort()
+    }
+}
+
+impl<T> Future for DropGuard<T> {
+    type Output = T;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        Poll::Ready(match std::task::ready!(self.0.poll_unpin(cx)) {
+            Ok(v) => v,
+            Err(e) if e.is_cancelled() => panic!("IO runtime was shut down"),
+            Err(e) => std::panic::resume_unwind(e.into_panic()),
+        })
+    }
+}
+
+/// State for running futures (and any tasks they `tokio::task::spawn`) on a
+/// separate tokio Executor.
+///
+/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
+/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
+/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
+struct State {
+    /// Runtime handle.
+    ///
+    /// This is `None` when the executor is shutting down.
+    handle: Option<Handle>,
+
+    /// If notified, the executor tokio runtime will begin to shutdown.
+    ///
+    /// We could implement this by checking `handle.is_none()` in regular intervals, but that would
+    /// require regular wake-ups and locking of the state. Just using a proper async signal is nicer.
+    start_shutdown: Arc<Notify>,
+
+    /// Receiver side indicating that shutdown is complete.
+    completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,
+
+    /// The inner thread that can be used to join during drop.
+    thread: Option<std::thread::JoinHandle<()>>,
+}
+
+/// IMPORTANT: Implement `Drop` for [`State`], NOT for [`DedicatedExecutor`],
+/// because the executor can be cloned and clones share their inner state.
+impl Drop for State {
+    fn drop(&mut self) {
+        if self.handle.is_some() {
+            warn!("DedicatedExecutor dropped without calling shutdown()");
+            self.handle = None;
+            self.start_shutdown.notify_one();
+        }
+
+        // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575
+        if !std::thread::panicking()
+            && self.completed_shutdown.clone().now_or_never().is_none()
+        {
+            warn!("DedicatedExecutor dropped without waiting for worker termination");
+        }
+
+        // join thread but don't care about the results
+        self.thread.take().expect("not dropped yet").join().ok();
+    }
+}
+
+const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5);
+
+/// Potential error returned when polling [`DedicatedExecutor::spawn`].
+#[derive(Debug)]
+pub enum JobError {
+    WorkerGone,
+    Panic { msg: String },
+}
+
+impl Display for JobError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            JobError::WorkerGone => {
+                write!(f, "Worker thread gone, executor was likely shut down")
+            }
+            JobError::Panic { msg } => write!(f, "Panic: {}", msg),
+        }
+    }
+}
+
+impl std::error::Error for JobError {}
+
+/// Builder for [`DedicatedExecutor`]
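+///
+/// # Example
+///
+/// A minimal sketch of building from a customized tokio runtime [`Builder`]
+/// (the name and thread count are arbitrary illustrative values):
+///
+/// ```ignore
+/// let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
+/// runtime_builder.worker_threads(4);
+///
+/// let executor = DedicatedExecutorBuilder::new_from_builder(runtime_builder)
+///     .with_name("cpu_bound")
+///     .build();
+/// ```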
+pub struct DedicatedExecutorBuilder {
+    /// Name given to all execution threads. Defaults to "DedicatedExecutor"
+    name: String,
+    /// Builder for the tokio runtime. Defaults to the multi-threaded builder
+    runtime_builder: Builder,
+}
+
+impl From<JobError> for DataFusionError {
+    fn from(value: JobError) -> Self {
+        DataFusionError::External(Box::new(value))
+            .context("JobError from DedicatedExecutor")
+    }
+}
+
+impl DedicatedExecutorBuilder {
+    /// Create a new `DedicatedExecutorBuilder` with default values
+    ///
+    /// Note that by default this `DedicatedExecutor` will not be able to
+    /// perform network I/O.
+    pub fn new() -> Self {
+        Self {
+            name: String::from("DedicatedExecutor"),
+            runtime_builder: Builder::new_multi_thread(),
+        }
+    }
+
+    /// Create a new `DedicatedExecutorBuilder` from a pre-existing tokio
+    /// runtime [`Builder`].
+    ///
+    /// This method permits customizing the tokio [`Runtime`] used for the
+    /// [`DedicatedExecutor`]
+    pub fn new_from_builder(runtime_builder: Builder) -> Self {
+        Self {
+            name: String::from("DedicatedExecutor"),
+            runtime_builder,
+        }
+    }
+
+    /// Set the name of the dedicated executor (it appears in the name of each
+    /// worker thread).
+    ///
+    /// Defaults to "DedicatedExecutor"
+    pub fn with_name(mut self, name: impl Into<String>) -> Self {
+        self.name = name.into();
+        self
+    }
+
+    /// Set the number of worker threads. Defaults to the tokio default (the
+    /// number of virtual CPUs)
+    pub fn with_worker_threads(mut self, num_threads: usize) -> Self {
+        self.runtime_builder.worker_threads(num_threads);
+        self
+    }
+
+    /// Creates a new `DedicatedExecutor` with a dedicated tokio
+    /// executor that is separate from the thread pool created via
+    /// `#[tokio::main]` or similar.
+    ///
+    /// Note: If [`DedicatedExecutorBuilder::build`] is called from an existing
+    /// tokio runtime, it will assume that the existing runtime should be used
+    /// for I/O.
+    ///
+    /// See the documentation on [`DedicatedExecutor`] for more details.
+    pub fn build(self) -> DedicatedExecutor {
+        let Self {
+            name,
+            runtime_builder,
+        } = self;
+
+        let notify_shutdown = Arc::new(Notify::new());
+        let notify_shutdown_captured = Arc::clone(&notify_shutdown);
+
+        let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel();
+        let (tx_handle, rx_handle) = std::sync::mpsc::channel();
+
+        let io_handle = Handle::try_current().ok();
+        let thread = std::thread::Builder::new()
+            .name(format!("{name} driver"))
+            .spawn(move || {
+                // also register the IO runtime for the current thread, since it might be used as well (esp. for the
+                // current thread RT)
+                DedicatedExecutor::register_io_runtime(io_handle.clone());
+
+                info!("Creating DedicatedExecutor");
+
+                let mut runtime_builder = runtime_builder;
+                let runtime = runtime_builder
+                    .on_thread_start(move || {
+                        DedicatedExecutor::register_io_runtime(io_handle.clone())
+                    })
+                    .build()
+                    .expect("Creating tokio runtime");
+
+                runtime.block_on(async move {
+                    // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread
+                    // (i.e. the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is
+                    // started right after the constructor finishes and the new runtime calls
+                    // `notify_shutdown_captured.notified().await`.
+                    //
+                    // Tokio provides an API for that by calling `enable` on the `notified` future (this requires
+                    // pinning though).
+                    let shutdown = notify_shutdown_captured.notified();
+                    let mut shutdown = std::pin::pin!(shutdown);
+                    shutdown.as_mut().enable();
+
+                    if tx_handle.send(Handle::current()).is_err() {
+                        return;
+                    }
+                    shutdown.await;
+                });
+
+                runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
+
+                // send shutdown "done" signal
+                tx_shutdown.send(()).ok();
+            })
+            .expect("executor setup");
+
+        let handle = rx_handle.recv().expect("driver started");
+
+        let state = State {
+            handle: Some(handle),
+            start_shutdown: notify_shutdown,
+            completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
+            thread: Some(thread),
+        };
+
+        DedicatedExecutor {
+            state: Arc::new(RwLock::new(state)),
+        }
+    }
+}
+
+#[cfg(test)]
+#[allow(unused_qualifications)]
+mod tests {
+    use super::*;
+    use std::{
+        panic::panic_any,
+        sync::{Arc, Barrier},
+        time::Duration,
+    };
+    use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier};
+
+    /// Wait for the barrier and then return `result`
+    async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize {
+        barrier.wait();
+        result
+    }
+
+    /// Wait for the barrier and then return `result`
+    async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize {
+        barrier.wait().await;
+        result
+    }
+
+    fn exec() -> DedicatedExecutor {
+        exec_with_threads(1)
+    }
+
+    fn exec2() -> DedicatedExecutor {
+        exec_with_threads(2)
+    }
+
+    fn exec_with_threads(threads: usize) -> DedicatedExecutor {
+        let mut runtime_builder = Builder::new_multi_thread();
+        runtime_builder.worker_threads(threads);
+        runtime_builder.enable_all();
+
+        DedicatedExecutorBuilder::from(runtime_builder)
+            .with_name("Test DedicatedExecutor")
+            .build()
+    }
+
+    async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) {
+        let io_runtime_id = std::thread::current().id();
+        dedicated
+            .spawn(async move {
+                let dedicated_id = std::thread::current().id();
+                let spawned = DedicatedExecutor::spawn_io(async move {
+                    std::thread::current().id()
+                })
+                .await;
+
+                assert_ne!(dedicated_id, spawned);
+                assert_eq!(io_runtime_id, spawned);
+            })
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn basic() {
+        let barrier = Arc::new(Barrier::new(2));
+
+        let exec = exec();
+        let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier)));
+
+        // Note the dedicated task will never complete if it runs on
+        // the main tokio thread (as this test is not using the
+        // 'multithreaded' version of the executor and the call to
+        // barrier.wait actually blocks the tokio thread)
+        barrier.wait();
+
+        // should be able to get the result
+        assert_eq!(dedicated_task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn basic_clone() {
+        let barrier = Arc::new(Barrier::new(2));
+        let exec = exec();
+        // Running a task on a clone should work fine
+        let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier)));
+        barrier.wait();
+        assert_eq!(dedicated_task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn drop_empty_exec() {
+        exec();
+    }
+
+    #[tokio::test]
+    async fn drop_clone() {
+        let barrier = Arc::new(Barrier::new(2));
+        let exec = exec();
+
+        drop(exec.clone());
+
+        let task = exec.spawn(do_work(42, Arc::clone(&barrier)));
+        barrier.wait();
+        assert_eq!(task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "foo")]
+    async fn just_panic() {
+        struct S(DedicatedExecutor);
+
+        impl Drop for S {
+            fn drop(&mut self) {
+                self.0.join().now_or_never();
+            }
+        }
+
+        let exec = exec();
+        let _s = S(exec);
+
+        // this must not lead to a double-panic and SIGILL
+        panic!("foo")
+    }
+
+    #[tokio::test]
+    async fn multi_task() {
+        let barrier = Arc::new(Barrier::new(3));
+
+        // make an executor with two threads
+        let exec = exec2();
+        let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier)));
+        let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier)));
+
+        // block main thread until completion of other two tasks
+        barrier.wait();
+
+        // should be able to get the result
+        assert_eq!(dedicated_task1.await.unwrap(), 11);
+        assert_eq!(dedicated_task2.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn tokio_spawn() {
+        let exec = exec2();
+
+        // spawn a task that spawns other tasks and ensure they run on the dedicated
+        // executor
+        let dedicated_task = exec.spawn(async move {
+            // spawn separate tasks
+            let t1 = tokio::task::spawn(async { 25usize });
+            t1.await.unwrap()
+        });
+
+        // Validate the inner task ran to completion (aka it did not panic)
+        assert_eq!(dedicated_task.await.unwrap(), 25);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn panic_on_executor_str() {
+        let exec = exec();
+        let dedicated_task = exec.spawn(async move {
+            if true {
+                panic!("At the disco, on the dedicated task scheduler");
+            } else {
+                42
+            }
+        });
+
+        // should not be able to get the result
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Panic: At the disco, on the dedicated task scheduler",
+        );
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn panic_on_executor_string() {
+        let exec = exec();
+        let dedicated_task = exec.spawn(async move {
+            if true {
+                panic!("{} {}", 1, 2);
+            } else {
+                42
+            }
+        });
+
+        // should not be able to get the result
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(err.to_string(), "Panic: 1 2");
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn panic_on_executor_other() {
+        let exec = exec();
+        let dedicated_task = exec.spawn(async move {
+            if true {
+                panic_any(1)
+            } else {
+                42
+            }
+        });
+
+        // should not be able to get the result
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(err.to_string(), "Panic: unknown internal error");
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_shutdown_while_task_running() {
+        let barrier_1 = Arc::new(Barrier::new(2));
+        let captured_1 = Arc::clone(&barrier_1);
+        let barrier_2 = Arc::new(Barrier::new(2));
+        let captured_2 = Arc::clone(&barrier_2);
+
+        let exec = exec();
+        let dedicated_task = exec.spawn(async move {
+            captured_1.wait();
+            do_work(42, captured_2).await
+        });
+        barrier_1.wait();
+
+        exec.shutdown();
+        // block main thread until completion of the outstanding task
+        barrier_2.wait();
+
+        // task should complete successfully
+        assert_eq!(dedicated_task.await.unwrap(), 42);
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_submit_task_after_shutdown() {
+        let exec = exec();
+
+        // Simulate trying to submit tasks once executor has shutdown
+        exec.shutdown();
+        let dedicated_task = exec.spawn(async { 11 });
+
+        // task should complete, but return an error
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Worker thread gone, executor was likely shut down"
+        );
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_submit_task_after_clone_shutdown() {
+        let exec = exec();
+
+        // shutdown the clone (but not the exec)
+        exec.clone().join().await;
+
+        // Simulate trying to submit tasks once executor has shutdown
+        let dedicated_task = exec.spawn(async { 11 });
+
+        // task should complete, but return an error
+        let err = dedicated_task.await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Worker thread gone, executor was likely shut down"
+        );
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_join() {
+        let exec = exec();
+        // test it doesn't hang
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn executor_join2() {
+        let exec = exec();
+        // test it doesn't hang
+        exec.join().await;
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    #[allow(clippy::redundant_clone)]
+    async fn executor_clone_join() {
+        let exec = exec();
+        // test it doesn't hang
+        exec.clone().join().await;
+        exec.clone().join().await;
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn drop_receiver() {
+        // create empty executor
+        let exec = exec();
+
+        // create first blocked task
+        let barrier1_pre = Arc::new(AsyncBarrier::new(2));
+        let barrier1_pre_captured = Arc::clone(&barrier1_pre);
+        let barrier1_post = Arc::new(AsyncBarrier::new(2));
+        let barrier1_post_captured = Arc::clone(&barrier1_post);
+        let dedicated_task1 = exec.spawn(async move {
+            barrier1_pre_captured.wait().await;
+            do_work_async(11, barrier1_post_captured).await
+        });
+        barrier1_pre.wait().await;
+
+        // create second blocked task
+        let barrier2_pre = Arc::new(AsyncBarrier::new(2));
+        let barrier2_pre_captured = Arc::clone(&barrier2_pre);
+        let barrier2_post = Arc::new(AsyncBarrier::new(2));
+        let barrier2_post_captured = Arc::clone(&barrier2_post);
+        let dedicated_task2 = exec.spawn(async move {
+            barrier2_pre_captured.wait().await;
+            do_work_async(22, barrier2_post_captured).await
+        });
+        barrier2_pre.wait().await;
+
+        // cancel task
+        drop(dedicated_task1);
+
+        // cancellation might take a short while
+        tokio::time::timeout(Duration::from_secs(1), async {
+            loop {
+                if Arc::strong_count(&barrier1_post) == 1 {
+                    return;
+                }
+                tokio::time::sleep(Duration::from_millis(10)).await
+            }
+        })
+        .await
+        .unwrap();
+
+        // unblock other task
+        barrier2_post.wait().await;
+        assert_eq!(dedicated_task2.await.unwrap(), 22);
+        tokio::time::timeout(Duration::from_secs(1), async {
+            loop {
+                if Arc::strong_count(&barrier2_post) == 1 {
+                    return;
+                }
+                tokio::time::sleep(Duration::from_millis(10)).await
+            }
+        })
+        .await
+        .unwrap();
+
+        exec.join().await;
+    }
+
+    #[tokio::test]
+    async fn test_io_runtime_multi_thread() {
+        let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
+        runtime_builder.worker_threads(1);
+
+        let dedicated = DedicatedExecutorBuilder::from(runtime_builder)
+            .with_name("Test DedicatedExecutor")
+            .build();
+        test_io_runtime_multi_thread_impl(dedicated).await;
+    }
+
+    #[tokio::test]
+    async fn test_io_runtime_current_thread() {
+        let runtime_builder = tokio::runtime::Builder::new_current_thread();
+
+        let dedicated = DedicatedExecutorBuilder::new_from_builder(runtime_builder)
+            .with_name("Test DedicatedExecutor")
+            .build();
+        test_io_runtime_multi_thread_impl(dedicated).await;
+    }
+
+    #[tokio::test]
+    async fn test_that_default_executor_prevents_io() {
+        let exec = DedicatedExecutorBuilder::new().build();
+
+        let io_disabled = exec
+            .spawn(async move {
+                // the only way (I've found) to test if IO is enabled is to use it and observe if tokio panics
+                TcpListener::bind("127.0.0.1:0")
+                    .catch_unwind()
+                    .await
+                    .is_err()
+            })
+            .await
+            .unwrap();
+
+        assert!(io_disabled)
+    }
+
+    #[tokio::test]
+    async fn test_happy_path() {
+        let rt_io = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let io_thread_id = rt_io
+            .spawn(async move { std::thread::current().id() })
+            .await
+            .unwrap();
+        let parent_thread_id = std::thread::current().id();
+        assert_ne!(io_thread_id, parent_thread_id);
+
+        DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone()));
+
+        let measured_thread_id =
+            DedicatedExecutor::spawn_io(async move { std::thread::current().id() })
+                .await;
+        assert_eq!(measured_thread_id, io_thread_id);
+
+        rt_io.shutdown_background();
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "IO runtime registered")]
+    async fn test_panic_if_no_runtime_registered() {
+        DedicatedExecutor::spawn_io(futures::future::ready(())).await;
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "IO runtime was shut down")]
+    async fn test_io_runtime_down() {
+        let rt_io = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .enable_all()
+            .build()
+            .unwrap();
+
+        DedicatedExecutor::register_io_runtime(Some(rt_io.handle().clone()));
+
+        tokio::task::spawn_blocking(move || {
+            rt_io.shutdown_timeout(Duration::from_secs(1));
+        })
+        .await
+        .unwrap();
+
+        DedicatedExecutor::spawn_io(futures::future::ready(())).await;
+    }
+}
diff --git a/datafusion/physical-plan/src/io_object_store.rs b/datafusion/physical-plan/src/io_object_store.rs
new file mode 100644
index 0000000000000..88f03dd20d173
--- /dev/null
+++ b/datafusion/physical-plan/src/io_object_store.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::DedicatedExecutor;
+use async_trait::async_trait;
+use futures::stream::BoxStream;
+use object_store::{
+    path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
+};
+
+/// [`ObjectStore`] that wraps an inner `ObjectStore` and routes all of the
+/// underlying methods through [`DedicatedExecutor::spawn_io`] so that they
+/// are run on the Tokio Runtime dedicated to doing IO.
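+///
+/// # Example
+///
+/// A minimal sketch (`inner_store` stands in for any `Arc<dyn ObjectStore>`,
+/// e.g. a remote S3 store):
+///
+/// ```ignore
+/// // all IO issued through the returned store hops to the IO runtime
+/// let store = DedicatedExecutor::wrap_object_store(inner_store);
+/// ```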
+#[derive(Debug)]
+pub struct IoObjectStore {
+    inner: Arc<dyn ObjectStore>,
+}
+
+impl IoObjectStore {
+    pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            inner: object_store,
+        }
+    }
+}
+
+impl std::fmt::Display for IoObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "IoObjectStore")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for IoObjectStore {
+    /// TODO wrap the resulting stream in CrossRTStream
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        DedicatedExecutor::spawn_io(
+            async move { store.get_opts(&location, options).await },
+        )
+        .await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let from = from.clone();
+        let to = to.clone();
+        let store = Arc::clone(&self.inner);
+        DedicatedExecutor::spawn_io(async move { store.copy(&from, &to).await }).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let from = from.clone();
+        let to = to.clone();
+        let store = Arc::clone(&self.inner);
+        // delegate to the inner store's copy_if_not_exists (not copy)
+        DedicatedExecutor::spawn_io(async move {
+            store.copy_if_not_exists(&from, &to).await
+        })
+        .await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        DedicatedExecutor::spawn_io(async move { store.delete(&location).await }).await
+    }
+
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let prefix = prefix.cloned();
+        let store = Arc::clone(&self.inner);
+        DedicatedExecutor::spawn_io(async move {
+            store.list_with_delimiter(prefix.as_ref()).await
+        })
+        .await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        DedicatedExecutor::spawn_io(async move {
+            store.put_multipart_opts(&location, opts).await
+        })
+        .await
+    }
+
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let location = location.clone();
+        let store = Arc::clone(&self.inner);
+        DedicatedExecutor::spawn_io(async move {
+            store.put_opts(&location, payload, opts).await
+        })
+        .await
+    }
+}
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 845a74eaea48e..2e9adbdca2111 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -86,6 +86,11 @@ pub mod udaf {
     pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 }
 
+pub use dedicated_executor::{DedicatedExecutor, DedicatedExecutorBuilder};
+
 pub mod coalesce;
+mod cross_rt_stream;
+pub mod dedicated_executor;
+mod io_object_store;
 #[cfg(test)]
 pub mod test;