From 46be636ea557b8911cdfd4438c47c767cbba9eaa Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 7 Apr 2024 17:51:16 +1000 Subject: [PATCH] perf: Replace std::thread spawn with tokio block_in_place --- crates/polars-io/src/parquet/read_impl.rs | 15 +++++---------- crates/polars-io/src/pl_async.rs | 23 +++++------------------ 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index 0f9a75b0ae79..c5de75c58ddc 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -691,22 +691,17 @@ impl BatchedParquetReader { }; // Spawn the task and wait on it asynchronously. - let (dfs, rows_read, limit) = if POOL.current_thread_index().is_some() { + if POOL.current_thread_index().is_some() { // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until // another rayon thread executes it - we would deadlock if all rayon threads did this. - - // Activate another tokio thread to poll futures. There should be at least 1 tokio thread that is - // not a rayon thread. - let handle = tokio::spawn(async { rx.await.unwrap() }); - // Now spawn the task onto rayon and participate in executing it. The current thread will no longer - // poll async futures until this rayon task is complete. - POOL.install(f); - handle.await.unwrap() + // Safety: The tokio runtime flavor is multi-threaded. + tokio::task::block_in_place(f); } else { POOL.spawn(f); - rx.await.unwrap() }; + let (dfs, rows_read, limit) = rx.await.unwrap(); + self.rows_read = rows_read; self.limit = limit; dfs diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs index 17172945dad1..87941f64ed39 100644 --- a/crates/polars-io/src/pl_async.rs +++ b/crates/polars-io/src/pl_async.rs @@ -2,13 +2,10 @@ use std::error::Error; use std::future::Future; use std::ops::Deref; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; -use std::sync::RwLock; -use std::thread::ThreadId; use once_cell::sync::Lazy; use polars_core::config::verbose; use polars_core::POOL; -use polars_utils::aliases::PlHashSet; use tokio::runtime::{Builder, Runtime}; use tokio::sync::Semaphore; @@ -220,7 +217,6 @@ where pub struct RuntimeManager { rt: Runtime, - blocking_threads: RwLock>, } impl RuntimeManager { @@ -232,31 +228,22 @@ impl RuntimeManager { .build() .unwrap(); - Self { - rt, - blocking_threads: Default::default(), - } + Self { rt } } /// Keep track of rayon threads that drive the runtime. Every thread /// only allows a single runtime. If this thread calls block_on and this /// rayon thread is already driving an async execution we must start a new thread /// otherwise we panic. This can happen when we parallelize reads over 100s of files. + /// + /// # Safety + /// The tokio runtime flavor is multi-threaded. pub fn block_on_potential_spawn(&'static self, future: F) -> F::Output where F: Future + Send, F::Output: Send, { - let thread_id = std::thread::current().id(); - - if self.blocking_threads.read().unwrap().contains(&thread_id) { - std::thread::scope(|s| s.spawn(|| self.rt.block_on(future)).join().unwrap()) - } else { - self.blocking_threads.write().unwrap().insert(thread_id); - let out = self.rt.block_on(future); - self.blocking_threads.write().unwrap().remove(&thread_id); - out - } + tokio::task::block_in_place(|| self.rt.block_on(future)) } pub fn block_on(&self, future: F) -> F::Output