Skip to content

Commit

Permalink
perf: If there are many small chunks in write_parquet(), convert to a single chunk (#14484) (#14487)
Browse files Browse the repository at this point in the history

Co-authored-by: Itamar Turner-Trauring <itamar@pythonspeed.com>
  • Loading branch information
itamarst and pythonspeed authored Feb 22, 2024
1 parent b205950 commit 3b065f3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
10 changes: 7 additions & 3 deletions crates/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ pub fn split_series(s: &Series, n: usize) -> PolarsResult<Vec<Series>> {
split_array!(s, n, i64)
}

pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
pub fn split_df_as_ref(
df: &DataFrame,
n: usize,
extend_sub_chunks: bool,
) -> PolarsResult<Vec<DataFrame>> {
let total_len = df.height();
let chunk_size = std::cmp::max(total_len / n, 1);

Expand All @@ -155,7 +159,7 @@ pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>>
chunk_size
};
let df = df.slice((i * chunk_size) as i64, len);
if df.n_chunks() > 1 {
if extend_sub_chunks && df.n_chunks() > 1 {
// we add every chunk as separate dataframe. This make sure that every partition
// deals with it.
out.extend(flatten_df_iter(&df))
Expand All @@ -175,7 +179,7 @@ pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
}
// make sure that chunks are aligned.
df.align_chunks();
split_df_as_ref(df, n)
split_df_as_ref(df, n, true)
}

pub fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
Expand Down
17 changes: 15 additions & 2 deletions crates/polars-io/src/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow::array::{Array, ArrayRef};
use arrow::chunk::Chunk;
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_core::POOL;
use polars_parquet::read::ParquetError;
use polars_parquet::write::{self, DynIter, DynStreamingIterator, Encoding, FileWriter, *};
Expand Down Expand Up @@ -193,7 +193,20 @@ where

let n_splits = df.height() / self.row_group_size.unwrap_or(512 * 512);
if n_splits > 0 {
*df = accumulate_dataframes_vertical_unchecked(split_df(df, n_splits)?);
*df = accumulate_dataframes_vertical_unchecked(
split_df_as_ref(df, n_splits, false)?
.into_iter()
.map(|mut df| {
// If the chunks are small enough, writing many small chunks
// leads to slow writing performance, so in that case we
// merge them.
let n_chunks = df.n_chunks();
if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
df.as_single_chunk_par();
}
df
}),
);
}
let mut batched = self.batched(&df.schema())?;
batched.write_batch(df)?;
Expand Down

0 comments on commit 3b065f3

Please sign in to comment.