feat: Change API for writing partitioned Parquet to reduce code duplication (#17586)

nameexhaustion authored Jul 14, 2024
1 parent 26dd9f3 commit d43df08

Showing 11 changed files with 96 additions and 328 deletions.
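At a glance, the change replaces the per-format writer factories (`WriterFactory`, `IpcWriterOption`) with a single `WriteDataFrameToFile` trait, so `write_partitioned_dataset` can take any format's options value. A minimal caller-side sketch, not part of the diff — the module paths, the `Default` impl on `ParquetWriteOptions`, and the `chunk_size` value are assumptions:

```rust
use std::path::Path;

use polars_core::prelude::*;
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::partition::write_partitioned_dataset;

// Write `df` as a hive-partitioned Parquet dataset under `root`, split by
// columns "a" and "b". After this commit, ParquetWriteOptions implements
// WriteDataFrameToFile, so it can be passed directly.
fn write_hive(df: &DataFrame, root: &Path) -> PolarsResult<()> {
    let options = ParquetWriteOptions::default(); // assumed Default impl
    // chunk_size controls how each partition is split into files; the value
    // here is illustrative only.
    write_partitioned_dataset(df, root, &["a", "b"], &options, 1_000_000)
}
```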
12 changes: 1 addition & 11 deletions crates/polars-io/src/ipc/ipc_stream.rs
@@ -42,7 +42,7 @@ use arrow::io::ipc::{read, write};
use polars_core::prelude::*;

use crate::prelude::*;
use crate::shared::{finish_reader, ArrowReader, WriterFactory};
use crate::shared::{finish_reader, ArrowReader};

/// Read Arrows Stream IPC format into a DataFrame
///
@@ -290,13 +290,3 @@ impl Default for IpcStreamWriterOption {
Self::new()
}
}

impl WriterFactory for IpcStreamWriterOption {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
Box::new(IpcStreamWriter::new(writer).with_compression(self.compression))
}

fn extension(&self) -> PathBuf {
self.extension.to_owned()
}
}
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/mod.rs
@@ -16,4 +16,4 @@ pub use ipc_file::{IpcReader, IpcScanOptions};
pub use ipc_reader_async::*;
#[cfg(feature = "ipc_streaming")]
pub use ipc_stream::*;
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption, IpcWriterOptions};
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOptions};
51 changes: 7 additions & 44 deletions crates/polars-io/src/ipc/write.rs
@@ -1,5 +1,4 @@
use std::io::Write;
use std::path::PathBuf;

use arrow::io::ipc::write;
use arrow::io::ipc::write::WriteOptions;
@@ -8,7 +7,7 @@ use polars_core::prelude::*;
use serde::{Deserialize, Serialize};

use crate::prelude::*;
use crate::shared::{schema_to_arrow_checked, WriterFactory};
use crate::shared::schema_to_arrow_checked;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -19,6 +18,12 @@ pub struct IpcWriterOptions {
pub maintain_order: bool,
}

impl IpcWriterOptions {
pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
IpcWriter::new(writer).with_compression(self.compression)
}
}

/// Write a DataFrame to Arrow's IPC format
///
/// # Example
@@ -153,45 +158,3 @@ impl From<IpcCompression> for write::Compression {
}
}
}

pub struct IpcWriterOption {
compression: Option<IpcCompression>,
extension: PathBuf,
}

impl IpcWriterOption {
pub fn new() -> Self {
Self {
compression: None,
extension: PathBuf::from(".ipc"),
}
}

/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
self.compression = compression;
self
}

/// Set the extension. Defaults to ".ipc".
pub fn with_extension(mut self, extension: PathBuf) -> Self {
self.extension = extension;
self
}
}

impl Default for IpcWriterOption {
fn default() -> Self {
Self::new()
}
}

impl WriterFactory for IpcWriterOption {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
Box::new(IpcWriter::new(writer).with_compression(self.compression))
}

fn extension(&self) -> PathBuf {
self.extension.to_owned()
}
}
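With the `IpcWriterOption` factory removed, an IPC writer is now obtained from `IpcWriterOptions` via the new `to_writer` helper. A minimal usage sketch, not part of the diff; it assumes the struct's `compression` field, the re-exports shown in `ipc/mod.rs` above, and that the `ipc` feature is enabled:

```rust
use std::fs::File;

use polars_core::prelude::*;
use polars_io::ipc::{IpcCompression, IpcWriterOptions};
use polars_io::SerWriter;

// Configure the options once, then turn them into a concrete IpcWriter for
// any io::Write sink via the new to_writer() helper.
fn write_ipc(df: &mut DataFrame, path: &str) -> PolarsResult<()> {
    let options = IpcWriterOptions {
        compression: Some(IpcCompression::ZSTD),
        ..Default::default()
    };
    let file = File::create(path)?;
    options.to_writer(file).finish(df)
}
```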
137 changes: 18 additions & 119 deletions crates/polars-io/src/partition.rs
@@ -1,144 +1,43 @@
//! Functionality for writing a DataFrame partitioned into multiple files.

use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::path::Path;

use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

use crate::parquet::write::ParquetWriteOptions;
use crate::utils::resolve_homedir;
use crate::WriterFactory;

/// Write a DataFrame with disk partitioning
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcWriterOption;
/// use polars_io::partition::PartitionedWriter;
///
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
/// let option = IpcWriterOption::default();
/// PartitionedWriter::new(option, "./rootdir", ["a", "b"])
/// .finish(df)
/// }
/// ```

pub struct PartitionedWriter<F> {
option: F,
rootdir: PathBuf,
by: Vec<String>,
parallel: bool,
}

impl<F> PartitionedWriter<F>
where
F: WriterFactory + Send + Sync,
{
pub fn new<P, I, S>(option: F, rootdir: P, by: I) -> Self
where
P: Into<PathBuf>,
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
Self {
option,
rootdir: rootdir.into(),
by: by.into_iter().map(|s| s.as_ref().to_string()).collect(),
parallel: true,
}
}

/// Write the parquet file in parallel (default).
pub fn with_parallel(mut self, parallel: bool) -> Self {
self.parallel = parallel;
self
}

fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> PolarsResult<()> {
let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df);
std::fs::create_dir_all(&path)?;

path.push(format!(
"data-{:04}.{}",
i,
self.option.extension().display()
));

let file = std::fs::File::create(path)?;
let writer = BufWriter::new(file);

self.option
.create_writer::<BufWriter<File>>(writer)
.finish(partition_df)
}

pub fn finish(self, df: &DataFrame) -> PolarsResult<()> {
let groups = df.group_by(self.by.clone())?;
let groups = groups.get_groups();

// don't parallelize this
// there is a lot of parallelization in take and this may easily SO
POOL.install(|| {
match groups {
GroupsProxy::Idx(idx) => {
idx.par_iter()
.enumerate()
.map(|(i, (_, group))| {
// groups are in bounds
// and sorted
let mut part_df = unsafe {
df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
};
self.write_partition_df(&mut part_df, i)
})
.collect::<PolarsResult<Vec<_>>>()
},
GroupsProxy::Slice { groups, .. } => groups
.par_iter()
.enumerate()
.map(|(i, [first, len])| {
let mut part_df = df.slice(*first as i64, *len as usize);
self.write_partition_df(&mut part_df, i)
})
.collect::<PolarsResult<Vec<_>>>(),
}
})?;
#[cfg(feature = "ipc")]
use crate::prelude::IpcWriterOptions;
use crate::{SerWriter, WriteDataFrameToFile};

impl WriteDataFrameToFile for ParquetWriteOptions {
fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
self.to_writer(file).finish(&mut df)?;
Ok(())
}
}

/// `partition_df` must be created in the same way as `partition_by`.
fn resolve_partition_dir<I, S>(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    let mut path = PathBuf::new();
    path.push(resolve_homedir(rootdir));

    for key in by.into_iter() {
        let value = partition_df[key.as_ref()].get(0).unwrap().to_string();
        path.push(format!("{}={}", key.as_ref(), value))
    }
    path
}

#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
        self.to_writer(file).finish(&mut df)?;
        Ok(())
    }
}

/// Write a partitioned parquet dataset. This functionality is unstable.
pub fn write_partitioned_dataset<S>(
pub fn write_partitioned_dataset<S, O>(
df: &DataFrame,
path: &Path,
partition_by: &[S],
file_write_options: &ParquetWriteOptions,
file_write_options: &O,
chunk_size: usize,
) -> PolarsResult<()>
where
S: AsRef<str>,
O: WriteDataFrameToFile + Send + Sync,
{
// Note: When adding support for formats other than Parquet, avoid writing the partitioned
// columns into the file. We write them for parquet because they are encoded efficiently with
@@ -210,9 +109,9 @@ where
(n_files, rows_per_file)
};

let write_part = |mut df: DataFrame, path: &Path| {
let write_part = |df: DataFrame, path: &Path| {
let f = std::fs::File::create(path)?;
file_write_options.to_writer(f).finish(&mut df)?;
file_write_options.write_df_to_file(df, f)?;
PolarsResult::Ok(())
};

@@ -258,7 +157,7 @@ where
.par_iter()
.map(|group| {
let df = unsafe {
df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
};
finish_part_df(df)
})
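Since `write_partitioned_dataset` is now generic over `WriteDataFrameToFile`, the note in the hunk above about adding formats other than Parquet comes down to implementing the one-method trait for that format's options type. A hedged sketch with a hypothetical CSV options struct — `MyCsvWriteOptions` is illustrative only, and it assumes the trait is re-exported at the polars-io crate root (as the in-crate `use crate::{SerWriter, WriteDataFrameToFile}` suggests) and that the `csv` feature is enabled:

```rust
use polars_core::prelude::*;
use polars_io::prelude::*; // CsvWriter, SerWriter, ...
use polars_io::WriteDataFrameToFile;

// Illustrative options type for a format this PR does not wire up.
struct MyCsvWriteOptions {
    include_header: bool,
}

impl WriteDataFrameToFile for MyCsvWriteOptions {
    fn write_df_to_file<W: std::io::Write>(&self, mut df: DataFrame, file: W) -> PolarsResult<()> {
        // Serialize one partition's DataFrame to the given file handle.
        CsvWriter::new(file)
            .include_header(self.include_header)
            .finish(&mut df)?;
        Ok(())
    }
}
```

Such a value could then be passed to `write_partitioned_dataset` as `file_write_options` without touching the partitioning logic.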
6 changes: 2 additions & 4 deletions crates/polars-io/src/shared.rs
@@ -1,5 +1,4 @@
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array::new_empty_array;
@@ -41,9 +40,8 @@
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()>;
}

pub trait WriterFactory {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>>;
fn extension(&self) -> PathBuf;
pub trait WriteDataFrameToFile {
fn write_df_to_file<W: std::io::Write>(&self, df: DataFrame, file: W) -> PolarsResult<()>;
}

pub trait ArrowReader {
5 changes: 3 additions & 2 deletions docs/src/python/user-guide/io/hive.py
@@ -25,7 +25,8 @@
Path(path).parent.mkdir(exist_ok=True, parents=True)
df.write_parquet(path)

Path("docs/data/hive_mixed/description.txt").touch()
# Make sure the file is not empty because path expansion ignores empty files.
Path("docs/data/hive_mixed/description.txt").write_text("A")


def print_paths(path: str) -> None:
@@ -123,7 +124,7 @@ def dir_recurse(path: Path):
# --8<-- [end:write_parquet_partitioned_show_data]

# --8<-- [start:write_parquet_partitioned]
df.write_parquet_partitioned("docs/data/hive_write/", ["a", "b"])
df.write_parquet("docs/data/hive_write/", partition_by=["a", "b"])
# --8<-- [end:write_parquet_partitioned]

# --8<-- [start:write_parquet_partitioned_show_paths]
8 changes: 4 additions & 4 deletions docs/user-guide/io/hive.md
@@ -26,8 +26,8 @@ path included in the output:

### Handling mixed files

Passing a directory to `scan_parquet` may not work if there are extra non-data files next to the
data files.
Passing a directory to `scan_parquet` may not work if there are files with different extensions in
the directory.

For this example the following directory structure is used:

@@ -80,15 +80,15 @@ Polars supports writing hive partitioned parquet datasets, with planned support

For this example the following DataFrame is used:

{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',['write_parquet_partitioned'])}}
{{code_block('user-guide/io/hive','write_parquet_partitioned_show_data',[])}}

```python exec="on" result="text" session="user-guide/io/hive"
--8<-- "python/user-guide/io/hive.py:write_parquet_partitioned_show_data"
```

We will write it to a hive-partitioned parquet dataset, partitioned by the columns `a` and `b`:

{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet_partitioned'])}}
{{code_block('user-guide/io/hive','write_parquet_partitioned',['write_parquet'])}}

```python exec="on" result="text" session="user-guide/io/hive"
--8<-- "python/user-guide/io/hive.py:write_parquet_partitioned"
1 change: 0 additions & 1 deletion py-polars/docs/source/reference/io.rst
@@ -107,7 +107,6 @@ Parquet
read_parquet_schema
scan_parquet
DataFrame.write_parquet
DataFrame.write_parquet_partitioned
LazyFrame.sink_parquet

PyArrow Datasets
