Skip to content

Commit

Permalink
just use block_in_place
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Apr 5, 2024
1 parent adfd055 commit e39127e
Showing 1 changed file with 3 additions and 32 deletions.
35 changes: 3 additions & 32 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ where

pub struct RuntimeManager {
rt: Runtime,
blocking_markers: Vec<AtomicBool>,
}

impl RuntimeManager {
Expand All @@ -229,14 +228,7 @@ impl RuntimeManager {
.build()
.unwrap();

let blocking_markers = (0..POOL.current_num_threads())
.map(|_| AtomicBool::new(false))
.collect();

Self {
rt,
blocking_markers,
}
Self { rt }
}

/// Keep track of rayon threads that drive the runtime. Every thread
Expand All @@ -248,29 +240,8 @@ impl RuntimeManager {
F: Future + Send,
F::Output: Send,
{
let thread_idx = POOL.current_thread_index();

if thread_idx.is_none() {
self.rt.block_on(future)
} else {
let thread_idx = thread_idx.unwrap();
let tokio_entered = unsafe { self.blocking_markers.get_unchecked(thread_idx) };

if tokio_entered.load(Ordering::Relaxed) {
tokio_entered.store(true, Ordering::Relaxed);
let out = self.rt.block_on(future);
debug_assert_eq!(
thread_idx,
POOL.current_thread_index().unwrap(),
"execution after block_on should be by the same rayon thread"
);
tokio_entered.store(false, Ordering::Relaxed);
out
} else {
// Safety: The tokio runtime flavor is multi-threaded
tokio::task::block_in_place(|| self.rt.block_on(future))
}
}
// Safety: The tokio runtime flavor is multi-threaded
tokio::task::block_in_place(|| self.rt.block_on(future))
}

pub fn block_on<F>(&self, future: F) -> F::Output
Expand Down

0 comments on commit e39127e

Please sign in to comment.