Skip to content

Commit

Permalink
reduce thread spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Apr 13, 2024
1 parent 92902e6 commit e280283
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 86 deletions.
80 changes: 39 additions & 41 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,55 +314,53 @@ impl FetchRowGroupsFromObjectStore {
let (snd, rcv) = channel(msg_limit);
let snd = Arc::new(snd);

let _ = std::thread::spawn(move || {
get_runtime().block_on(async {
let chunk_len = msg_limit;
let mut handles = Vec::with_capacity(chunk_len.clamp(0, row_groups.len()));
for chunk in row_groups.chunks_mut(chunk_len) {
// Start downloads concurrently
for (i, rg) in chunk {
let rg = std::mem::take(rg);

match &projected_fields {
Some(projected_fields) => {
let handle = tokio::spawn(download_projection(
projected_fields.clone(),
rg,
reader.clone(),
snd.clone(),
*i,
));
handles.push(handle)
},
None => {
let handle = tokio::spawn(download_row_group(
rg,
reader.clone(),
snd.clone(),
*i,
));
handles.push(handle)
},
}
}

// Wait n - 3 tasks, so we already start the next downloads earlier.
for task in handles.drain(..handles.len().saturating_sub(3)) {
let succeeded = task.await.unwrap();
if !succeeded {
return;
}
get_runtime().spawn(async move {
let chunk_len = msg_limit;
let mut handles = Vec::with_capacity(chunk_len.clamp(0, row_groups.len()));
for chunk in row_groups.chunks_mut(chunk_len) {
// Start downloads concurrently
for (i, rg) in chunk {
let rg = std::mem::take(rg);

match &projected_fields {
Some(projected_fields) => {
let handle = tokio::spawn(download_projection(
projected_fields.clone(),
rg,
reader.clone(),
snd.clone(),
*i,
));
handles.push(handle)
},
None => {
let handle = tokio::spawn(download_row_group(
rg,
reader.clone(),
snd.clone(),
*i,
));
handles.push(handle)
},
}
}

// Drain remaining tasks.
for task in handles.drain(..) {
// Wait n - 3 tasks, so we already start the next downloads earlier.
for task in handles.drain(..handles.len().saturating_sub(3)) {
let succeeded = task.await.unwrap();
if !succeeded {
return;
}
}
})
}

// Drain remaining tasks.
for task in handles.drain(..) {
let succeeded = task.await.unwrap();
if !succeeded {
return;
}
}
});

Ok(FetchRowGroupsFromObjectStore {
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,15 @@ impl RuntimeManager {
{
self.rt.block_on(future)
}

/// Spawns a future onto the Tokio runtime (see [`tokio::runtime::Runtime::spawn`]).
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.rt.spawn(future)
}
}

static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new);
Expand Down
48 changes: 22 additions & 26 deletions py-polars/src/functions/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,35 +118,31 @@ pub fn collect_all(lfs: Vec<PyLazyFrame>, py: Python) -> PyResult<Vec<PyDataFram
}

#[pyfunction]
pub fn collect_all_with_callback(lfs: Vec<PyLazyFrame>, lambda: PyObject, py: Python) {
pub fn collect_all_with_callback(lfs: Vec<PyLazyFrame>, lambda: PyObject) {
use polars_core::utils::rayon::prelude::*;

py.allow_threads(|| {
polars_core::POOL.install(move || {
polars_core::POOL.spawn(move || {
let result = lfs
.par_iter()
.map(|lf| {
let df = lf.ldf.clone().collect()?;
Ok(PyDataFrame::new(df))
})
.collect::<polars_core::error::PolarsResult<Vec<_>>>()
.map_err(PyPolarsErr::from);

Python::with_gil(|py| match result {
Ok(dfs) => {
lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
},
Err(err) => {
lambda
.call1(py, (PyErr::from(err).to_object(py),))
.map_err(|err| err.restore(py))
.ok();
},
})
polars_core::POOL.spawn(move || {
let result = lfs
.par_iter()
.map(|lf| {
let df = lf.ldf.clone().collect()?;
Ok(PyDataFrame::new(df))
})
});
});
.collect::<polars_core::error::PolarsResult<Vec<_>>>()
.map_err(PyPolarsErr::from);

Python::with_gil(|py| match result {
Ok(dfs) => {
lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
},
Err(err) => {
lambda
.call1(py, (PyErr::from(err).to_object(py),))
.map_err(|err| err.restore(py))
.ok();
},
})
})
}

#[pyfunction]
Expand Down
36 changes: 17 additions & 19 deletions py-polars/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,27 +569,25 @@ impl PyLazyFrame {
}

#[pyo3(signature = (lambda,))]
fn collect_with_callback(&self, py: Python, lambda: PyObject) {
py.allow_threads(|| {
fn collect_with_callback(&self, lambda: PyObject) {
polars_core::POOL.spawn(move || {
let ldf = self.ldf.clone();

polars_core::POOL.spawn(move || {
let result = ldf
.collect()
.map(PyDataFrame::new)
.map_err(PyPolarsErr::from);

Python::with_gil(|py| match result {
Ok(df) => {
lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
},
Err(err) => {
lambda
.call1(py, (PyErr::from(err).to_object(py),))
.map_err(|err| err.restore(py))
.ok();
},
});
let result = ldf
.collect()
.map(PyDataFrame::new)
.map_err(PyPolarsErr::from);

Python::with_gil(|py| match result {
Ok(df) => {
lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
},
Err(err) => {
lambda
.call1(py, (PyErr::from(err).to_object(py),))
.map_err(|err| err.restore(py))
.ok();
},
});
});
}
Expand Down

0 comments on commit e280283

Please sign in to comment.