Support custom properties when writing to Parquet (#67)
* parquet writer props

* Parquet writer properties
kylebarron authored Jul 26, 2024
1 parent ab4418e commit 28a5be3
Showing 4 changed files with 278 additions and 116 deletions.
134 changes: 119 additions & 15 deletions arro3-io/python/arro3/io/_rust.pyi
@@ -1,18 +1,12 @@
from pathlib import Path
from typing import IO, Protocol, Tuple
from typing import IO, Literal

from arro3.core import RecordBatchReader, Schema

class ArrowSchemaExportable(Protocol):
def __arrow_c_schema__(self) -> object: ...

class ArrowArrayExportable(Protocol):
def __arrow_c_array__(
self, requested_schema: object | None = None
) -> Tuple[object, object]: ...

class ArrowStreamExportable(Protocol):
def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: ...
from arro3.core.types import (
ArrowSchemaExportable,
ArrowArrayExportable,
ArrowStreamExportable,
)

#### CSV

@@ -94,7 +88,117 @@ def write_ipc_stream(

#### Parquet

def read_parquet(file: Path | str) -> RecordBatchReader: ...
def read_parquet(file: Path | str) -> RecordBatchReader:
"""Read a Parquet file to an Arrow RecordBatchReader
Args:
file: _description_
Returns:
_description_
"""

def write_parquet(
data: ArrowStreamExportable | ArrowArrayExportable, file: IO[bytes] | Path | str
) -> None: ...
data: ArrowStreamExportable | ArrowArrayExportable,
file: IO[bytes] | Path | str,
*,
bloom_filter_enabled: bool | None = None,
bloom_filter_fpp: float | None = None,
bloom_filter_ndv: int | None = None,
compression: Literal[
"uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "zstd", "lz4_raw"
]
| str
| None = None,
created_by: str | None = None,
data_page_row_count_limit: int | None = None,
data_page_size_limit: int | None = None,
dictionary_enabled: bool | None = None,
dictionary_page_size_limit: int | None = None,
encoding: Literal[
"plain",
"plain_dictionary",
"rle",
"bit_packed",
"delta_binary_packed",
"delta_length_byte_array",
"delta_byte_array",
"rle_dictionary",
"byte_stream_split",
]
| None = None,
key_value_metadata: dict[str, str] | None = None,
max_row_group_size: int | None = None,
max_statistics_size: int | None = None,
write_batch_size: int | None = None,
writer_version: Literal["parquet_1_0", "parquet_2_0"] | None = None,
) -> None:
"""Write an Arrow Table or stream to a Parquet file.
Args:
data: The Arrow Table, RecordBatchReader, or RecordBatch to write to Parquet.
file: The output file.
Keyword Args:
        bloom_filter_enabled: Sets whether bloom filters are enabled by default for all columns (defaults to `False`).
bloom_filter_fpp: Sets the default target bloom filter false positive probability (fpp) for all columns (defaults to `0.05`).
bloom_filter_ndv: Sets default number of distinct values (ndv) for bloom filter for all columns (defaults to `1_000_000`).
compression:
            Sets the default compression codec for all columns (defaults to `uncompressed`).
Note that you can pass in a custom compression level with a string like
`"zstd(3)"` or `"gzip(9)"` or `"brotli(3)"`.
created_by: Sets "created by" property (defaults to `parquet-rs version <VERSION>`).
data_page_row_count_limit:
Sets best effort maximum number of rows in a data page (defaults to
`20_000`).
The parquet writer will attempt to limit the number of rows in each
`DataPage` to this value. Reducing this value will result in larger parquet
files, but may improve the effectiveness of page index based predicate
pushdown during reading.
            Note: this is a best effort limit based on the value of `write_batch_size`.
data_page_size_limit:
Sets best effort maximum size of a data page in bytes (defaults to `1024 *
1024`).
The parquet writer will attempt to limit the sizes of each `DataPage` to
this many bytes. Reducing this value will result in larger parquet files,
but may improve the effectiveness of page index based predicate pushdown
during reading.
            Note: this is a best effort limit based on the value of `write_batch_size`.
dictionary_enabled: Sets default flag to enable/disable dictionary encoding for all columns (defaults to `True`).
dictionary_page_size_limit:
Sets best effort maximum dictionary page size, in bytes (defaults to `1024 *
1024`).
The parquet writer will attempt to limit the size of each `DataPage` used to
store dictionaries to this many bytes. Reducing this value will result in
larger parquet files, but may improve the effectiveness of page index based
predicate pushdown during reading.
            Note: this is a best effort limit based on the value of `write_batch_size`.
encoding:
Sets default encoding for all columns.
            If dictionary encoding is not enabled, this is treated as the primary encoding
            for all columns. For any column where dictionary encoding is enabled, this value
            is used as the fallback encoding for that column.
key_value_metadata: Sets "key_value_metadata" property (defaults to `None`).
max_row_group_size: Sets maximum number of rows in a row group (defaults to `1024 * 1024`).
max_statistics_size: Sets default max statistics size for all columns (defaults to `4096`).
write_batch_size:
Sets write batch size (defaults to 1024).
For performance reasons, data for each column is written in batches of this
size.
            Additional limits such as `data_page_row_count_limit` are
checked between batches, and thus the write batch size value acts as an
upper-bound on the enforcement granularity of other limits.
writer_version: Sets the `WriterVersion` written into the parquet metadata (defaults to `"parquet_1_0"`). This value can determine what features some readers will support.
"""
163 changes: 158 additions & 5 deletions arro3-io/src/parquet.rs
@@ -1,14 +1,21 @@
use std::collections::HashMap;
use std::str::FromStr;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::ArrowWriter;
use pyo3::exceptions::PyTypeError;
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::format::KeyValue;
use parquet::schema::types::ColumnPath;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3_arrow::error::PyArrowResult;
use pyo3_arrow::input::AnyRecordBatch;
use pyo3_arrow::PyRecordBatchReader;

use crate::utils::{FileReader, FileWriter};

/// Read a Parquet file to an Arrow RecordBatchReader
#[pyfunction]
pub fn read_parquet(py: Python, file: FileReader) -> PyArrowResult<PyObject> {
match file {
@@ -24,11 +31,157 @@ pub fn read_parquet(py: Python, file: FileReader) -> PyArrowResult<PyObject> {
}
}

/// Write an Arrow Table or stream to a Parquet file
pub(crate) struct PyWriterVersion(WriterVersion);

impl<'py> FromPyObject<'py> for PyWriterVersion {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let s: String = ob.extract()?;
Ok(Self(
WriterVersion::from_str(&s).map_err(|err| PyValueError::new_err(err.to_string()))?,
))
}
}

pub(crate) struct PyCompression(Compression);

impl<'py> FromPyObject<'py> for PyCompression {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let s: String = ob.extract()?;
Ok(Self(
Compression::from_str(&s).map_err(|err| PyValueError::new_err(err.to_string()))?,
))
}
}

#[derive(Debug)]
pub(crate) struct PyEncoding(Encoding);

impl<'py> FromPyObject<'py> for PyEncoding {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let s: String = ob.extract()?;
Ok(Self(
Encoding::from_str(&s).map_err(|err| PyValueError::new_err(err.to_string()))?,
))
}
}

#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct PyColumnPath(ColumnPath);

impl<'py> FromPyObject<'py> for PyColumnPath {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
if let Ok(path) = ob.extract::<String>() {
Ok(Self(path.into()))
} else if let Ok(path) = ob.extract::<Vec<String>>() {
Ok(Self(path.into()))
} else {
Err(PyTypeError::new_err(
"Expected string or list of string input for column path.",
))
}
}
}

#[pyfunction]
pub fn write_parquet(data: AnyRecordBatch, file: FileWriter) -> PyArrowResult<()> {
#[pyo3(signature=(
data,
file,
*,
bloom_filter_enabled = None,
bloom_filter_fpp = None,
bloom_filter_ndv = None,
compression = None,
created_by = None,
data_page_row_count_limit = None,
data_page_size_limit = None,
dictionary_enabled = None,
dictionary_page_size_limit = None,
encoding = None,
key_value_metadata = None,
max_row_group_size = None,
max_statistics_size = None,
write_batch_size = None,
writer_version = None,
))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn write_parquet(
data: AnyRecordBatch,
file: FileWriter,
bloom_filter_enabled: Option<bool>,
bloom_filter_fpp: Option<f64>,
bloom_filter_ndv: Option<u64>,
compression: Option<PyCompression>,
created_by: Option<String>,
data_page_row_count_limit: Option<usize>,
data_page_size_limit: Option<usize>,
dictionary_enabled: Option<bool>,
dictionary_page_size_limit: Option<usize>,
encoding: Option<PyEncoding>,
key_value_metadata: Option<HashMap<String, String>>,
max_row_group_size: Option<usize>,
max_statistics_size: Option<usize>,
write_batch_size: Option<usize>,
writer_version: Option<PyWriterVersion>,
) -> PyArrowResult<()> {
let mut props = WriterProperties::builder();

if let Some(writer_version) = writer_version {
props = props.set_writer_version(writer_version.0);
}
if let Some(data_page_size_limit) = data_page_size_limit {
props = props.set_data_page_size_limit(data_page_size_limit);
}
if let Some(data_page_row_count_limit) = data_page_row_count_limit {
props = props.set_data_page_row_count_limit(data_page_row_count_limit);
}
if let Some(dictionary_page_size_limit) = dictionary_page_size_limit {
props = props.set_dictionary_page_size_limit(dictionary_page_size_limit);
}
if let Some(write_batch_size) = write_batch_size {
props = props.set_write_batch_size(write_batch_size);
}
if let Some(max_row_group_size) = max_row_group_size {
props = props.set_max_row_group_size(max_row_group_size);
}
if let Some(created_by) = created_by {
props = props.set_created_by(created_by);
}
if let Some(key_value_metadata) = key_value_metadata {
props = props.set_key_value_metadata(Some(
key_value_metadata
.into_iter()
.map(|(k, v)| KeyValue::new(k, v))
.collect(),
));
}
if let Some(compression) = compression {
props = props.set_compression(compression.0);
}
if let Some(dictionary_enabled) = dictionary_enabled {
props = props.set_dictionary_enabled(dictionary_enabled);
}
if let Some(max_statistics_size) = max_statistics_size {
props = props.set_max_statistics_size(max_statistics_size);
}
if let Some(bloom_filter_enabled) = bloom_filter_enabled {
props = props.set_bloom_filter_enabled(bloom_filter_enabled);
}
if let Some(bloom_filter_fpp) = bloom_filter_fpp {
props = props.set_bloom_filter_fpp(bloom_filter_fpp);
}
if let Some(bloom_filter_ndv) = bloom_filter_ndv {
props = props.set_bloom_filter_ndv(bloom_filter_ndv);
}
if let Some(encoding) = encoding {
props = props.set_encoding(encoding.0);
}

let reader = data.into_reader()?;
let mut writer = ArrowWriter::try_new(file, reader.schema(), None).unwrap();

let writer_options = ArrowWriterOptions::new().with_properties(props.build());
let mut writer =
ArrowWriter::try_new_with_options(file, reader.schema(), writer_options).unwrap();
for batch in reader {
writer.write(&batch?).unwrap();
}
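
Since the Rust wrappers parse `compression`, `encoding`, and `writer_version` with `FromStr` and map any failure to `PyValueError`, a malformed option string should surface as a `ValueError` on the Python side. A hedged sketch of that behavior (the input path is hypothetical):

```python
from arro3.io import read_parquet, write_parquet

reader = read_parquet("input.parquet")  # hypothetical input file

try:
    # "zstdd" is not a recognized codec, so Compression::from_str fails on
    # the Rust side and the error is raised here as a ValueError.
    write_parquet(reader, "output.parquet", compression="zstdd")
except ValueError as err:
    print(f"invalid writer property: {err}")
```
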
