perf: Ensure parallel encoding/compression in sink_parquet #14964

Merged on Mar 10, 2024 (1 commit)
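
What this PR changes: previously the streaming Parquet sink handed whole chunks to a single writer thread, so encoding and compression ran serially on that thread. Now the batched writer's FileWriter sits behind a Mutex, BatchedWriter exposes encode_and_compress and write_row_groups, every pipeline worker encodes and compresses its own row groups, and a dedicated IO thread writes the pre-compressed row groups to the file in chunk order.

For context, here is a minimal sketch of the user-facing entry point this affects. It assumes the Rust lazy API with the lazy, parquet and streaming features enabled, hypothetical file names and column, and that ScanArgsParquet and ParquetWriteOptions are reachable from the prelude:

use std::path::PathBuf;

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Build a lazy query; nothing is executed yet.
    let lf = LazyFrame::scan_parquet(PathBuf::from("input.parquet"), ScanArgsParquet::default())?
        .filter(col("value").gt(lit(0)));

    // sink_parquet streams the result to disk. With this PR, each pipeline
    // worker encodes and compresses its own row groups, and a single IO
    // thread writes them to the file in chunk order.
    lf.sink_parquet(PathBuf::from("output.parquet"), ParquetWriteOptions::default())?;
    Ok(())
}
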
131 changes: 123 additions & 8 deletions crates/polars-io/src/parquet/write.rs
@@ -1,5 +1,7 @@
use std::borrow::Cow;
use std::collections::VecDeque;
use std::io::Write;
use std::sync::Mutex;

use arrow::array::Array;
use arrow::chunk::Chunk;
@@ -8,6 +10,7 @@ use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_core::POOL;
use polars_parquet::read::ParquetError;
pub use polars_parquet::write::RowGroupIter;
use polars_parquet::write::{self, *};
use rayon::prelude::*;
#[cfg(feature = "serde")]
@@ -175,7 +178,7 @@ where
let parquet_schema = to_parquet_schema(&schema)?;
let encodings = get_encodings(&schema);
let options = self.materialize_options();
let writer = FileWriter::try_new(self.writer, schema, options)?;
let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?);

Ok(BatchedWriter {
writer,
@@ -223,7 +226,7 @@ fn prepare_rg_iter<'a>(
encodings: &'a [Vec<Encoding>],
options: WriteOptions,
parallel: bool,
) -> impl Iterator<Item = PolarsResult<RowGroupIter<'a, PolarsError>>> + 'a {
) -> impl Iterator<Item = PolarsResult<RowGroupIter<'static, PolarsError>>> + 'a {
let rb_iter = df.iter_chunks(true);
rb_iter.filter_map(move |batch| match batch.len() {
0 => None,
@@ -265,14 +268,36 @@ fn encoding_map(data_type: &ArrowDataType) -> Encoding {
}

pub struct BatchedWriter<W: Write> {
writer: FileWriter<W>,
// The writer is behind a mutex so that the streaming engine can share this
// struct across threads: pages are compressed concurrently, writes stay serialized.
writer: Mutex<FileWriter<W>>,
parquet_schema: SchemaDescriptor,
encodings: Vec<Vec<Encoding>>,
options: WriteOptions,
parallel: bool,
}

impl<W: Write> BatchedWriter<W> {
pub fn encode_and_compress<'a>(
&'a self,
df: &'a DataFrame,
) -> impl Iterator<Item = PolarsResult<RowGroupIter<'static, PolarsError>>> + 'a {
let rb_iter = df.iter_chunks(true);
rb_iter.filter_map(move |batch| match batch.len() {
0 => None,
_ => {
let row_group = create_eager_serializer(
batch,
self.parquet_schema.fields(),
self.encodings.as_ref(),
self.options,
);

Some(row_group)
},
})
}

/// Write a batch to the parquet writer.
///
/// # Panics
@@ -285,26 +310,45 @@ impl<W: Write> BatchedWriter<W> {
self.options,
self.parallel,
);
// Lock before looping so that order is maintained under contention.
let mut writer = self.writer.lock().unwrap();
for group in row_group_iter {
self.writer.write(group?)?;
writer.write(group?)?;
}
Ok(())
}

pub fn get_writer(&self) -> &Mutex<FileWriter<W>> {
&self.writer
}

pub fn write_row_groups(
&self,
rgs: Vec<RowGroupIter<'static, PolarsError>>,
) -> PolarsResult<()> {
// Lock before looping so that order is maintained.
let mut writer = self.writer.lock().unwrap();
for group in rgs {
writer.write(group)?;
}
Ok(())
}

/// Writes the footer of the parquet file. Returns the total size of the file.
pub fn finish(&mut self) -> PolarsResult<u64> {
let size = self.writer.end(None)?;
pub fn finish(&self) -> PolarsResult<u64> {
let mut writer = self.writer.lock().unwrap();
let size = writer.end(None)?;
Ok(size)
}
}

fn create_serializer<'a>(
fn create_serializer(
batch: Chunk<Box<dyn Array>>,
fields: &[ParquetType],
encodings: &[Vec<Encoding>],
options: WriteOptions,
parallel: bool,
) -> PolarsResult<RowGroupIter<'a, PolarsError>> {
) -> PolarsResult<RowGroupIter<'static, PolarsError>> {
let func = move |((array, type_), encoding): ((&ArrayRef, &ParquetType), &Vec<Encoding>)| {
let encoded_columns = array_to_columns(array, type_.clone(), options, encoding).unwrap();

@@ -356,3 +400,74 @@ fn create_serializer<'a>(

Ok(row_group)
}

struct CompressedPages {
pages: VecDeque<PolarsResult<CompressedPage>>,
current: Option<CompressedPage>,
}

impl CompressedPages {
fn new(pages: VecDeque<PolarsResult<CompressedPage>>) -> Self {
Self {
pages,
current: None,
}
}
}

impl FallibleStreamingIterator for CompressedPages {
type Item = CompressedPage;
type Error = PolarsError;

fn advance(&mut self) -> Result<(), Self::Error> {
self.current = self.pages.pop_front().transpose()?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
self.current.as_ref()
}
}
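
CompressedPages only replays work that was already done: the pages were compressed up front, and the impl above exposes them through the streaming-iterator interface the writer consumes (advance to move forward, get to borrow the current page). Below is a self-contained toy with the same shape; it is not the real trait from the parquet crates, just a stand-in to show the advance/get flow and why pop_front().transpose() surfaces a stored error at the point it is reached:

use std::collections::VecDeque;

// Minimal stand-in for the streaming-iterator shape used above.
trait FallibleStreaming {
    type Item;
    type Error;
    fn advance(&mut self) -> Result<(), Self::Error>;
    fn get(&self) -> Option<&Self::Item>;
}

// Pre-computed results, analogous to `CompressedPages` holding compressed pages.
struct Precomputed {
    items: VecDeque<Result<String, String>>,
    current: Option<String>,
}

impl FallibleStreaming for Precomputed {
    type Item = String;
    type Error = String;

    fn advance(&mut self) -> Result<(), Self::Error> {
        // Option<Result<T, E>> -> Result<Option<T>, E>: a stored error is
        // returned as soon as it is popped; exhaustion becomes `None`.
        self.current = self.items.pop_front().transpose()?;
        Ok(())
    }

    fn get(&self) -> Option<&Self::Item> {
        self.current.as_ref()
    }
}

fn main() -> Result<(), String> {
    let mut pages = Precomputed {
        items: VecDeque::from([Ok("page 0".to_string()), Ok("page 1".to_string())]),
        current: None,
    };
    // Consumption loop: advance, then borrow; stop once `get` returns None.
    loop {
        pages.advance()?;
        match pages.get() {
            Some(page) => println!("writing {page}"),
            None => break,
        }
    }
    Ok(())
}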

/// This serializer encodes and compresses all eagerly in memory.
/// Used for separating compute from IO.
fn create_eager_serializer(
batch: Chunk<Box<dyn Array>>,
fields: &[ParquetType],
encodings: &[Vec<Encoding>],
options: WriteOptions,
) -> PolarsResult<RowGroupIter<'static, PolarsError>> {
let func = move |((array, type_), encoding): ((&ArrayRef, &ParquetType), &Vec<Encoding>)| {
let encoded_columns = array_to_columns(array, type_.clone(), options, encoding).unwrap();

encoded_columns
.into_iter()
.map(|encoded_pages| {
let compressed_pages = encoded_pages
.into_iter()
.map(|page| {
let page = page?;
let page = compress(page, vec![], options.compression)?;
Ok(Ok(page))
})
.collect::<PolarsResult<VecDeque<_>>>()?;

Ok(DynStreamingIterator::new(CompressedPages::new(
compressed_pages,
)))
})
.collect::<Vec<_>>()
};

let columns = batch
.columns()
.iter()
.zip(fields)
.zip(encodings)
.flat_map(func)
.collect::<Vec<_>>();

let row_group = DynIter::new(columns.into_iter());

Ok(row_group)
}
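
Taken together, the new BatchedWriter surface splits the work in two: encode_and_compress only takes &self, so any number of threads can produce compressed row groups concurrently, while write_row_groups and finish serialize file access through the mutex. A rough sketch of driving that split by hand, assuming the re-exports shown in this diff and frames that share one schema; the scoped-thread plumbing is illustrative only, the streaming engine's real wiring follows in the next file:

use std::fs::File;

use polars_core::prelude::*;
use polars_io::parquet::ParquetWriter;

fn write_two_frames(df_a: &DataFrame, df_b: &DataFrame) -> PolarsResult<u64> {
    let file = File::create("out.parquet")?; // hypothetical output path
    let writer = ParquetWriter::new(file).batched(&df_a.schema())?;

    // Compute-heavy half: encode and compress row groups on worker threads.
    // `encode_and_compress` only needs `&self`, so the writer can be shared.
    let (rgs_a, rgs_b) = std::thread::scope(|s| {
        let a = s.spawn(|| writer.encode_and_compress(df_a).collect::<PolarsResult<Vec<_>>>());
        let b = s.spawn(|| writer.encode_and_compress(df_b).collect::<PolarsResult<Vec<_>>>());
        (a.join().unwrap(), b.join().unwrap())
    });

    // IO half: write the pre-compressed row groups in the desired order,
    // then write the file footer and return the total size.
    writer.write_row_groups(rgs_a?)?;
    writer.write_row_groups(rgs_b?)?;
    writer.finish()
}
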
125 changes: 108 additions & 17 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
@@ -1,21 +1,64 @@
use std::any::Any;
use std::path::Path;
use std::thread::JoinHandle;

use crossbeam_channel::bounded;
use crossbeam_channel::{bounded, Receiver, Sender};
use polars_core::prelude::*;
use polars_io::parquet::ParquetWriter;
use polars_io::parquet::{BatchedWriter, ParquetWriter, RowGroupIter};
use polars_plan::prelude::ParquetWriteOptions;

use crate::executors::sinks::output::file_sink::{init_writer_thread, FilesSink, SinkWriter};
use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
use crate::pipeline::morsels_per_sink;

pub struct ParquetSink {}
type RowGroups = Vec<RowGroupIter<'static, PolarsError>>;

pub(super) fn init_row_group_writer_thread(
receiver: Receiver<Option<(IdxSize, RowGroups)>>,
writer: Arc<BatchedWriter<std::fs::File>>,
// this is used to determine when a batch of chunks should be written to disk
// all chunks per push should be collected to determine in which order they should
// be written
morsels_per_sink: usize,
) -> JoinHandle<()> {
std::thread::spawn(move || {
// keep chunks around until all chunks per sink are written
// then we write them all at once.
let mut batched = Vec::with_capacity(morsels_per_sink);
while let Ok(rgs) = receiver.recv() {
// `last_write` indicates that all chunks are processed, i.e. this is the last write.
// this is when the sink's `finalize` sends `None` through the channel.
let last_write = if let Some(rgs) = rgs {
batched.push(rgs);
false
} else {
true
};

if batched.len() == morsels_per_sink || last_write {
batched.sort_by_key(|chunk| chunk.0);

for (_, rg) in batched.drain(0..) {
writer.write_row_groups(rg).unwrap()
}
}
if last_write {
writer.finish().unwrap();
return;
}
}
})
}
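
Because every pipeline worker encodes and compresses its own chunks, row groups can reach this IO thread out of order; the thread buffers up to morsels_per_sink entries, sorts them by chunk index, and only then writes, so the output file preserves the input order. A standalone sketch of that reorder-by-index pattern, using crossbeam_channel with hypothetical string payloads in place of real row groups:

use crossbeam_channel::bounded;

fn main() {
    // Workers send (chunk_index, payload); `None` signals that all input is processed.
    let (sender, receiver) = bounded::<Option<(u32, String)>>(8);

    let io_thread = std::thread::spawn(move || {
        let mut batched: Vec<(u32, String)> = Vec::new();
        while let Ok(msg) = receiver.recv() {
            let last_write = match msg {
                Some(item) => {
                    batched.push(item);
                    false
                },
                None => true,
            };
            // Restore the original chunk order before "writing".
            // The `4` stands in for `morsels_per_sink`.
            if batched.len() == 4 || last_write {
                batched.sort_by_key(|chunk| chunk.0);
                for (idx, payload) in batched.drain(..) {
                    println!("write row group {idx}: {payload}");
                }
            }
            if last_write {
                return;
            }
        }
    });

    // Simulate out-of-order arrival from parallel workers.
    for idx in [2u32, 0, 3, 1] {
        sender.send(Some((idx, format!("chunk {idx}")))).unwrap();
    }
    sender.send(None).unwrap();
    io_thread.join().unwrap();
}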

#[derive(Clone)]
pub struct ParquetSink {
writer: Arc<BatchedWriter<std::fs::File>>,
io_thread_handle: Arc<Option<JoinHandle<()>>>,
sender: Sender<Option<(IdxSize, RowGroups)>>,
}
impl ParquetSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(
path: &Path,
options: ParquetWriteOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
pub fn new(path: &Path, options: ParquetWriteOptions, schema: &Schema) -> PolarsResult<Self> {
let file = std::fs::File::create(path)?;
let writer = ParquetWriter::new(file)
.with_compression(options.compression)
@@ -27,26 +70,74 @@ impl ParquetSink {
.set_parallel(false)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let writer = Arc::new(writer);
let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;

let backpressure = morsels_per_sink * 4;
let (sender, receiver) = bounded(backpressure);

let io_thread_handle = Arc::new(Some(init_writer_thread(
let io_thread_handle = Arc::new(Some(init_row_group_writer_thread(
receiver,
writer,
options.maintain_order,
writer.clone(),
morsels_per_sink,
)));

Ok(FilesSink {
sender,
Ok(Self {
writer,
io_thread_handle,
sender,
})
}
}

impl Sink for ParquetSink {
fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
// Encode and compress row-groups on every thread.
let row_groups = self
.writer
.encode_and_compress(&chunk.data)
.collect::<PolarsResult<Vec<_>>>()?;
// Only then send the compressed pages to the writer.
self.sender
.send(Some((chunk.chunk_index, row_groups)))
.unwrap();
Ok(SinkResult::CanHaveMoreInput)
}

fn combine(&mut self, _other: &mut dyn Sink) {
// Nothing to do
}

fn split(&self, _thread_no: usize) -> Box<dyn Sink> {
Box::new(self.clone())
}

fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
// `None` indicates that we can flush all remaining chunks.
self.sender.send(None).unwrap();

// wait until all row groups are written
// some unwrap/mut kung-fu to get a hold of `self`
Arc::get_mut(&mut self.io_thread_handle)
.unwrap()
.take()
.unwrap()
.join()
.unwrap();

// return a dummy dataframe;
Ok(FinalizedSink::Finished(Default::default()))
}

fn as_any(&mut self) -> &mut dyn Any {
self
}

fn fmt(&self) -> &str {
"parquet_sink"
}
}

#[cfg(feature = "cloud")]
pub struct ParquetCloudSink {}
#[cfg(feature = "cloud")]
@@ -79,7 +170,7 @@ impl ParquetCloudSink {
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
parquet_options.maintain_order,
true,
morsels_per_sink,
)));
