Commit
nameexhaustion committed Jul 3, 2024
1 parent 0e1e0bd commit 1517147
Showing 16 changed files with 309 additions and 63 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/doc/lib.md
@@ -42,7 +42,7 @@ fn main() -> Result<()> {
write_statistics: true,
compression: CompressionOptions::Snappy,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let row_groups = RowGroupIterator::try_new(
1 change: 0 additions & 1 deletion crates/polars-io/src/lib.rs
@@ -19,7 +19,6 @@ pub mod ndjson;
mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
- #[cfg(feature = "partition")]
pub mod partition;
#[cfg(feature = "async")]
pub mod pl_async;
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/options.rs
@@ -16,7 +16,7 @@ pub struct ParquetWriteOptions {
/// If `None` will be all written to a single row group.
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
- pub data_pagesize_limit: Option<usize>,
+ pub data_page_size: Option<usize>,
/// maintain the order the data was processed
pub maintain_order: bool,
}
16 changes: 15 additions & 1 deletion crates/polars-io/src/parquet/write/writer.rs
@@ -10,9 +10,23 @@ use polars_parquet::write::{

use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
use super::ParquetWriteOptions;
use crate::prelude::chunk_df_for_writing;
use crate::shared::schema_to_arrow_checked;

impl ParquetWriteOptions {
pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
where
F: Write,
{
ParquetWriter::new(f)
.with_compression(self.compression)
.with_statistics(self.statistics)
.with_row_group_size(self.row_group_size)
.with_data_page_size(self.data_page_size)
}
}

/// Write a DataFrame to Parquet format.
#[must_use]
pub struct ParquetWriter<W> {
@@ -103,7 +117,7 @@
statistics: self.statistics,
compression: self.compression,
version: Version::V1,
- data_pagesize_limit: self.data_page_size,
+ data_page_size: self.data_page_size,
}
}

88 changes: 88 additions & 0 deletions crates/polars-io/src/partition.rs
@@ -9,6 +9,7 @@ use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

use crate::parquet::write::ParquetWriteOptions;
use crate::utils::resolve_homedir;
use crate::WriterFactory;

@@ -127,3 +128,90 @@
}
path
}

pub fn write_partitioned_dataset<S>(
df: &DataFrame,
path: &Path,
partition_by: &[S],
write_options: &ParquetWriteOptions,
) -> PolarsResult<()>
where
S: AsRef<str>,
{
let base_path = path;

for (path_part, mut part_df) in get_hive_partitions_iter(df, partition_by.as_ref())? {
let dir = base_path.join(path_part);
std::fs::create_dir_all(&dir)?;
let f = std::fs::File::create(dir.join("data.parquet"))?;
write_options.to_writer(f).finish(&mut part_df)?;
}

Ok(())
}

/// Creates an iterator of (hive partition path, DataFrame) pairs, e.g.:
/// ("a=1/b=1", DataFrame)
fn get_hive_partitions_iter<'a, S>(
df: &'a DataFrame,
partition_by: &'a [S],
) -> PolarsResult<Box<dyn Iterator<Item = (String, DataFrame)> + 'a>>
where
S: AsRef<str>,
{
for x in partition_by.iter() {
if df.get_column_index(x.as_ref()).is_none() {
polars_bail!(ColumnNotFound: "{}", x.as_ref());
}
}

let get_hive_path_part = |df: &DataFrame| {
const CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
.add(b'/')
.add(b'=')
.add(b':')
.add(b' ');

partition_by
.iter()
.map(|x| {
let s = df.column(x.as_ref()).unwrap().slice(0, 1);

format!(
"{}={}",
s.name(),
percent_encoding::percent_encode(
s.cast(&DataType::String)
.unwrap()
.str()
.unwrap()
.get(0)
.unwrap_or("__HIVE_DEFAULT_PARTITION__")
.as_bytes(),
CHAR_SET
)
)
})
.collect::<Vec<_>>()
.join("/")
};

let groups = df.group_by(partition_by)?;
let groups = groups.take_groups();

let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups {
GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| {
let part_df =
unsafe { df._take_unchecked_slice_sorted(&group, false, IsSorted::Ascending) };
(get_hive_path_part(&part_df), part_df)
})),
GroupsProxy::Slice { groups, .. } => {
Box::new(groups.into_iter().map(move |[offset, len]| {
let part_df = df.slice(offset as i64, len as usize);
(get_hive_path_part(&part_df), part_df)
}))
},
};

Ok(out)
}
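
To illustrate the path handling in get_hive_path_part above, here is a hedged Python sketch that uses the write_parquet_partitioned wrapper this commit adds to py-polars further down; the column names and output directory are invented for the example.

import polars as pl

df = pl.DataFrame(
    {
        "label": ["a b", "x/y", None],
        "value": [1, 2, 3],
    }
)

# Partition keys become hive-style directory names: '/', '=', ':', ' ' and
# control characters are percent-encoded, and a null key falls back to
# "__HIVE_DEFAULT_PARTITION__". Each partition gets a single data.parquet.
df.write_parquet_partitioned("out/", partition_by="label")

# Expected layout:
# out/label=a%20b/data.parquet
# out/label=x%2Fy/data.parquet
# out/label=__HIVE_DEFAULT_PARTITION__/data.parquet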
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/mod.rs
@@ -81,7 +81,7 @@ pub struct WriteOptions {
/// The compression to apply to every page
pub compression: CompressionOptions,
/// The size to flush a page, defaults to 1024 * 1024 if None
- pub data_pagesize_limit: Option<usize>,
+ pub data_page_size: Option<usize>,
}

use arrow::compute::aggregate::estimated_bytes_size;
@@ -298,7 +298,7 @@ pub fn array_to_pages(
let byte_size = estimated_bytes_size(primitive_array);

const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
- let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
+ let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
let bytes_per_row = if number_of_rows == 0 {
0
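
For reference, the clamp above caps pages at 2^31 - 2^25 bytes; my reading (not stated in the commit) is that this keeps page sizes safely below the signed 32-bit byte boundary with some headroom. A quick check of the value:

# Value of the "allowed maximum page size" clamp used above.
allowed_max = 2**31 - 2**25
print(allowed_max)           # 2113929216 bytes
print(allowed_max / 2**20)   # 2016.0 MiB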
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
@@ -63,7 +63,7 @@ impl ParquetSink {
let file = std::fs::File::create(path)?;
let writer = ParquetWriter::new(file)
.with_compression(options.compression)
- .with_data_page_size(options.data_pagesize_limit)
+ .with_data_page_size(options.data_page_size)
.with_statistics(options.statistics)
.with_row_group_size(options.row_group_size)
// This is important! Otherwise we will deadlock
@@ -154,7 +154,7 @@ impl ParquetCloudSink {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = ParquetWriter::new(cloud_writer)
.with_compression(parquet_options.compression)
- .with_data_page_size(parquet_options.data_pagesize_limit)
+ .with_data_page_size(parquet_options.data_page_size)
.with_statistics(parquet_options.statistics)
.with_row_group_size(parquet_options.row_group_size)
// This is important! Otherwise we will deadlock
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/arrow/mod.rs
@@ -1260,7 +1260,7 @@ fn integration_write(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let encodings = schema
4 changes: 2 additions & 2 deletions crates/polars/tests/it/io/parquet/arrow/read_indexes.rs
@@ -32,7 +32,7 @@ fn pages(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let pages1 = [array11, array12, array13]
@@ -82,7 +82,7 @@ fn read_with_indexes(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let to_compressed = |pages: Vec<Page>| {
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/arrow/write.rs
@@ -48,7 +48,7 @@ fn round_trip_opt_stats(
statistics: StatisticsOptions::full(),
compression,
version,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let iter = vec![RecordBatchT::try_new(vec![array.clone()])];
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/roundtrip.rs
@@ -23,7 +23,7 @@ fn round_trip(
statistics: StatisticsOptions::full(),
compression,
version,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let iter = vec![RecordBatchT::try_new(vec![array.clone()])];
84 changes: 84 additions & 0 deletions py-polars/polars/dataframe/frame.py
@@ -3618,6 +3618,90 @@ def write_parquet(
data_page_size,
)

def write_parquet_partitioned(
self,
path: str | Path,
partition_by: str | Collection[str],
*,
compression: ParquetCompression = "zstd",
compression_level: int | None = None,
statistics: bool | str | dict[str, bool] = True,
row_group_size: int | None = None,
data_page_size: int | None = None,
) -> None:
"""
Write a partitioned directory of parquet files.

Parameters
----------
path
Path to the base directory for the partitioned dataset.
partition_by
Columns to partition by.
compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}
Choose "zstd" for good compression performance.
Choose "lz4" for fast compression/decompression.
Choose "snappy" for more backwards compatibility guarantees
when you deal with older parquet readers.
compression_level
The level of compression to use. Higher compression means smaller files on
disk.
- "gzip" : min-level: 0, max-level: 10.
- "brotli" : min-level: 0, max-level: 11.
- "zstd" : min-level: 1, max-level: 22.
statistics
Write statistics to the parquet headers. This is the default behavior.
Possible values:
- `True`: enable default set of statistics (default)
- `False`: disable all statistics
- "full": calculate and write all available statistics. Cannot be
combined with `use_pyarrow`.
- `{ "statistic-key": True / False, ... }`. Cannot be combined with
`use_pyarrow`. Available keys:
- "min": column minimum value (default: `True`)
- "max": column maximum value (default: `True`)
- "distinct_count": number of unique column values (default: `False`)
- "null_count": number of null values in column (default: `True`)
row_group_size
Size of the row groups in number of rows. Defaults to 512^2 rows.
data_page_size
Size of the data page in bytes. Defaults to 1024^2 bytes.
"""
path = normalize_filepath(path, check_not_directory=False)
partition_by = [partition_by] if isinstance(partition_by, str) else partition_by

if isinstance(statistics, bool) and statistics:
statistics = {
"min": True,
"max": True,
"distinct_count": False,
"null_count": True,
}
elif isinstance(statistics, bool) and not statistics:
statistics = {}
elif statistics == "full":
statistics = {
"min": True,
"max": True,
"distinct_count": True,
"null_count": True,
}

self._df.write_parquet_partitioned(
path,
partition_by,
compression,
compression_level,
statistics,
row_group_size,
data_page_size,
)

def write_database(
self,
table_name: str,
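
A hedged usage sketch of the new DataFrame.write_parquet_partitioned (the dataset path and column names are invented for illustration); per the Rust implementation earlier in this commit, each partition directory receives a single data.parquet file.

import polars as pl

df = pl.DataFrame(
    {
        "region": ["eu", "eu", "us"],
        "year": [2023, 2024, 2024],
        "value": [1.0, 2.0, 3.0],
    }
)

# Hive-style partitioning on two columns, using the defaults
# (zstd compression, default statistics).
df.write_parquet_partitioned("sales/", partition_by=["region", "year"])

# Expected layout:
# sales/region=eu/year=2023/data.parquet
# sales/region=eu/year=2024/data.parquet
# sales/region=us/year=2024/data.parquet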
6 changes: 3 additions & 3 deletions py-polars/polars/lazyframe/frame.py
@@ -2154,7 +2154,7 @@ def sink_parquet(
compression_level: int | None = None,
statistics: bool | str | dict[str, bool] = True,
row_group_size: int | None = None,
- data_pagesize_limit: int | None = None,
+ data_page_size: int | None = None,
maintain_order: bool = True,
type_coercion: bool = True,
predicate_pushdown: bool = True,
@@ -2209,7 +2209,7 @@ def sink_parquet(
If None (default), the chunks of the `DataFrame` are
used. Writing in smaller chunks may reduce memory pressure and improve
writing speeds.
- data_pagesize_limit
+ data_page_size
Size limit of individual data pages.
If not set defaults to 1024 * 1024 bytes
maintain_order
@@ -2269,7 +2269,7 @@
compression_level=compression_level,
statistics=statistics,
row_group_size=row_group_size,
- data_pagesize_limit=data_pagesize_limit,
+ data_page_size=data_page_size,
maintain_order=maintain_order,
)

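
A minimal sketch of the renamed keyword in the streaming sink (the input file is hypothetical):

import polars as pl

lf = pl.scan_csv("measurements.csv")  # hypothetical input

# The keyword was `data_pagesize_limit` before this commit; it is now
# `data_page_size`, matching DataFrame.write_parquet.
lf.sink_parquet("measurements.parquet", data_page_size=512 * 1024)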
36 changes: 36 additions & 0 deletions py-polars/src/dataframe/io.rs
@@ -438,6 +438,42 @@ impl PyDataFrame {
Ok(())
}

#[cfg(feature = "parquet")]
#[pyo3(signature = (py_f, partition_by, compression, compression_level, statistics, row_group_size, data_page_size))]
pub fn write_parquet_partitioned(
&mut self,
py: Python,
py_f: PyObject,
partition_by: Vec<String>,
compression: &str,
compression_level: Option<i32>,
statistics: Wrap<StatisticsOptions>,
row_group_size: Option<usize>,
data_page_size: Option<usize>,
) -> PyResult<()> {
use std::path::Path;

use polars_io::partition::write_partitioned_dataset;

let Ok(path) = py_f.extract::<PyBackedStr>(py) else {
return Err(PyPolarsErr::from(polars_err!(ComputeError: "expected path-like")).into());
};
let path = Path::new(&*path);
let compression = parse_parquet_compression(compression, compression_level)?;

let write_options = ParquetWriteOptions {
compression,
statistics: statistics.0,
row_group_size,
data_page_size,
maintain_order: true,
};

write_partitioned_dataset(&self.df, path, partition_by.as_slice(), &write_options)
.map_err(PyPolarsErr::from)?;
Ok(())
}

#[cfg(feature = "json")]
pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> {
let file = BufWriter::new(get_file_like(py_f, true)?);