perf(rust, python): use dedicated writer thread for sink_parquet (#6285)
ritchie46 authored Jan 17, 2023
1 parent 6ad5b03 commit c6ee7af
Showing 4 changed files with 102 additions and 60 deletions.
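The commit replaces an in-sink mutex-and-write with a bounded channel feeding a dedicated writer thread. Below is a minimal, self-contained sketch of that pattern, not the Polars code itself: the Chunk struct, the string payload, and the println! "writer" are stand-ins, while the batching, ordering, and None-as-finalize signal mirror the init_writer_thread added in this diff.

use std::thread::JoinHandle;

use crossbeam_channel::{bounded, Receiver, Sender};

// Stand-in for the pipeline's DataChunk: an index plus some payload.
struct Chunk {
    index: u64,
    payload: String,
}

fn init_writer_thread(
    receiver: Receiver<Option<Chunk>>,
    maintain_order: bool,
    batch_size: usize,
) -> JoinHandle<()> {
    std::thread::spawn(move || {
        let mut batch = Vec::with_capacity(batch_size);
        while let Ok(msg) = receiver.recv() {
            // `None` acts as the finalize signal: flush what is buffered and stop.
            let last_write = match msg {
                Some(chunk) => {
                    batch.push(chunk);
                    false
                }
                None => true,
            };
            if batch.len() == batch_size || last_write {
                if maintain_order {
                    batch.sort_by_key(|c| c.index);
                }
                for chunk in batch.drain(..) {
                    // The real sink writes a parquet batch here.
                    println!("write chunk {}: {}", chunk.index, chunk.payload);
                }
                if last_write {
                    return;
                }
            }
        }
    })
}

fn main() {
    // A bounded channel gives backpressure: producers block once the writer lags behind.
    let (sender, receiver): (Sender<Option<Chunk>>, Receiver<Option<Chunk>>) = bounded(4);
    let writer = init_writer_thread(receiver, true, 3);

    // Chunks may arrive out of order; each batch is re-sorted before writing.
    for index in [1u64, 0, 2, 4, 3, 5] {
        sender
            .send(Some(Chunk {
                index,
                payload: format!("morsel {index}"),
            }))
            .unwrap();
    }
    sender.send(None).unwrap(); // finalize
    writer.join().unwrap();
}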
1 change: 1 addition & 0 deletions polars/polars-lazy/polars-pipe/Cargo.toml
@@ -9,6 +9,7 @@ description = "Lazy query engine for the Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+crossbeam-channel = "0.5"
 enum_dispatch = "0.3"
 hashbrown.workspace = true
 num.workspace = true
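The new crossbeam-channel dependency provides the bounded queue used for backpressure. A small standalone sketch of that behaviour follows; the capacity, item type, and timings are arbitrary and not taken from Polars.

use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::bounded;

fn main() {
    // Capacity 2: the producer can only run two items ahead of the consumer.
    let (sender, receiver) = bounded::<u32>(2);

    let consumer = thread::spawn(move || {
        for item in receiver.iter() {
            thread::sleep(Duration::from_millis(50)); // simulate a slow writer
            println!("wrote {item}");
        }
    });

    let start = Instant::now();
    for item in 0..6u32 {
        // Once two items are queued, this call blocks until the consumer catches up.
        sender.send(item).unwrap();
        println!("queued {item} after {:?}", start.elapsed());
    }
    drop(sender); // closing the channel ends `receiver.iter()`
    consumer.join().unwrap();
}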
129 changes: 73 additions & 56 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs
@@ -1,32 +1,65 @@
 use std::any::Any;
 use std::path::Path;
-use std::sync::Mutex;
+// use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+use std::thread::JoinHandle;
 
+use crossbeam_channel::{bounded, Receiver, Sender};
 use polars_core::prelude::*;
 use polars_io::parquet::{BatchedWriter, ParquetWriter};
 use polars_plan::prelude::ParquetWriteOptions;
-use polars_utils::cell::SyncUnsafeCell;
 
 use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
 use crate::pipeline::morsels_per_sink;
 
-// Ensure the data is return in the order it was streamed
-pub struct ParquetSink {
-    writer: Arc<SyncUnsafeCell<BatchedWriter<std::fs::File>>>,
-    chunks: Arc<Mutex<Vec<DataChunk>>>,
-    morsels_per_sink: u16,
-    maintain_order: bool,
-}
-
-impl Clone for ParquetSink {
-    fn clone(&self) -> Self {
-        Self {
-            writer: self.writer.clone(),
-            chunks: self.chunks.clone(),
-            morsels_per_sink: self.morsels_per_sink,
-            maintain_order: self.maintain_order,
-        }
-    }
-}
+fn init_writer_thread(
+    receiver: Receiver<Option<DataChunk>>,
+    mut writer: BatchedWriter<std::fs::File>,
+    maintain_order: bool,
+    // this is used to determine when a batch of chunks should be written to disk
+    // all chunks per push should be collected to determine in which order they should
+    // be written
+    morsels_per_sink: usize,
+) -> JoinHandle<()> {
+    std::thread::spawn(move || {
+        // keep chunks around until all chunks per sink are written
+        // then we write them all at once.
+        let mut chunks = Vec::with_capacity(morsels_per_sink);
+
+        while let Ok(chunk) = receiver.recv() {
+            // `last_write` indicates if all chunks are processed, i.e. this is the last write.
+            // this happens when the sink sends `None`.
+            let last_write = if let Some(chunk) = chunk {
+                chunks.push(chunk);
+                false
+            } else {
+                true
+            };
+
+            if chunks.len() == morsels_per_sink || last_write {
+                if maintain_order {
+                    chunks.sort_by_key(|chunk| chunk.chunk_index);
+                }
+
+                for chunk in chunks.iter() {
+                    writer.write_batch(&chunk.data).unwrap()
+                }
+                // all chunks are written, remove them
+                chunks.clear();
+
+                if last_write {
+                    writer.finish().unwrap();
+                    return;
+                }
+            }
+        }
+    })
+}
+
+// Ensure the data is returned in the order it was streamed
+#[derive(Clone)]
+pub struct ParquetSink {
+    sender: Sender<Option<DataChunk>>,
+    io_thread_handle: Arc<Option<JoinHandle<()>>>,
+}
 
 impl ParquetSink {
@@ -39,54 +72,29 @@ impl ParquetSink {
             .with_row_group_size(options.row_group_size)
             .batched(schema)?;
 
-        let morsels_per_sink = morsels_per_sink() as u16;
+        let morsels_per_sink = morsels_per_sink();
+        let backpressure = morsels_per_sink * 2;
+        let (sender, receiver) = bounded(backpressure);
 
-        Ok(ParquetSink {
-            writer: Arc::new(SyncUnsafeCell::new(writer)),
-            chunks: Default::default(),
-            morsels_per_sink,
-            maintain_order: options.maintain_order,
-        })
-    }
+        let io_thread_handle = Arc::new(Some(init_writer_thread(
+            receiver,
+            writer,
+            options.maintain_order,
+            morsels_per_sink,
+        )));
 
-    // if chunk is `None` we will finalize the writer
-    fn write_chunks(&mut self, chunk: Option<DataChunk>) -> PolarsResult<()> {
-        let mut chunks = self.chunks.lock().unwrap();
-        let last_write = if let Some(chunk) = chunk {
-            chunks.push(chunk);
-            false
-        } else {
-            true
-        };
-
-        // TODO! speed this up by having a write thread that will make this async
-        if chunks.len() as u16 == self.morsels_per_sink || last_write {
-            // safety: we hold the mutex lock in chunks
-            let writer = unsafe { &mut *(*self.writer).get() };
-
-            if self.maintain_order {
-                chunks.sort_by_key(|chunk| chunk.chunk_index);
-            }
-
-            for chunk in chunks.iter() {
-                writer.write_batch(&chunk.data)?
-            }
-            // all chunks are written remove them
-            chunks.clear();
-
-            if last_write {
-                writer.finish()?;
-            }
-        }
-        Ok(())
-    }
+        Ok(ParquetSink {
+            sender,
+            io_thread_handle,
+        })
+    }
 }
 
 impl Sink for ParquetSink {
     fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
         // don't add empty dataframes
         if chunk.data.height() > 0 {
-            self.write_chunks(Some(chunk))?;
+            self.sender.send(Some(chunk)).unwrap();
         };
         Ok(SinkResult::CanHaveMoreInput)
     }
@@ -99,8 +107,17 @@ impl Sink for ParquetSink {
         Box::new(self.clone())
     }
     fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
-        // write remaining chunks
-        self.write_chunks(None)?;
+        // `None` indicates that we can flush all remaining chunks.
+        self.sender.send(None).unwrap();
+
+        // wait until all files are written
+        // some unwrap/mut kung-fu to get a hold of `self`
+        Arc::get_mut(&mut self.io_thread_handle)
+            .unwrap()
+            .take()
+            .unwrap()
+            .join()
+            .unwrap();
 
         // return a dummy dataframe;
         Ok(FinalizedSink::Finished(Default::default()))
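The finalize implementation above joins the writer thread through Arc::get_mut and Option::take. A stripped-down sketch of just that ownership trick follows; the Sink struct here is illustrative, not the Polars type.

use std::sync::Arc;
use std::thread::JoinHandle;

// Storing the handle as Arc<Option<JoinHandle<()>>> keeps the sink cheaply cloneable;
// only the last surviving clone can take the handle out and join it.
#[derive(Clone)]
struct Sink {
    io_thread_handle: Arc<Option<JoinHandle<()>>>,
}

impl Sink {
    fn finalize(&mut self) {
        // `Arc::get_mut` only succeeds when no other clones are alive,
        // then `Option::take` moves the JoinHandle out so it can be joined.
        Arc::get_mut(&mut self.io_thread_handle)
            .expect("other sink clones still alive")
            .take()
            .expect("writer thread already joined")
            .join()
            .unwrap();
    }
}

fn main() {
    let handle = std::thread::spawn(|| println!("writer thread done"));
    let mut sink = Sink {
        io_thread_handle: Arc::new(Some(handle)),
    };
    sink.finalize();
}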
7 changes: 3 additions & 4 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs
@@ -1,9 +1,9 @@
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::SyncSender;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
+use crossbeam_channel::{bounded, Sender};
 use polars_core::prelude::*;
 use polars_core::{POOL, PROCESS_ID};
 use polars_io::prelude::*;
@@ -13,7 +13,7 @@ pub(super) type DfIter = Box<dyn ExactSizeIterator<Item = DataFrame> + Sync + Send>;
 type Payload = (Option<IdxCa>, DfIter);
 
 pub(super) struct IOThread {
-    sender: SyncSender<Payload>,
+    sender: Sender<Payload>,
     pub(super) dir: PathBuf,
     pub(super) sent: Arc<AtomicUsize>,
     pub(super) total: Arc<AtomicUsize>,
@@ -53,8 +53,7 @@ impl IOThread {
         std::fs::create_dir_all(&dir)?;
 
         // we need some pushback otherwise we still could go OOM.
-        let (sender, receiver) =
-            std::sync::mpsc::sync_channel::<Payload>(POOL.current_num_threads() * 2);
+        let (sender, receiver) = bounded::<Payload>(POOL.current_num_threads() * 2);
 
         let sent: Arc<AtomicUsize> = Default::default();
         let total: Arc<AtomicUsize> = Default::default();
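The sort spill-to-disk IOThread gets the same swap: std::sync::mpsc::sync_channel is replaced by crossbeam_channel::bounded. The send/recv surface is essentially unchanged; one plausible motivation (an assumption, not stated in this diff) is that crossbeam's Sender is Sync as well as Clone, so it can be shared across threads more freely. A minimal side-by-side sketch:

use crossbeam_channel::bounded;

fn main() {
    let capacity = 8; // Polars sizes this as POOL.current_num_threads() * 2

    // Before: standard library bounded channel.
    let (std_tx, std_rx) = std::sync::mpsc::sync_channel::<u32>(capacity);
    std_tx.send(1).unwrap();
    assert_eq!(std_rx.recv().unwrap(), 1);

    // After: crossbeam bounded channel with the same blocking semantics.
    let (cb_tx, cb_rx) = bounded::<u32>(capacity);
    cb_tx.send(2).unwrap();
    assert_eq!(cb_rx.recv().unwrap(), 2);
}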
25 changes: 25 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.
