From f48489e88bb3f3f07a6412737b1a4a5722ebe1a7 Mon Sep 17 00:00:00 2001 From: George Zubrienko Date: Mon, 25 Apr 2022 18:34:56 +0200 Subject: [PATCH] Expose read and write options in public API (#581) * Expose read and write options in public API * Add test for reading * Remove small date as there seems to be another issue with it * Test writer options * Make linter happy * Add proper test with writer options and make mypy happy * Remove unused imports from test_writer * PR comments * Lint * Format using direct command * Apply suggestions from reviewers --- python/deltalake/table.py | 7 +- python/deltalake/writer.py | 29 +++++++ python/docs/source/conf.py | 1 + python/stubs/pyarrow/dataset.pyi | 2 + python/tests/test_table_read.py | 15 +++- python/tests/test_writer.py | 81 +++++++++++++++++- .../_delta_log/00000000000000000000.crc | 1 + .../_delta_log/00000000000000000000.json | 5 ++ ...491d-b3c9-3eea548de6cb-c000.snappy.parquet | Bin 0 -> 1201 bytes ...4e13-a624-ddd50ce7f5c4-c000.snappy.parquet | Bin 0 -> 1201 bytes 10 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.crc create mode 100644 rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.json create mode 100644 rust/tests/data/table_with_edge_timestamps/part-00000-a9dd181d-61aa-491d-b3c9-3eea548de6cb-c000.snappy.parquet create mode 100644 rust/tests/data/table_with_edge_timestamps/part-00001-f804d355-db40-4e13-a624-ddd50ce7f5c4-c000.snappy.parquet diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 59096c38db..72a34e76bc 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -5,7 +5,7 @@ import pyarrow import pyarrow.fs as pa_fs -from pyarrow.dataset import FileSystemDataset, ParquetFileFormat +from pyarrow.dataset import FileSystemDataset, ParquetFileFormat, ParquetReadOptions if TYPE_CHECKING: import pandas @@ -268,12 +268,15 @@ def to_pyarrow_dataset( self, partitions: Optional[List[Tuple[str, str, Any]]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, + parquet_read_options: Optional[ParquetReadOptions] = None, ) -> pyarrow.dataset.Dataset: """ Build a PyArrow Dataset using data from the DeltaTable. :param partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax :param filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem + :param parquet_read_options: Optional read options for Parquet. 
            Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31
+            More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html
         :return: the PyArrow dataset in PyArrow
         """
         if not filesystem:
@@ -281,7 +284,7 @@
                 DeltaStorageHandler(self._table.table_uri())
             )
 
-        format = ParquetFileFormat()
+        format = ParquetFileFormat(read_options=parquet_read_options)
 
         fragments = [
             format.make_fragment(
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 6911d2d15f..a6f372ff20 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -44,6 +44,11 @@ def write_deltalake(
     partition_by: Optional[List[str]] = None,
     filesystem: Optional[pa_fs.FileSystem] = None,
     mode: Literal["error", "append", "overwrite", "ignore"] = "error",
+    file_options: Optional[ds.ParquetFileWriteOptions] = None,
+    max_open_files: int = 1024,
+    max_rows_per_file: int = 0,
+    min_rows_per_group: int = 0,
+    max_rows_per_group: int = 1048576,
     name: Optional[str] = None,
     description: Optional[str] = None,
     configuration: Optional[Mapping[str, Optional[str]]] = None,
@@ -69,6 +74,25 @@
         already exists. If 'append', will add new data. If 'overwrite', will
         replace table with new data. If 'ignore', will not write anything if
         table already exists.
+    :param file_options: Optional write options for Parquet (ParquetFileWriteOptions).
+        Can be provided with defaults using ParquetFileFormat().make_write_options().
+        Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533
+        for the list of available options.
+    :param max_open_files: Limits the maximum number of
+        files that can be left open while writing. If an attempt is made to open
+        too many files then the least recently used file will be closed.
+        If this setting is set too low you may end up fragmenting your
+        data into many small files.
+    :param max_rows_per_file: Maximum number of rows per file.
+        If greater than 0 then this will limit how many rows are placed in any single file.
+        Otherwise there will be no limit and one file will be created in each output directory
+        unless files need to be closed to respect max_open_files.
+    :param min_rows_per_group: Minimum number of rows per group. When the value is set,
+        the dataset writer will batch incoming data and only write the row groups to the disk
+        when sufficient rows have accumulated.
+    :param max_rows_per_group: Maximum number of rows per group.
+        If the value is set, then the dataset writer may split up large incoming batches into multiple row groups.
+        If this value is set, then min_rows_per_group should also be set.
     :param name: User-provided identifier for this table.
     :param description: User-provided description for this table.
     :param configuration: A map containing configuration options for the metadata action.
@@ -152,6 +176,11 @@ def visitor(written_file: Any) -> None: schema=schema if not isinstance(data, RecordBatchReader) else None, file_visitor=visitor, existing_data_behavior="overwrite_or_ignore", + file_options=file_options, + max_open_files=max_open_files, + max_rows_per_file=max_rows_per_file, + min_rows_per_group=min_rows_per_group, + max_rows_per_group=max_rows_per_group, ) if table is None: diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index bb6c11237b..05728fd44c 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -64,6 +64,7 @@ def get_release_version() -> str: ("py:class", "pyarrow._fs.FileSystemHandler"), ("py:class", "RawDeltaTable"), ("py:class", "pandas.DataFrame"), + ("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"), ] # Add any paths that contain templates here, relative to this directory. diff --git a/python/stubs/pyarrow/dataset.pyi b/python/stubs/pyarrow/dataset.pyi index d06f843246..f9f60781cb 100644 --- a/python/stubs/pyarrow/dataset.pyi +++ b/python/stubs/pyarrow/dataset.pyi @@ -5,4 +5,6 @@ dataset: Any partitioning: Any FileSystemDataset: Any ParquetFileFormat: Any +ParquetReadOptions: Any +ParquetFileWriteOptions: Any write_dataset: Any diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index a5ab208a48..dc71b2bc60 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -1,16 +1,29 @@ import os -from datetime import date +from datetime import date, datetime from threading import Barrier, Thread import pandas as pd import pyarrow as pa import pyarrow.dataset as ds import pytest +from pyarrow.dataset import ParquetReadOptions from pyarrow.fs import LocalFileSystem from deltalake import DeltaTable +def test_read_table_with_edge_timestamps(): + table_path = "../rust/tests/data/table_with_edge_timestamps" + dt = DeltaTable(table_path) + assert dt.to_pyarrow_dataset( + parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="ms") + ).to_table().to_pydict() == { + "BIG_DATE": [datetime(9999, 12, 31, 0, 0, 0), datetime(9999, 12, 30, 0, 0, 0)], + "NORMAL_DATE": [datetime(2022, 1, 1, 0, 0, 0), datetime(2022, 2, 1, 0, 0, 0)], + "SOME_VALUE": [1, 2], + } + + def test_read_simple_table_to_dict(): table_path = "../rust/tests/data/simple_table" dt = DeltaTable(table_path) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 9ce467cdb7..0f39e506b4 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,15 +1,19 @@ import json import os import pathlib +import random import sys from datetime import date, datetime, timedelta from decimal import Decimal +from typing import Dict, Iterable from unittest.mock import Mock import pyarrow as pa import pyarrow.compute as pc import pytest from pandas.testing import assert_frame_equal +from pyarrow._dataset_parquet import ParquetReadOptions +from pyarrow.dataset import ParquetFileFormat from pyarrow.lib import RecordBatchReader from deltalake import DeltaTable, write_deltalake @@ -228,8 +232,15 @@ def test_writer_partitioning(tmp_path: pathlib.Path): assert DeltaTable(str(tmp_path)).to_pyarrow_table() == data +def get_log_path(table: DeltaTable) -> str: + """ + Returns _delta_log path for this delta table. 
+    """
+    return table._table.table_uri() + "/_delta_log/" + ("0" * 20 + ".json")
+
+
 def get_stats(table: DeltaTable):
-    log_path = table._table.table_uri() + "/_delta_log/" + ("0" * 20 + ".json")
+    log_path = get_log_path(table)
 
     # Should only have single add entry
     for line in open(log_path, "r").readlines():
@@ -310,3 +321,71 @@ def test_writer_fails_on_protocol(existing_table: DeltaTable, sample_data: pa.Ta
     existing_table.protocol = Mock(return_value=ProtocolVersions(1, 2))
     with pytest.raises(DeltaTableProtocolError):
         write_deltalake(existing_table, sample_data, mode="overwrite")
+
+
+@pytest.mark.parametrize(
+    "row_count,rows_per_file,expected_files",
+    [
+        (1000, 100, 10),  # even distribution
+        (100, 1, 100),  # single row per file
+        (1000, 3, 334),  # uneven distribution, num files = rows/rows_per_file + 1
+    ],
+)
+def test_writer_with_max_rows(
+    tmp_path: pathlib.Path, row_count: int, rows_per_file: int, expected_files: int
+):
+    def get_multifile_stats(table: DeltaTable) -> Iterable[Dict]:
+        log_path = get_log_path(table)
+
+        # Collect the stats from every add entry in the commit log
+        for line in open(log_path, "r").readlines():
+            log_entry = json.loads(line)
+
+            if "add" in log_entry:
+                yield json.loads(log_entry["add"]["stats"])
+
+    data = pa.table(
+        {
+            "colA": pa.array(range(0, row_count), pa.int32()),
+            "colB": pa.array(
+                [i * random.random() for i in range(0, row_count)], pa.float64()
+            ),
+        }
+    )
+    path = str(tmp_path)
+    write_deltalake(
+        path,
+        data,
+        file_options=ParquetFileFormat().make_write_options(),
+        max_rows_per_file=rows_per_file,
+        max_rows_per_group=rows_per_file,
+    )
+
+    table = DeltaTable(path)
+    stats = get_multifile_stats(table)
+    files_written = [f for f in os.listdir(path) if f != "_delta_log"]
+
+    assert sum([stat_entry["numRecords"] for stat_entry in stats]) == row_count
+    assert len(files_written) == expected_files
+
+
+def test_writer_with_options(tmp_path: pathlib.Path):
+    column_values = [datetime(year_, 1, 1, 0, 0, 0) for year_ in range(9000, 9010)]
+    data = pa.table({"colA": pa.array(column_values, pa.timestamp("us"))})
+    path = str(tmp_path)
+    opts = (
+        ParquetFileFormat()
+        .make_write_options()
+        .update(compression="GZIP", coerce_timestamps="us")
+    )
+    write_deltalake(path, data, file_options=opts)
+
+    table = (
+        DeltaTable(path)
+        .to_pyarrow_dataset(
+            parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="us")
+        )
+        .to_table()
+    )
+
+    assert table == data
diff --git a/rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.crc b/rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.crc
new file mode 100644
index 0000000000..c4beb77fdb
--- /dev/null
+++ b/rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.crc
@@ -0,0 +1 @@
+{"tableSizeBytes":2402,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":0}
diff --git a/rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.json b/rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000000..22fb790cb3
--- /dev/null
+++ b/rust/tests/data/table_with_edge_timestamps/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"79114423-cab8-4cf9-ad3c-c769a7c3d23b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"BIG_DATE\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"NORMAL_DATE\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"SOME_VALUE\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1649083091859}} +{"add":{"path":"part-00000-a9dd181d-61aa-491d-b3c9-3eea548de6cb-c000.snappy.parquet","partitionValues":{},"size":1201,"modificationTime":1649083092000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"BIG_DATE\":\"9999-12-31T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-01-01T00:00:00.000Z\",\"SOME_VALUE\":1},\"maxValues\":{\"BIG_DATE\":\"9999-12-31T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-01-01T00:00:00.000Z\",\"SOME_VALUE\":1},\"nullCount\":{\"BIG_DATE\":0,\"NORMAL_DATE\":0,\"SOME_VALUE\":0}}","tags":{"INSERTION_TIME":"1649083092000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00001-f804d355-db40-4e13-a624-ddd50ce7f5c4-c000.snappy.parquet","partitionValues":{},"size":1201,"modificationTime":1649083092000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"BIG_DATE\":\"9999-12-30T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-02-01T00:00:00.000Z\",\"SOME_VALUE\":2},\"maxValues\":{\"BIG_DATE\":\"9999-12-30T00:00:00.000Z\",\"NORMAL_DATE\":\"2022-02-01T00:00:00.000Z\",\"SOME_VALUE\":2},\"nullCount\":{\"BIG_DATE\":0,\"NORMAL_DATE\":0,\"SOME_VALUE\":0}}","tags":{"INSERTION_TIME":"1649083092000001","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"commitInfo":{"timestamp":1649083093054,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputBytes":"2402","numOutputRows":"2"}}} diff --git a/rust/tests/data/table_with_edge_timestamps/part-00000-a9dd181d-61aa-491d-b3c9-3eea548de6cb-c000.snappy.parquet b/rust/tests/data/table_with_edge_timestamps/part-00000-a9dd181d-61aa-491d-b3c9-3eea548de6cb-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..7cfe6d146a5aa958213080dd2bab1bc6f0d58cdb GIT binary patch literal 1201 zcmbtU%WB&|6djLMBN9`(=!g*rYD8laP(P2FrtF=Kh`sF7?(l3P;a z5=tQ3_7l1;v|rF=7X5`R^8sCyE=%ahQYsmRW>FzV+_`7Yx%b>byY)Q{aR?&`U0i

diff --git a/rust/tests/data/table_with_edge_timestamps/part-00001-f804d355-db40-4e13-a624-ddd50ce7f5c4-c000.snappy.parquet b/rust/tests/data/table_with_edge_timestamps/part-00001-f804d355-db40-4e13-a624-ddd50ce7f5c4-c000.snappy.parquet
new file mode 100644
GIT binary patch
literal 1201
[binary payload omitted]
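
Usage sketch (not part of the patch): how the options exposed above fit together. The /tmp output path, the "value" column, and the chosen option values (GZIP compression, 100-row files, "ms" coercion) are illustrative assumptions; only the parameter names come from this change, and the read example points at the table_with_edge_timestamps fixture it adds.

    import pyarrow as pa
    from pyarrow.dataset import ParquetFileFormat, ParquetReadOptions

    from deltalake import DeltaTable, write_deltalake

    # Reading: coerce INT96 timestamps to a unit that can represent edge values
    # such as 9999-12-31 (the table_with_edge_timestamps fixture added above).
    dt = DeltaTable("rust/tests/data/table_with_edge_timestamps")
    edge = dt.to_pyarrow_dataset(
        parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="ms")
    ).to_table()

    # Writing: pass Parquet writer options and file/row-group limits through to
    # pyarrow.dataset.write_dataset via the new write_deltalake keyword arguments.
    data = pa.table({"value": pa.array(range(1000), pa.int32())})
    write_deltalake(
        "/tmp/example_delta_table",  # hypothetical output location
        data,
        file_options=ParquetFileFormat().make_write_options(compression="GZIP"),
        max_rows_per_file=100,
        max_rows_per_group=100,
    )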