From d0d5e270770640eb57c0af6a8303a86cd41bcc44 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sat, 26 Nov 2022 06:40:48 -0600 Subject: [PATCH 01/15] Checks schema of candidate dataframe and if necessary rebuilds the schema to microsecond timestamps --- python/deltalake/writer.py | 22 +++++++++++++++++++++- python/tests/test_writer.py | 3 +-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 41de1961c6..edd083dc92 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -132,8 +132,28 @@ def write_deltalake( :param overwrite_schema: If True, allows updating the schema of the table. :param storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined. """ + if _has_pandas and isinstance(data, pd.DataFrame): - data = pa.Table.from_pandas(data) + if schema is not None: + data = pa.Table.from_panda(data, schema=schema) + else: + _data = pa.Table.from_pandas(data) + _schema = _data.schema + schema_out = [] + for _field in _schema: + # handles https://github.com/delta-io/delta-rs/issues/686 + if isinstance(_field.type, pa.lib.TimestampType): + f = pa.field( + name=_field.name, + type=pa.timestamp("us"), + nullable=_field.nullable, + metadata=_field.metadata, + ) + schema_out.append(f) + else: + schema_out.append(_field) + schema = pa.schema(schema_out, metadata=_schema.metadata) + data = pa.Table.from_pandas(data, schema=schema) if schema is None: if isinstance(data, RecordBatchReader): diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index cda1c3601e..4991f69b96 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -275,9 +275,8 @@ def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Ta def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table): # When timestamp is converted to Pandas, it gets casted to ns resolution, # but Delta Lake schemas only support us resolution. - sample_pandas = sample_data.to_pandas().drop(["timestamp"], axis=1) + sample_pandas = sample_data.to_pandas() write_deltalake(str(tmp_path), sample_pandas) - delta_table = DeltaTable(str(tmp_path)) df = delta_table.to_pandas() assert_frame_equal(df, sample_pandas) From abb17ced8df5fbc1f6ef6434abec2f30e2822a25 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sat, 26 Nov 2022 06:46:30 -0600 Subject: [PATCH 02/15] update comments --- python/deltalake/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index edd083dc92..b7b6f45ceb 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -141,7 +141,7 @@ def write_deltalake( _schema = _data.schema schema_out = [] for _field in _schema: - # handles https://github.com/delta-io/delta-rs/issues/686 + # partially handles https://github.com/delta-io/delta-rs/issues/686 if isinstance(_field.type, pa.lib.TimestampType): f = pa.field( name=_field.name, From e42b98cf134626b8df1d0c8d5af7d0abbc863f18 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 27 Nov 2022 07:00:43 -0600 Subject: [PATCH 03/15] Add test for schema provided on write_deltalake --- python/deltalake/writer.py | 2 +- python/tests/test_writer.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index b7b6f45ceb..9a73174e99 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -135,7 +135,7 @@ def write_deltalake( if _has_pandas and isinstance(data, pd.DataFrame): if schema is not None: - data = pa.Table.from_panda(data, schema=schema) + data = pa.Table.from_pandas(data, schema=schema) else: _data = pa.Table.from_pandas(data) _schema = _data.schema diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 4991f69b96..6f1e5a2077 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -272,11 +272,16 @@ def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Ta @pytest.mark.pandas -def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table): +@pytest.mark.parametrize("schema_provided", [True, False]) +def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table, schema_provided): # When timestamp is converted to Pandas, it gets casted to ns resolution, # but Delta Lake schemas only support us resolution. sample_pandas = sample_data.to_pandas() - write_deltalake(str(tmp_path), sample_pandas) + if schema_provided is True: + schema = sample_data.schema + else: + schema = None + write_deltalake(str(tmp_path), sample_pandas, schema=schema) delta_table = DeltaTable(str(tmp_path)) df = delta_table.to_pandas() assert_frame_equal(df, sample_pandas) From 89f8dead44df546c04b82bd5dcf07d443fb77271 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 27 Nov 2022 07:57:54 -0600 Subject: [PATCH 04/15] Linting --- python/deltalake/writer.py | 48 ++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 9a73174e99..030789d434 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -60,6 +60,36 @@ class AddAction: stats: str +def _delta_arrow_schema_from_pandas( + data: pd.DataFrame, +) -> Tuple[pd.DataFrame, pa.Schema]: + """ " + Infers the schema for the delta table from the Pandas DataFrame. + Necessary because of issues such as: https://github.com/delta-io/delta-rs/issues/686 + + :param data: Data to write. + :returns A Pyarrow Table and the inferred schema for the Delta Table + """ + + table = pa.Table.from_pandas(data) + _schema = table.schema + schema_out = [] + for _field in _schema: + if isinstance(_field.type, pa.lib.TimestampType): + f = pa.field( + name=_field.name, + type=pa.timestamp("us"), + nullable=_field.nullable, + metadata=_field.metadata, + ) + schema_out.append(f) + else: + schema_out.append(_field) + schema = pa.schema(schema_out, metadata=_schema.metadata) + data = pa.Table.from_pandas(data, schema=schema) + return data, schema + + def write_deltalake( table_or_uri: Union[str, DeltaTable], data: Union[ @@ -137,23 +167,7 @@ def write_deltalake( if schema is not None: data = pa.Table.from_pandas(data, schema=schema) else: - _data = pa.Table.from_pandas(data) - _schema = _data.schema - schema_out = [] - for _field in _schema: - # partially handles https://github.com/delta-io/delta-rs/issues/686 - if isinstance(_field.type, pa.lib.TimestampType): - f = pa.field( - name=_field.name, - type=pa.timestamp("us"), - nullable=_field.nullable, - metadata=_field.metadata, - ) - schema_out.append(f) - else: - schema_out.append(_field) - schema = pa.schema(schema_out, metadata=_schema.metadata) - data = pa.Table.from_pandas(data, schema=schema) + data, schema = _delta_arrow_schema_from_pandas(data) if schema is None: if isinstance(data, RecordBatchReader): From 0d726a4be6ad4ec1e870e8cccf17d6c8b5a48da5 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 27 Nov 2022 07:59:20 -0600 Subject: [PATCH 05/15] Move helper function to public --- python/deltalake/writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 030789d434..cd10534394 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -60,7 +60,7 @@ class AddAction: stats: str -def _delta_arrow_schema_from_pandas( +def delta_arrow_schema_from_pandas( data: pd.DataFrame, ) -> Tuple[pd.DataFrame, pa.Schema]: """ " @@ -167,7 +167,7 @@ def write_deltalake( if schema is not None: data = pa.Table.from_pandas(data, schema=schema) else: - data, schema = _delta_arrow_schema_from_pandas(data) + data, schema = delta_arrow_schema_from_pandas(data) if schema is None: if isinstance(data, RecordBatchReader): From 38b6b7bc5fb4a1bf9dbbfe6d74d84a8e8af1ea67 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Mon, 28 Nov 2022 05:02:30 -0600 Subject: [PATCH 06/15] Fix mypy --- python/stubs/pyarrow/__init__.pyi | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/stubs/pyarrow/__init__.pyi b/python/stubs/pyarrow/__init__.pyi index dde6a48050..fb01f5796d 100644 --- a/python/stubs/pyarrow/__init__.pyi +++ b/python/stubs/pyarrow/__init__.pyi @@ -23,6 +23,8 @@ float16: Any float32: Any float64: Any dictionary: Any +timestamp: Any +TimestampType: Any py_buffer: Callable[[bytes], Any] NativeFile: Any From c6ee10ac4106d2b32e8cc2556e158ea183837636 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Tue, 29 Nov 2022 19:24:21 -0600 Subject: [PATCH 07/15] Linting --- python/deltalake/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index cd10534394..8a943253d0 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -75,7 +75,7 @@ def delta_arrow_schema_from_pandas( _schema = table.schema schema_out = [] for _field in _schema: - if isinstance(_field.type, pa.lib.TimestampType): + if isinstance(_field.type, pa.TimestampType): f = pa.field( name=_field.name, type=pa.timestamp("us"), From 6348ca1d95b57aadc984c272a07d55e8ba7f657f Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Tue, 29 Nov 2022 21:39:24 -0600 Subject: [PATCH 08/15] Moved delta_arrow_schema_from_pandas to schema.py --- python/deltalake/schema.py | 35 ++++++++++++++++++++++++++++++++++- python/deltalake/writer.py | 32 ++------------------------------ 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index f90307eb96..ea77ce1a1d 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -1,7 +1,40 @@ -from typing import Union +from typing import Tuple, Union + +import pandas as pd +import pyarrow as pa from ._internal import ArrayType, Field, MapType, PrimitiveType, Schema, StructType # Can't implement inheritance (see note in src/schema.rs), so this is next # best thing. DataType = Union["PrimitiveType", "MapType", "StructType", "ArrayType"] + + +def delta_arrow_schema_from_pandas( + data: pd.DataFrame, +) -> Tuple[pd.DataFrame, pa.Schema]: + """ " + Infers the schema for the delta table from the Pandas DataFrame. + Necessary because of issues such as: https://github.com/delta-io/delta-rs/issues/686 + + :param data: Data to write. + :returns A Pyarrow Table and the inferred schema for the Delta Table + """ + + table = pa.Table.from_pandas(data) + _schema = table.schema + schema_out = [] + for _field in _schema: + if isinstance(_field.type, pa.TimestampType): + f = pa.field( + name=_field.name, + type=pa.timestamp("us"), + nullable=_field.nullable, + metadata=_field.metadata, + ) + schema_out.append(f) + else: + schema_out.append(_field) + schema = pa.schema(schema_out, metadata=_schema.metadata) + data = pa.Table.from_pandas(data, schema=schema) + return data, schema diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 8a943253d0..ab3481f50a 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -35,6 +35,8 @@ import pyarrow.fs as pa_fs from pyarrow.lib import RecordBatchReader +from deltalake.schema import delta_arrow_schema_from_pandas + from ._internal import DeltaDataChecker as _DeltaDataChecker from ._internal import PyDeltaTableError from ._internal import write_new_deltalake as _write_new_deltalake @@ -60,36 +62,6 @@ class AddAction: stats: str -def delta_arrow_schema_from_pandas( - data: pd.DataFrame, -) -> Tuple[pd.DataFrame, pa.Schema]: - """ " - Infers the schema for the delta table from the Pandas DataFrame. - Necessary because of issues such as: https://github.com/delta-io/delta-rs/issues/686 - - :param data: Data to write. - :returns A Pyarrow Table and the inferred schema for the Delta Table - """ - - table = pa.Table.from_pandas(data) - _schema = table.schema - schema_out = [] - for _field in _schema: - if isinstance(_field.type, pa.TimestampType): - f = pa.field( - name=_field.name, - type=pa.timestamp("us"), - nullable=_field.nullable, - metadata=_field.metadata, - ) - schema_out.append(f) - else: - schema_out.append(_field) - schema = pa.schema(schema_out, metadata=_schema.metadata) - data = pa.Table.from_pandas(data, schema=schema) - return data, schema - - def write_deltalake( table_or_uri: Union[str, DeltaTable], data: Union[ From 127abf5abe69c92a71d60537388c2eae59bb3c7b Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Wed, 30 Nov 2022 08:34:37 -0600 Subject: [PATCH 09/15] Update python/deltalake/schema.py Co-authored-by: Will Jones --- python/deltalake/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index ea77ce1a1d..40a78c74f4 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -12,7 +12,7 @@ def delta_arrow_schema_from_pandas( data: pd.DataFrame, -) -> Tuple[pd.DataFrame, pa.Schema]: +) -> Tuple[pa.Table, pa.Schema]: """ " Infers the schema for the delta table from the Pandas DataFrame. Necessary because of issues such as: https://github.com/delta-io/delta-rs/issues/686 From 0067cd6413cafb02448bd935902511fb2b9317ae Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Wed, 30 Nov 2022 08:35:03 -0600 Subject: [PATCH 10/15] Update python/deltalake/schema.py Co-authored-by: Will Jones --- python/deltalake/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index 40a78c74f4..4037c96aad 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -13,7 +13,7 @@ def delta_arrow_schema_from_pandas( data: pd.DataFrame, ) -> Tuple[pa.Table, pa.Schema]: - """ " + """ Infers the schema for the delta table from the Pandas DataFrame. Necessary because of issues such as: https://github.com/delta-io/delta-rs/issues/686 From 07aa60bb753b8f98d97ff91bee255248e184fd6c Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Wed, 30 Nov 2022 08:35:26 -0600 Subject: [PATCH 11/15] Update python/deltalake/schema.py Co-authored-by: Will Jones --- python/deltalake/schema.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index 4037c96aad..67514a9286 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -22,19 +22,19 @@ def delta_arrow_schema_from_pandas( """ table = pa.Table.from_pandas(data) - _schema = table.schema + schema = table.schema schema_out = [] - for _field in _schema: - if isinstance(_field.type, pa.TimestampType): + for field in schema: + if isinstance(field.type, pa.TimestampType): f = pa.field( - name=_field.name, + name=field.name, type=pa.timestamp("us"), - nullable=_field.nullable, - metadata=_field.metadata, + nullable=field.nullable, + metadata=field.metadata, ) schema_out.append(f) else: - schema_out.append(_field) + schema_out.append(field) schema = pa.schema(schema_out, metadata=_schema.metadata) data = pa.Table.from_pandas(data, schema=schema) return data, schema From 7f8530f64f7172caead86b67ef4173ab860126df Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Wed, 30 Nov 2022 18:50:25 -0600 Subject: [PATCH 12/15] Update python/deltalake/schema.py Co-authored-by: Will Jones --- python/deltalake/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index 67514a9286..e9063b41cc 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -18,7 +18,7 @@ def delta_arrow_schema_from_pandas( Necessary because of issues such as: https://github.com/delta-io/delta-rs/issues/686 :param data: Data to write. - :returns A Pyarrow Table and the inferred schema for the Delta Table + :return: A PyArrow Table and the inferred schema for the Delta Table """ table = pa.Table.from_pandas(data) From d648f46d0d245166a19f2db7d786197e151c1478 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Wed, 30 Nov 2022 18:50:50 -0600 Subject: [PATCH 13/15] Update python/deltalake/schema.py Co-authored-by: Will Jones --- python/deltalake/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index e9063b41cc..d6b6e52b34 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -35,6 +35,6 @@ def delta_arrow_schema_from_pandas( schema_out.append(f) else: schema_out.append(field) - schema = pa.schema(schema_out, metadata=_schema.metadata) + schema = pa.schema(schema_out, metadata=schema.metadata) data = pa.Table.from_pandas(data, schema=schema) return data, schema From 1750839e0fd8af699d519fd9d63b388c5c56f5b2 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 30 Nov 2022 17:58:31 -0800 Subject: [PATCH 14/15] fix: make pandas optional again --- python/deltalake/schema.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index d6b6e52b34..150e99029c 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -1,8 +1,10 @@ -from typing import Tuple, Union +from typing import Tuple, Union, TYPE_CHECKING -import pandas as pd import pyarrow as pa +if TYPE_CHECKING: + import pandas as pd + from ._internal import ArrayType, Field, MapType, PrimitiveType, Schema, StructType # Can't implement inheritance (see note in src/schema.rs), so this is next @@ -11,7 +13,7 @@ def delta_arrow_schema_from_pandas( - data: pd.DataFrame, + data: "pd.DataFrame", ) -> Tuple[pa.Table, pa.Schema]: """ Infers the schema for the delta table from the Pandas DataFrame. From e8fb3be755cdd07fe84e2cc318da0a93f40fbbbd Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 30 Nov 2022 18:02:09 -0800 Subject: [PATCH 15/15] Apply suggestions from code review --- python/deltalake/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index 150e99029c..abc88dba46 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -1,4 +1,4 @@ -from typing import Tuple, Union, TYPE_CHECKING +from typing import TYPE_CHECKING, Tuple, Union import pyarrow as pa