Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Apr 3, 2024
1 parent 502ca73 commit 140c5e3
Showing 1 changed file with 2 additions and 49 deletions.
51 changes: 2 additions & 49 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,27 +215,8 @@ where
callable().await
}

// Bit array for threads to keep track of their runtime status. This is lock free
// as every thread uses the unique bit indexed by their thread ID.
struct BlockingMarkers(
*mut bool,
// This is here just for ownership of the underlying vec
Vec<bool>,
);
unsafe impl Send for BlockingMarkers {}
unsafe impl Sync for BlockingMarkers {}

impl BlockingMarkers {
#[allow(clippy::mut_from_ref)]
unsafe fn get_unchecked_mut(&self, index: usize) -> &mut bool {
debug_assert!(index < self.1.len());
&mut *self.0.add(index)
}
}

pub struct RuntimeManager {
rt: Runtime,
blocking_markers: BlockingMarkers,
}

impl RuntimeManager {
Expand All @@ -247,13 +228,7 @@ impl RuntimeManager {
.build()
.unwrap();

let mut blocking_markers = vec![false; POOL.current_num_threads()];
let blocking_markers = BlockingMarkers(blocking_markers.as_mut_ptr(), blocking_markers);

Self {
rt,
blocking_markers,
}
Self { rt }
}

/// Keep track of rayon threads that drive the runtime. Every thread
Expand All @@ -265,29 +240,7 @@ impl RuntimeManager {
F: Future + Send,
F::Output: Send,
{
let thread_idx = POOL.current_thread_index();

if thread_idx.is_none() {
self.rt.block_on(future)
} else {
let thread_idx = thread_idx.unwrap();
let tokio_entered = unsafe { self.blocking_markers.get_unchecked_mut(thread_idx) };

if !*tokio_entered {
*tokio_entered = true;
let out = self.rt.block_on(future);
debug_assert_eq!(
thread_idx,
POOL.current_thread_index().unwrap(),
"execution after block_on should be by the same rayon thread"
);
*tokio_entered = false;
out
} else {
// Safety: The tokio runtime flavor is multi-threaded
tokio::task::block_in_place(|| self.rt.block_on(future))
}
}
tokio::task::block_in_place(|| self.rt.block_on(future))
}

pub fn block_on<F>(&self, future: F) -> F::Output
Expand Down

0 comments on commit 140c5e3

Please sign in to comment.