A different strategy, with less memory usage and without statistics breakage.
pythonspeed committed Feb 16, 2024
1 parent d1b0d0f commit 6afa657
Showing 2 changed files with 24 additions and 23 deletions.
10 changes: 7 additions & 3 deletions crates/polars-core/src/utils/mod.rs
@@ -133,7 +133,11 @@ pub fn split_series(s: &Series, n: usize) -> PolarsResult<Vec<Series>> {
     split_array!(s, n, i64)
 }
 
-pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
+pub fn split_df_as_ref(
+    df: &DataFrame,
+    n: usize,
+    extend_sub_chunks: bool,
+) -> PolarsResult<Vec<DataFrame>> {
     let total_len = df.height();
     let chunk_size = std::cmp::max(total_len / n, 1);
 
@@ -155,7 +159,7 @@ pub fn split_df_as_ref(df: &DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>>
             chunk_size
         };
         let df = df.slice((i * chunk_size) as i64, len);
-        if df.n_chunks() > 1 {
+        if extend_sub_chunks && df.n_chunks() > 1 {
             // we add every chunk as separate dataframe. This make sure that every partition
             // deals with it.
             out.extend(flatten_df_iter(&df))
@@ -175,7 +179,7 @@ pub fn split_df(df: &mut DataFrame, n: usize) -> PolarsResult<Vec<DataFrame>> {
     }
     // make sure that chunks are aligned.
     df.align_chunks();
-    split_df_as_ref(df, n)
+    split_df_as_ref(df, n, true)
 }
 
 pub fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
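For illustration, a minimal sketch of what the new extend_sub_chunks flag changes (not part of the commit; it assumes a crate that can reach polars_core::utils::split_df_as_ref, and the column name and values are made up):

use polars_core::prelude::*;
use polars_core::utils::split_df_as_ref;

fn main() -> PolarsResult<()> {
    // Two vstacked frames yield one DataFrame backed by two aligned chunks.
    let top = DataFrame::new(vec![Series::new("x", &[1i64, 2])])?;
    let bottom = DataFrame::new(vec![Series::new("x", &[3i64, 4])])?;
    let df = top.vstack(&bottom)?; // n_chunks() == 2, nothing rechunked

    // extend_sub_chunks = true (what split_df passes): a multi-chunk slice
    // is flattened into one DataFrame per chunk, so we get 2 frames.
    assert_eq!(split_df_as_ref(&df, 1, true)?.len(), 2);

    // extend_sub_chunks = false (what the parquet writer now uses): the
    // slice is returned as-is, as 1 frame that still holds 2 chunks.
    assert_eq!(split_df_as_ref(&df, 1, false)?.len(), 1);
    Ok(())
}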
37 changes: 17 additions & 20 deletions crates/polars-io/src/parquet/write.rs
@@ -4,7 +4,7 @@ use arrow::array::{Array, ArrayRef};
 use arrow::chunk::Chunk;
 use arrow::datatypes::{ArrowDataType, PhysicalType};
 use polars_core::prelude::*;
-use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
+use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
 use polars_core::POOL;
 use polars_parquet::read::ParquetError;
 use polars_parquet::write::{self, DynIter, DynStreamingIterator, Encoding, FileWriter, *};
@@ -188,28 +188,25 @@ where
 
     /// Write the given DataFrame in the writer `W`. Returns the total size of the file.
     pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
-        // There are two reasons why we might want to rechunk:
-        //
-        // 1. The different columns have unaligned chunks.
-        // 2. The chunks are aligned, but very small: writing many small chunks
-        //    leads to slow writing performance. We use average estimated
-        //    in-memory size of less than 128KB as a heuristic.
-        //
-        // Neither of these _require_ converting to a single chunk, and
-        // converting to a single chunk has the downside of doubling memory
-        // usage temporarily. In contrast, converting to a series of e.g. 64MB
-        // chunks would give the same benefits without increasing memory usage
-        // meaningfully. However, converting to a single chunk is how this code
-        // already worked previously (covering case 1 only), and implementing
-        // such a chunking algorithm for unaligned chunks is non-trivial, so a
-        // single chunk is what we'll do for now.
-        if df.should_rechunk() || (df.estimated_size() / df.n_chunks() < 128 * 1024) {
-            df.as_single_chunk_par();
-        }
+        // ensures all chunks are aligned.
+        df.align_chunks();
 
         let n_splits = df.height() / self.row_group_size.unwrap_or(512 * 512);
         if n_splits > 0 {
-            *df = accumulate_dataframes_vertical_unchecked(split_df(df, n_splits)?);
+            *df = accumulate_dataframes_vertical_unchecked(
+                split_df_as_ref(df, n_splits, false)?
+                    .into_iter()
+                    .map(|mut df| {
+                        // If the chunks are small enough, writing many small chunks
+                        // leads to slow writing performance, so in that case we
+                        // merge them.
+                        let n_chunks = df.n_chunks();
+                        if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
+                            df.as_single_chunk_par();
+                        }
+                        df
+                    }),
+            );
         }
         let mut batched = self.batched(&df.schema())?;
         batched.write_batch(df)?;
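As a usage sketch (again not from the commit; the file name and row-group size are arbitrary, and it assumes the polars crate with the parquet feature enabled), the patched finish() is what runs under ParquetWriter:

use std::fs::File;
use polars::prelude::*;

fn write_parquet(df: &mut DataFrame) -> PolarsResult<u64> {
    let file = File::create("example.parquet")?;
    // finish() now aligns chunks, splits the frame into row-group-sized
    // slices by reference, and rechunks a slice only when its chunks
    // average under 128 KiB, instead of copying everything into one chunk.
    ParquetWriter::new(file)
        .with_row_group_size(Some(512 * 512))
        .finish(df)
}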
