Skip to content

Commit

Permalink
perf: Replace std::thread spawn with tokio block_in_place (pola-rs#15517)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Apr 7, 2024
1 parent 4b94d2f commit 1d4880f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 28 deletions.
15 changes: 5 additions & 10 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,22 +691,17 @@ impl BatchedParquetReader {
};

// Spawn the task and wait on it asynchronously.
let (dfs, rows_read, limit) = if POOL.current_thread_index().is_some() {
if POOL.current_thread_index().is_some() {
// We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until
// another rayon thread executes it - we would deadlock if all rayon threads did this.

// Activate another tokio thread to poll futures. There should be at least 1 tokio thread that is
// not a rayon thread.
let handle = tokio::spawn(async { rx.await.unwrap() });
// Now spawn the task onto rayon and participate in executing it. The current thread will no longer
// poll async futures until this rayon task is complete.
POOL.install(f);
handle.await.unwrap()
// Safety: The tokio runtime flavor is multi-threaded.
tokio::task::block_in_place(f);
} else {
POOL.spawn(f);
rx.await.unwrap()
};

let (dfs, rows_read, limit) = rx.await.unwrap();

self.rows_read = rows_read;
self.limit = limit;
dfs
Expand Down
23 changes: 5 additions & 18 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ use std::error::Error;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
use std::sync::RwLock;
use std::thread::ThreadId;

use once_cell::sync::Lazy;
use polars_core::config::verbose;
use polars_core::POOL;
use polars_utils::aliases::PlHashSet;
use tokio::runtime::{Builder, Runtime};
use tokio::sync::Semaphore;

Expand Down Expand Up @@ -220,7 +217,6 @@ where

pub struct RuntimeManager {
rt: Runtime,
blocking_threads: RwLock<PlHashSet<ThreadId>>,
}

impl RuntimeManager {
Expand All @@ -232,31 +228,22 @@ impl RuntimeManager {
.build()
.unwrap();

Self {
rt,
blocking_threads: Default::default(),
}
Self { rt }
}

/// Keep track of rayon threads that drive the runtime. Every thread
/// only allows a single runtime. If this thread calls block_on and this
/// rayon thread is already driving an async execution we must start a new thread
/// otherwise we panic. This can happen when we parallelize reads over 100s of files.
///
/// # Safety
/// The tokio runtime flavor is multi-threaded.
pub fn block_on_potential_spawn<F>(&'static self, future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
{
let thread_id = std::thread::current().id();

if self.blocking_threads.read().unwrap().contains(&thread_id) {
std::thread::scope(|s| s.spawn(|| self.rt.block_on(future)).join().unwrap())
} else {
self.blocking_threads.write().unwrap().insert(thread_id);
let out = self.rt.block_on(future);
self.blocking_threads.write().unwrap().remove(&thread_id);
out
}
tokio::task::block_in_place(|| self.rt.block_on(future))
}

pub fn block_on<F>(&self, future: F) -> F::Output
Expand Down

0 comments on commit 1d4880f

Please sign in to comment.