Parquet per-column writer options (#69)
kylebarron authored Jul 26, 2024
1 parent 28a5be3 commit 68554b9
Showing 2 changed files with 62 additions and 19 deletions.
51 changes: 33 additions & 18 deletions arro3-io/python/arro3/io/_rust.pyi
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import IO, Literal
from typing import IO, Literal, Sequence

from arro3.core import RecordBatchReader, Schema
from arro3.core.types import (
@@ -88,6 +88,28 @@ def write_ipc_stream(

#### Parquet

ParquetColumnPath = str | Sequence[str]
"""Allowed types to refer to a Parquet Column."""

ParquetCompression = (
Literal["uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "zstd", "lz4_raw"]
| str
)
"""Allowed compression schemes for Parquet."""

ParquetEncoding = Literal[
"plain",
"plain_dictionary",
"rle",
"bit_packed",
"delta_binary_packed",
"delta_length_byte_array",
"delta_byte_array",
"rle_dictionary",
"byte_stream_split",
]
"""Allowed Parquet encodings."""

def read_parquet(file: Path | str) -> RecordBatchReader:
"""Read a Parquet file to an Arrow RecordBatchReader
@@ -105,28 +127,17 @@ def write_parquet(
bloom_filter_enabled: bool | None = None,
bloom_filter_fpp: float | None = None,
bloom_filter_ndv: int | None = None,
compression: Literal[
"uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "zstd", "lz4_raw"
]
| str
| None = None,
column_compression: dict[ParquetColumnPath, ParquetCompression] | None = None,
column_dictionary_enabled: dict[ParquetColumnPath, bool] | None = None,
column_encoding: dict[ParquetColumnPath, ParquetEncoding] | None = None,
column_max_statistics_size: dict[ParquetColumnPath, int] | None = None,
compression: ParquetCompression | None = None,
created_by: str | None = None,
data_page_row_count_limit: int | None = None,
data_page_size_limit: int | None = None,
dictionary_enabled: bool | None = None,
dictionary_page_size_limit: int | None = None,
encoding: Literal[
"plain",
"plain_dictionary",
"rle",
"bit_packed",
"delta_binary_packed",
"delta_length_byte_array",
"delta_byte_array",
"rle_dictionary",
"byte_stream_split",
]
| None = None,
encoding: ParquetEncoding | None = None,
key_value_metadata: dict[str, str] | None = None,
max_row_group_size: int | None = None,
max_statistics_size: int | None = None,
@@ -143,6 +154,10 @@ def write_parquet(
bloom_filter_enabled: Sets if bloom filter is enabled by default for all columns (defaults to `false`).
bloom_filter_fpp: Sets the default target bloom filter false positive probability (fpp) for all columns (defaults to `0.05`).
bloom_filter_ndv: Sets default number of distinct values (ndv) for bloom filter for all columns (defaults to `1_000_000`).
column_compression: Sets compression codec for a specific column. Takes precedence over `compression`.
column_dictionary_enabled: Sets flag to enable/disable dictionary encoding for a specific column. Takes precedence over `dictionary_enabled`.
column_encoding: Sets encoding for a specific column. Takes precedence over `encoding`.
column_max_statistics_size: Sets max size for statistics for a specific column. Takes precedence over `max_statistics_size`.
compression:
Sets default compression codec for all columns (default to `uncompressed`).
Note that you can pass in a custom compression level with a string like
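As an illustration of the per-column keywords added in this commit, here is a minimal usage sketch. The file paths and column names (`id`, `geometry`) are placeholders, and the leading positional arguments (the Arrow data and the output path) are assumed from the existing `write_parquet` signature, which is collapsed in this diff; per the docstring, the `column_*` settings take precedence over the corresponding file-level defaults.

```python
from arro3.io import read_parquet, write_parquet

# Placeholder paths; any Arrow stream accepted by write_parquet would work as `data`.
data = read_parquet("input.parquet")

write_parquet(
    data,
    "output.parquet",
    # File-level defaults.
    compression="zstd",
    dictionary_enabled=True,
    # Per-column overrides; these take precedence over the defaults above.
    column_compression={"geometry": "uncompressed"},
    column_dictionary_enabled={"geometry": False},
    # Assumes `id` is an integer column, for which delta encoding is valid.
    column_encoding={"id": "delta_binary_packed"},
)
```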
30 changes: 29 additions & 1 deletion arro3-io/src/parquet.rs
@@ -65,7 +65,7 @@ impl<'py> FromPyObject<'py> for PyEncoding {
}
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub(crate) struct PyColumnPath(ColumnPath);

@@ -91,6 +91,10 @@ impl<'py> FromPyObject<'py> for PyColumnPath {
bloom_filter_enabled = None,
bloom_filter_fpp = None,
bloom_filter_ndv = None,
column_compression = None,
column_dictionary_enabled = None,
column_encoding = None,
column_max_statistics_size = None,
compression = None,
created_by = None,
data_page_row_count_limit = None,
@@ -111,6 +115,10 @@ pub(crate) fn write_parquet(
bloom_filter_enabled: Option<bool>,
bloom_filter_fpp: Option<f64>,
bloom_filter_ndv: Option<u64>,
column_compression: Option<HashMap<PyColumnPath, PyCompression>>,
column_dictionary_enabled: Option<HashMap<PyColumnPath, bool>>,
column_encoding: Option<HashMap<PyColumnPath, PyEncoding>>,
column_max_statistics_size: Option<HashMap<PyColumnPath, usize>>,
compression: Option<PyCompression>,
created_by: Option<String>,
data_page_row_count_limit: Option<usize>,
@@ -176,6 +184,26 @@ pub(crate) fn write_parquet(
if let Some(encoding) = encoding {
props = props.set_encoding(encoding.0);
}
if let Some(column_encoding) = column_encoding {
for (column_path, encoding) in column_encoding.into_iter() {
props = props.set_column_encoding(column_path.0, encoding.0);
}
}
if let Some(column_compression) = column_compression {
for (column_path, compression) in column_compression.into_iter() {
props = props.set_column_compression(column_path.0, compression.0);
}
}
if let Some(column_dictionary_enabled) = column_dictionary_enabled {
for (column_path, dictionary_enabled) in column_dictionary_enabled.into_iter() {
props = props.set_column_dictionary_enabled(column_path.0, dictionary_enabled);
}
}
if let Some(column_max_statistics_size) = column_max_statistics_size {
for (column_path, max_statistics_size) in column_max_statistics_size.into_iter() {
props = props.set_column_max_statistics_size(column_path.0, max_statistics_size);
}
}

let reader = data.into_reader()?;

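The keys of these per-column dictionaries use the new `ParquetColumnPath` alias (`str | Sequence[str]`), mirroring the `ColumnPath` type that the Rust side passes to the writer-properties builder. A hedged sketch of both key forms follows; the column names are hypothetical, and the assumptions that a sequence of parts addresses a nested field and that a tuple is accepted as a hashable `Sequence[str]` key are not verified against this commit.

```python
from arro3.io import read_parquet, write_parquet

data = read_parquet("input.parquet")  # placeholder path

write_parquet(
    data,
    "output.parquet",
    column_max_statistics_size={
        # Top-level column addressed by name.
        "description": 64,
        # Nested field addressed by its path parts (assumed); a tuple is used
        # because dict keys must be hashable and lists are not.
        ("location", "coordinates"): 128,
    },
)
```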
