diff --git a/crates/polars-core/src/utils/mod.rs b/crates/polars-core/src/utils/mod.rs
index a9f68c58693d..f9a061b6ab19 100644
--- a/crates/polars-core/src/utils/mod.rs
+++ b/crates/polars-core/src/utils/mod.rs
@@ -133,7 +133,11 @@ pub fn split_series(s: &Series, n: usize) -> PolarsResult<Vec<Series>> {
     split_array!(s, n, i64)
 }
 
-pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
+pub fn split_df_as_ref(
+    df: &DataFrame,
+    n: usize,
+    extend_sub_chunks: bool,
+) -> PolarsResult<Vec<DataFrame>> {
     let total_len = df.height();
     let chunk_size = std::cmp::max(total_len / n, 1);
 
@@ -155,7 +159,7 @@ pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>>
             chunk_size
         };
         let df = df.slice((i * chunk_size) as i64, len);
-        if df.n_chunks() > 1 {
+        if extend_sub_chunks && df.n_chunks() > 1 {
            // we add every chunk as separate dataframe. This make sure that every partition
            // deals with it.
            out.extend(flatten_df_iter(&df))
@@ -175,7 +179,7 @@ pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
     }
     // make sure that chunks are aligned.
     df.align_chunks();
-    split_df_as_ref(df, n)
+    split_df_as_ref(df, n, true)
 }
 
 pub fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
diff --git a/crates/polars-io/src/parquet/write.rs b/crates/polars-io/src/parquet/write.rs
index 0e6e53b7f48c..7313d80058e5 100644
--- a/crates/polars-io/src/parquet/write.rs
+++ b/crates/polars-io/src/parquet/write.rs
@@ -4,7 +4,7 @@ use arrow::array::{Array, ArrayRef};
 use arrow::chunk::Chunk;
 use arrow::datatypes::{ArrowDataType, PhysicalType};
 use polars_core::prelude::*;
-use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
+use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
 use polars_core::POOL;
 use polars_parquet::read::ParquetError;
 use polars_parquet::write::{self, DynIter, DynStreamingIterator, Encoding, FileWriter, *};
@@ -188,28 +188,25 @@ where
 
     /// Write the given DataFrame in the writer `W`. Returns the total size of the file.
     pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
-        // There are two reasons why we might want to rechunk:
-        //
-        // 1. The different columns have unaligned chunks.
-        // 2. The chunks are aligned, but very small: writing many small chunks
-        //    leads to slow writing performance. We use average estimated
-        //    in-memory size of less than 128KB as a heuristic.
-        //
-        // Neither of these _require_ converting to a single chunk, and
-        // converting to a single chunk has the downside of doubling memory
-        // usage temporarily. In contrast, converting to a series of e.g. 64MB
-        // chunks would give the same benefits without increasing memory usage
-        // meaningfully. However, converting to a single chunk is how this code
-        // already worked previously (covering case 1 only), and implementing
-        // such a chunking algorithm for unaligned chunks is non-trivial, so a
-        // single chunk is what we'll do for now.
-        if df.should_rechunk() || (df.estimated_size() / df.n_chunks() < 128 * 1024) {
-            df.as_single_chunk_par();
-        }
+        // ensures all chunks are aligned.
+        df.align_chunks();
 
         let n_splits = df.height() / self.row_group_size.unwrap_or(512 * 512);
         if n_splits > 0 {
-            *df = accumulate_dataframes_vertical_unchecked(split_df(df, n_splits)?);
+            *df = accumulate_dataframes_vertical_unchecked(
+                split_df_as_ref(df, n_splits, false)?
+                    .into_iter()
+                    .map(|mut df| {
+                        // If the chunks are small enough, writing many small chunks
+                        // leads to slow writing performance, so in that case we
+                        // merge them.
+                        let n_chunks = df.n_chunks();
+                        if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
+                            df.as_single_chunk_par();
+                        }
+                        df
+                    }),
+            );
         }
         let mut batched = self.batched(&df.schema())?;
         batched.write_batch(df)?;
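
Not part of the diff: a minimal sketch of how the changed `finish` path is exercised through the public writer, assuming the `polars-io` crate with its `parquet` feature enabled (the file name, column contents, and row-group size here are arbitrary). With a row group size set, `finish` now splits the frame via `split_df_as_ref(df, n_splits, false)` and merges a split's chunks only when they average under 128KiB, instead of rechunking the whole DataFrame into a single chunk up front.

```rust
use std::fs::File;

use polars_core::prelude::*;
use polars_io::parquet::ParquetWriter;

fn main() -> PolarsResult<()> {
    // A frame large enough to produce several row groups.
    let mut df = DataFrame::new(vec![Series::new(
        "a",
        (0..1_000_000i64).collect::<Vec<_>>(),
    )])?;

    let file = File::create("example.parquet")?;
    // `finish` aligns chunks, splits per row group, and merges only
    // splits whose chunks average under 128KiB in memory.
    let bytes_written = ParquetWriter::new(file)
        .with_row_group_size(Some(100_000))
        .finish(&mut df)?;
    println!("wrote {bytes_written} bytes");
    Ok(())
}
```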