Expose read and write options in public API (#581)
* Expose read and write options in public API

* Add test for reading

* Remove small date as there seems to be another issue with it

* Test writer options

* Make linter happy

* Add proper test with writer options and make mypy happy

* Remove unused imports from test_writer

* PR comments

* Lint

* Format using direct command

* Apply suggestions from reviewers
george-zubrienko authored Apr 25, 2022
1 parent 4aad6aa commit f48489e
Showing 10 changed files with 137 additions and 4 deletions.
7 changes: 5 additions & 2 deletions python/deltalake/table.py
@@ -5,7 +5,7 @@

import pyarrow
import pyarrow.fs as pa_fs
from pyarrow.dataset import FileSystemDataset, ParquetFileFormat
from pyarrow.dataset import FileSystemDataset, ParquetFileFormat, ParquetReadOptions

if TYPE_CHECKING:
import pandas
@@ -268,20 +268,23 @@ def to_pyarrow_dataset(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
parquet_read_options: Optional[ParquetReadOptions] = None,
) -> pyarrow.dataset.Dataset:
"""
Build a PyArrow Dataset using data from the DeltaTable.
:param partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
:param filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
:param parquet_read_options: Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31
More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html
:return: the PyArrow dataset in PyArrow
"""
if not filesystem:
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(self._table.table_uri())
)

format = ParquetFileFormat()
format = ParquetFileFormat(read_options=parquet_read_options)

fragments = [
format.make_fragment(
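The new parquet_read_options argument is passed straight through to ParquetFileFormat when the dataset is assembled. A minimal sketch of using it to read INT96 edge-case timestamps (the table path is hypothetical; the option value mirrors the test added further down):

```python
from pyarrow.dataset import ParquetReadOptions

from deltalake import DeltaTable

# Hypothetical path to a table whose Parquet files store INT96 timestamps
# near the supported range (e.g. 9999-12-31).
dt = DeltaTable("path/to/table_with_edge_timestamps")

# Coerce INT96 values to millisecond precision so the edge dates survive
# conversion to Arrow timestamps.
dataset = dt.to_pyarrow_dataset(
    parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="ms")
)
print(dataset.to_table().to_pydict())
```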
29 changes: 29 additions & 0 deletions python/deltalake/writer.py
@@ -44,6 +44,11 @@ def write_deltalake(
partition_by: Optional[List[str]] = None,
filesystem: Optional[pa_fs.FileSystem] = None,
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
file_options: Optional[ds.ParquetFileWriteOptions] = None,
max_open_files: int = 1024,
max_rows_per_file: int = 0,
min_rows_per_group: int = 0,
max_rows_per_group: int = 1048576,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
@@ -69,6 +74,25 @@
already exists. If 'append', will add new data. If 'overwrite', will
replace table with new data. If 'ignore', will not write anything if
table already exists.
:param file_options: Optional write options for Parquet (ParquetFileWriteOptions).
Can be provided with defaults using ParquetFileFormat().make_write_options().
Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533
for the list of available options.
:param max_open_files: Limits the maximum number of
files that can be left open while writing. If an attempt is made to open
too many files then the least recently used file will be closed.
If this setting is set too low you may end up fragmenting your
data into many small files.
:param max_rows_per_file: Maximum number of rows per file.
If greater than 0 then this will limit how many rows are placed in any single file.
Otherwise there will be no limit and one file will be created in each output directory
unless files need to be closed to respect max_open_files.
:param min_rows_per_group: Minimum number of rows per group. When the value is set,
the dataset writer will batch incoming data and only write the row groups to the disk
when sufficient rows have accumulated.
:param max_rows_per_group: Maximum number of rows per group.
If the value is set, then the dataset writer may split up large incoming batches into multiple row groups.
If this value is set, then min_rows_per_group should also be set.
:param name: User-provided identifier for this table.
:param description: User-provided description for this table.
:param configuration: A map containing configuration options for the metadata action.
@@ -152,6 +176,11 @@ def visitor(written_file: Any) -> None:
schema=schema if not isinstance(data, RecordBatchReader) else None,
file_visitor=visitor,
existing_data_behavior="overwrite_or_ignore",
file_options=file_options,
max_open_files=max_open_files,
max_rows_per_file=max_rows_per_file,
min_rows_per_group=min_rows_per_group,
max_rows_per_group=max_rows_per_group,
)

if table is None:
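The new writer keywords are forwarded to the underlying pyarrow dataset write call, so they can be combined freely. A short sketch under assumed inputs (sample data and destination path are hypothetical), mirroring the writer tests added further down:

```python
import pyarrow as pa
from pyarrow.dataset import ParquetFileFormat

from deltalake import write_deltalake

# Hypothetical sample data and destination path.
data = pa.table({"value": pa.array(range(1000), pa.int32())})

# Build Parquet write options from the format defaults, overriding compression.
opts = ParquetFileFormat().make_write_options(compression="GZIP")

# Cap each data file at 100 rows; with 1000 input rows this should yield ~10 files.
write_deltalake(
    "path/to/output_table",
    data,
    file_options=opts,
    max_rows_per_file=100,
    max_rows_per_group=100,
)
```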
1 change: 1 addition & 0 deletions python/docs/source/conf.py
@@ -64,6 +64,7 @@ def get_release_version() -> str:
("py:class", "pyarrow._fs.FileSystemHandler"),
("py:class", "RawDeltaTable"),
("py:class", "pandas.DataFrame"),
("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"),
]

# Add any paths that contain templates here, relative to this directory.
2 changes: 2 additions & 0 deletions python/stubs/pyarrow/dataset.pyi
@@ -5,4 +5,6 @@ dataset: Any
partitioning: Any
FileSystemDataset: Any
ParquetFileFormat: Any
ParquetReadOptions: Any
ParquetFileWriteOptions: Any
write_dataset: Any
15 changes: 14 additions & 1 deletion python/tests/test_table_read.py
@@ -1,16 +1,29 @@
import os
from datetime import date
from datetime import date, datetime
from threading import Barrier, Thread

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pytest
from pyarrow.dataset import ParquetReadOptions
from pyarrow.fs import LocalFileSystem

from deltalake import DeltaTable


def test_read_table_with_edge_timestamps():
table_path = "../rust/tests/data/table_with_edge_timestamps"
dt = DeltaTable(table_path)
assert dt.to_pyarrow_dataset(
parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="ms")
).to_table().to_pydict() == {
"BIG_DATE": [datetime(9999, 12, 31, 0, 0, 0), datetime(9999, 12, 30, 0, 0, 0)],
"NORMAL_DATE": [datetime(2022, 1, 1, 0, 0, 0), datetime(2022, 2, 1, 0, 0, 0)],
"SOME_VALUE": [1, 2],
}


def test_read_simple_table_to_dict():
table_path = "../rust/tests/data/simple_table"
dt = DeltaTable(table_path)
81 changes: 80 additions & 1 deletion python/tests/test_writer.py
@@ -1,15 +1,19 @@
import json
import os
import pathlib
import random
import sys
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Dict, Iterable
from unittest.mock import Mock

import pyarrow as pa
import pyarrow.compute as pc
import pytest
from pandas.testing import assert_frame_equal
from pyarrow._dataset_parquet import ParquetReadOptions
from pyarrow.dataset import ParquetFileFormat
from pyarrow.lib import RecordBatchReader

from deltalake import DeltaTable, write_deltalake
@@ -228,8 +232,15 @@ def test_writer_partitioning(tmp_path: pathlib.Path):
assert DeltaTable(str(tmp_path)).to_pyarrow_table() == data


def get_log_path(table: DeltaTable) -> str:
"""
Returns _delta_log path for this delta table.
"""
return table._table.table_uri() + "/_delta_log/" + ("0" * 20 + ".json")


def get_stats(table: DeltaTable):
log_path = table._table.table_uri() + "/_delta_log/" + ("0" * 20 + ".json")
log_path = get_log_path(table)

# Should only have single add entry
for line in open(log_path, "r").readlines():
@@ -310,3 +321,71 @@ def test_writer_fails_on_protocol(existing_table: DeltaTable, sample_data: pa.Ta
existing_table.protocol = Mock(return_value=ProtocolVersions(1, 2))
with pytest.raises(DeltaTableProtocolError):
write_deltalake(existing_table, sample_data, mode="overwrite")


@pytest.mark.parametrize(
"row_count,rows_per_file,expected_files",
[
(1000, 100, 10), # even distribution
(100, 1, 100), # single row per file
(1000, 3, 334), # uneven distribution, num files = rows/rows_per_file + 1
],
)
def test_writer_with_max_rows(
tmp_path: pathlib.Path, row_count: int, rows_per_file: int, expected_files: int
):
def get_multifile_stats(table: DeltaTable) -> Iterable[Dict]:
log_path = get_log_path(table)

# Should only have single add entry
for line in open(log_path, "r").readlines():
log_entry = json.loads(line)

if "add" in log_entry:
yield json.loads(log_entry["add"]["stats"])

data = pa.table(
{
"colA": pa.array(range(0, row_count), pa.int32()),
"colB": pa.array(
[i * random.random() for i in range(0, row_count)], pa.float64()
),
}
)
path = str(tmp_path)
write_deltalake(
path,
data,
file_options=ParquetFileFormat().make_write_options(),
max_rows_per_file=rows_per_file,
max_rows_per_group=rows_per_file,
)

table = DeltaTable(path)
stats = get_multifile_stats(table)
files_written = [f for f in os.listdir(path) if f != "_delta_log"]

assert sum([stat_entry["numRecords"] for stat_entry in stats]) == row_count
assert len(files_written) == expected_files


def test_writer_with_options(tmp_path: pathlib.Path):
column_values = [datetime(year_, 1, 1, 0, 0, 0) for year_ in range(9000, 9010)]
data = pa.table({"colA": pa.array(column_values, pa.timestamp("us"))})
path = str(tmp_path)
opts = (
ParquetFileFormat()
.make_write_options()
.update(compression="GZIP", coerce_timestamps="us")
)
write_deltalake(path, data, file_options=opts)

table = (
DeltaTable(path)
.to_pyarrow_dataset(
parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="us")
)
.to_table()
)

assert table == data
@@ -0,0 +1 @@
{"tableSizeBytes":2402,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":0}
@@ -0,0 +1,5 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"79114423-cab8-4cf9-ad3c-c769a7c3d23b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"BIG_DATE\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"NORMAL_DATE\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"SOME_VALUE\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1649083091859}}
{"add":{"path":"part-00000-a9dd181d-61aa-491d-b3c9-3eea548de6cb-c000.snappy.parquet","partitionValues":{},"size":1201,"modificationTime":1649083092000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"BIG_DATE\":\"9999-12-31T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-01-01T00:00:00.000Z\",\"SOME_VALUE\":1},\"maxValues\":{\"BIG_DATE\":\"9999-12-31T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-01-01T00:00:00.000Z\",\"SOME_VALUE\":1},\"nullCount\":{\"BIG_DATE\":0,\"NORMAL_DATE\":0,\"SOME_VALUE\":0}}","tags":{"INSERTION_TIME":"1649083092000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00001-f804d355-db40-4e13-a624-ddd50ce7f5c4-c000.snappy.parquet","partitionValues":{},"size":1201,"modificationTime":1649083092000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"BIG_DATE\":\"9999-12-30T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-02-01T00:00:00.000Z\",\"SOME_VALUE\":2},\"maxValues\":{\"BIG_DATE\":\"9999-12-30T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-02-01T00:00:00.000Z\",\"SOME_VALUE\":2},\"nullCount\":{\"BIG_DATE\":0,\"NORMAL_DATE\":0,\"SOME_VALUE\":0}}","tags":{"INSERTION_TIME":"1649083092000001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1649083093054,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputBytes":"2402","numOutputRows":"2"}}}
Binary file not shown.
Binary file not shown.
