Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Combine small chunks in sinks for streaming pipelines #14346

Merged
Merged
1 change: 1 addition & 0 deletions crates/polars-pipe/src/executors/sinks/ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl Sink for OrderedSink {
)));
}
self.sort();

let chunks = std::mem::take(&mut self.chunks);
Ok(FinalizedSink::Finished(chunks_to_df_unchecked(chunks)))
}
Expand Down
23 changes: 20 additions & 3 deletions crates/polars-pipe/src/executors/sinks/output/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use std::thread::JoinHandle;
use crossbeam_channel::{Receiver, Sender};
use polars_core::prelude::*;

use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
use crate::operators::{
DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, StreamingVstacker,
};

/// Common interface for the writer thread: an output sink that consumes
/// `DataFrame` batches one at a time and is finalized exactly once.
/// NOTE(review): implementors are not visible in this chunk — presumably
/// file-format writers (parquet/csv/ipc); confirm against the sinks module.
pub(super) trait SinkWriter {
    /// Write a single `DataFrame` batch to the underlying output.
    fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>;

    /// Finalize the output after the last batch has been written.
    fn _finish(&mut self) -> PolarsResult<()>;
}

Expand All @@ -24,6 +27,7 @@ pub(super) fn init_writer_thread(
// keep chunks around until all chunks per sink are written
// then we write them all at once.
let mut chunks = Vec::with_capacity(morsels_per_sink);
let mut vstacker = StreamingVstacker::default();

while let Ok(chunk) = receiver.recv() {
// `last_write` indicates if all chunks are processed, e.g. this is the last write.
Expand All @@ -40,13 +44,26 @@ pub(super) fn init_writer_thread(
chunks.sort_by_key(|chunk| chunk.chunk_index);
}

for chunk in chunks.iter() {
writer._write_batch(&chunk.data).unwrap()
for chunk in chunks.drain(0..) {
itamarst marked this conversation as resolved.
Show resolved Hide resolved
for mut df in vstacker.add(chunk.data) {
// The dataframe may only be a single, large chunk, in
// which case we don't want to bother with copying it...
if df.n_chunks() > 1 {
df.as_single_chunk();
}
writer._write_batch(&df).unwrap();
}
}
// all chunks are written remove them
chunks.clear();

if last_write {
if let Some(mut df) = vstacker.finish() {
if df.n_chunks() > 1 {
df.as_single_chunk();
}
writer._write_batch(&df).unwrap();
}
writer._finish().unwrap();
return;
}
Expand Down
147 changes: 147 additions & 0 deletions crates/polars-pipe/src/operators/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,150 @@ impl DataChunk {
/// Vertically concatenate the payloads of `chunks` into a single `DataFrame`
/// without schema validation ("unchecked"); callers must guarantee that all
/// chunks share the same schema.
pub(crate) fn chunks_to_df_unchecked(chunks: Vec<DataChunk>) -> DataFrame {
    let frames = chunks.into_iter().map(|chunk| chunk.data);
    accumulate_dataframes_vertical_unchecked(frames)
}

/// Combine a series of `DataFrame`s, and if they're small enough, combine them
/// into larger `DataFrame`s using `vstack`. This allows the caller to turn them
/// into contiguous memory allocations so that we don't suffer from the overhead
/// of many small writes. The assumption is that added `DataFrame`s are already
/// in the correct order, and can therefore be combined.
///
/// The benefit of having a series of `DataFrame`s that are e.g. 4MB each that
/// are then made contiguous is that you're not using a lot of memory (an extra
/// 4MB), but you're still doing better than if you had a series of 2KB
/// `DataFrame`s.
///
/// Changing the `DataFrame` into contiguous chunks is the caller's
/// responsibility.
#[cfg(feature = "parquet")]
#[derive(Clone)]
pub(crate) struct StreamingVstacker {
    /// Frames accumulated so far via `vstack_mut`; `None` when nothing is
    /// buffered (initially, and after every flush).
    current_dataframe: Option<DataFrame>,
    /// How big should resulting chunks be, if possible? (estimated bytes)
    output_chunk_size: usize,
}

#[cfg(feature = "parquet")]
impl StreamingVstacker {
    /// Create a new instance targeting output chunks of roughly
    /// `output_chunk_size` estimated bytes.
    pub fn new(output_chunk_size: usize) -> Self {
        Self {
            current_dataframe: None,
            output_chunk_size,
        }
    }

    /// Add another `DataFrame`, return (potentially combined) `DataFrame`s that
    /// result, if any.
    pub fn add(&mut self, next_frame: DataFrame) -> impl Iterator<Item = DataFrame> {
        let mut emitted: [Option<DataFrame>; 2] = [None, None];

        // A large incoming frame would force expensive copies if the caller
        // later calls as_single_chunk(), so emit the buffered data up front.
        if self.current_dataframe.is_some() {
            let next_is_large = next_frame.estimated_size() > self.output_chunk_size / 4;
            if next_is_large {
                emitted[0] = self.flush();
            }
        }

        match self.current_dataframe {
            Some(ref mut buffered) => {
                buffered
                    .vstack_mut(&next_frame)
                    .expect("These are chunks from the same dataframe");
            },
            None => self.current_dataframe = Some(next_frame),
        }

        // Once the buffer exceeds the target size, emit it too.
        let buffered_size = self.current_dataframe.as_ref().unwrap().estimated_size();
        if buffered_size > self.output_chunk_size {
            emitted[1] = self.flush();
        }
        emitted.into_iter().flatten()
    }

    /// Clear and return any cached `DataFrame` data.
    #[must_use]
    fn flush(&mut self) -> Option<DataFrame> {
        self.current_dataframe.take()
    }

    /// Finish and return any remaining cached `DataFrame` data. The only way
    /// that a `StreamingVstacker` should be cleaned up.
    #[must_use]
    pub fn finish(mut self) -> Option<DataFrame> {
        self.flush()
    }
}

#[cfg(feature = "parquet")]
impl Default for StreamingVstacker {
    /// 4 MB was chosen based on some empirical experiments that showed it to
    /// be decently faster than lower or higher values, and it's small enough
    /// it won't impact memory usage significantly.
    fn default() -> Self {
        Self::new(4 * 1024 * 1024)
    }
}

#[cfg(test)]
#[cfg(feature = "parquet")]
mod test {
    use super::*;

    /// DataFrames get merged into chunks that are bigger than the specified
    /// size when possible.
    #[test]
    fn semicontiguous_vstacker_merges() {
        let test = semicontiguous_vstacker_merges_impl;
        test(vec![10]);
        test(vec![10, 10, 10, 10, 10, 10, 10]);
        test(vec![10, 40, 10, 10, 10, 10]);
        test(vec![40, 10, 10, 40, 10, 10, 40]);
        test(vec![50, 50, 50]);
    }

    /// Eventually would be nice to drive this with proptest.
    fn semicontiguous_vstacker_merges_impl(df_lengths: Vec<usize>) {
        // Convert the lengths into a series of DataFrames:
        let mut vstacker = StreamingVstacker::new(4096);
        let dfs: Vec<DataFrame> = df_lengths
            .iter()
            .enumerate()
            .map(|(i, length)| {
                let series = Series::new("val", vec![i as u64; *length]);
                DataFrame::new(vec![series]).unwrap()
            })
            .collect();

        // Combine the DataFrames using a StreamingVstacker; each result is
        // tagged with the index of the input df that triggered its emission.
        let mut results = vec![];
        for (i, df) in dfs.iter().enumerate() {
            for mut result_df in vstacker.add(df.clone()) {
                result_df.as_single_chunk();
                results.push((i, result_df));
            }
        }
        if let Some(mut result_df) = vstacker.finish() {
            result_df.as_single_chunk();
            results.push((df_lengths.len() - 1, result_df));
        }

        // Make sure the lengths are sufficiently large, and the chunks
        // were merged, the whole point of the exercise:
        for (original_idx, result_df) in &results {
            if result_df.height() < 40 {
                // This means either this was the last df, or the next one
                // was big enough we decided not to aggregate.
                //
                // Compare against the number of *input* dfs, not the number
                // of results: `original_idx` indexes `dfs`, and `results`
                // can be longer than `dfs` (an add() may emit two frames),
                // which previously risked an out-of-bounds
                // `dfs[original_idx + 1]` below.
                if *original_idx < dfs.len() - 1 {
                    assert!(dfs[original_idx + 1].height() > 10);
                }
            }
            // Make sure all result DataFrames only have a single chunk.
            assert_eq!(result_df.get_columns()[0].chunk_lengths().len(), 1);
        }

        // Make sure the data was preserved:
        assert_eq!(
            accumulate_dataframes_vertical_unchecked(dfs.into_iter()),
            accumulate_dataframes_vertical_unchecked(results.into_iter().map(|(_, df)| df)),
        );
    }
}
Loading