From d5b93707672e7c6bf3c39cbe5c7d4a1ea1e50c4a Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Tue, 24 Sep 2024 11:31:01 -0400 Subject: [PATCH 1/9] feat(memtable): add pyarrow.dataset support This adds support for using `memtable` with `pyarrow.dataset.Dataset` objects. Mostly this looks the same as the `PyArrowTableProxy`. Some deviations: - An additional `to_pyarrow_lazy` method for grabbing the underlying object without materializing it, for backends that can perform pushdowns (currently only enabled with DuckDB). - Datasets are hashable objects, so we override the PseudoHashable `hash` implementation (this seemed like less trouble than special-casing this one memtable everywhere else we look for `TableProxy` objects) - I've added a `cached_property` so we can avoid repeatedly materializing the data -- this does _not_ apply to the `lazy` view. --- ibis/backends/duckdb/__init__.py | 5 +++- ibis/backends/tests/test_client.py | 21 ------------- ibis/expr/api.py | 18 +++++++++++ ibis/formats/pyarrow.py | 48 ++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 22 deletions(-) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 0bdaa57a0f66..7b17773631e5 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -1609,7 +1609,10 @@ def _in_memory_table_exists(self, name: str) -> bool: return True def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: - self.con.register(op.name, op.data.to_pyarrow(op.schema)) + if hasattr(op.data, "to_pyarrow_lazy"): + self.con.register(op.name, op.data.to_pyarrow_lazy(op.schema)) + else: + self.con.register(op.name, op.data.to_pyarrow(op.schema)) def _finalize_memtable(self, name: str) -> None: # if we don't aggressively unregister tables duckdb will keep a diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 7d1ae751e58b..000e6d28dbc7 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -928,27 +928,6 @@ def test_self_join_memory_table(backend, con, monkeypatch): param( lambda: ds.dataset(pa.table({"a": ["a"], "b": [1]})), "df_arrow_dataset", - marks=[ - pytest.mark.notimpl( - [ - "bigquery", - "clickhouse", - "duckdb", - "exasol", - "impala", - "mssql", - "mysql", - "oracle", - "polars", - "postgres", - "pyspark", - "risingwave", - "snowflake", - "sqlite", - "trino", - ] - ), - ], id="pyarrow dataset", ), param(lambda: pd.DataFrame({"a": ["a"], "b": [1]}), "df_pandas", id="pandas"), diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 3ea297672c90..e89ed7b8ee99 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -50,6 +50,7 @@ import pandas as pd import polars as pl import pyarrow as pa + import pyarrow.dataset as ds from ibis.expr.schema import SchemaLike @@ -480,6 +481,23 @@ def _memtable_from_pyarrow_table( ).to_expr() +@_memtable.register("pyarrow.dataset.Dataset") +def _memtable_from_pyarrow_dataset( + data: ds.Dataset, + *, + name: str | None = None, + schema: SchemaLike | None = None, + columns: Iterable[str] | None = None, +): + from ibis.formats.pyarrow import PyArrowDatasetProxy + + return ops.InMemoryTable( + name=name if name is not None else util.gen_name("pyarrow_memtable"), + schema=Schema.from_pyarrow(data.schema), + data=PyArrowDatasetProxy(data), + ).to_expr() + + @_memtable.register("polars.LazyFrame") def _memtable_from_polars_lazyframe(data: pl.LazyFrame, **kwargs): return _memtable_from_polars_dataframe(data.collect(), **kwargs) diff --git a/ibis/formats/pyarrow.py 
b/ibis/formats/pyarrow.py index 5791c4cc1b7a..53d1e728d9e8 100644 --- a/ibis/formats/pyarrow.py +++ b/ibis/formats/pyarrow.py @@ -1,5 +1,6 @@ from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING, Any import pyarrow as pa @@ -12,7 +13,11 @@ if TYPE_CHECKING: from collections.abc import Sequence + import pandas as pd import polars as pl + import pyarrow.dataset as ds + + from ibis.util import V _from_pyarrow_types = { @@ -341,3 +346,46 @@ def to_polars(self, schema: Schema) -> pl.DataFrame: df = pl.from_arrow(self.obj) return PolarsData.convert_table(df, schema) + + +class PyArrowDatasetProxy(TableProxy): + __slots__ = ("obj", "__dict__") + obj: V + + def __init__(self, obj: V): + self.obj = obj + + def __len__(self): + return self.obj.count_rows() + + # pyarrow datasets are hashable, so we override the hash from TableProxy + def __hash__(self): + return hash(self.obj) + + @cached_property + def _cache(self): + return self.obj.to_table() + + def to_frame(self) -> pd.DataFrame: + """Convert this input to a pandas DataFrame.""" + return self._cache.to_pandas() + + def to_pyarrow(self, schema: Schema) -> pa.Table: + """Convert this input to a PyArrow Table.""" + return self._cache + + def to_pyarrow_lazy(self, schema: Schema) -> ds.Dataset: + """Return the dataset object itself. + + Use with backends that can perform pushdowns into dataset objects. + """ + return self.obj + + def to_polars(self, schema: Schema) -> pl.DataFrame: + """Convert this input to a Polars DataFrame.""" + import polars as pl + + from ibis.formats.polars import PolarsData + + df = pl.from_arrow(self._cache) + return PolarsData.convert_table(df, schema) From a432bcc4e0afe1f0cbb25c273e91c711d6029811 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Tue, 24 Sep 2024 12:09:58 -0400 Subject: [PATCH 2/9] test(polars): add polars xfail back in --- ibis/backends/tests/test_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 000e6d28dbc7..4b0b2d6ab65b 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -928,6 +928,9 @@ def test_self_join_memory_table(backend, con, monkeypatch): param( lambda: ds.dataset(pa.table({"a": ["a"], "b": [1]})), "df_arrow_dataset", + marks=[ + pytest.mark.notimpl(["polars"]), + ], id="pyarrow dataset", ), param(lambda: pd.DataFrame({"a": ["a"], "b": [1]}), "df_pandas", id="pandas"), From 693640c0170393329dff179a09429d5022fb31c7 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Tue, 24 Sep 2024 16:57:12 -0400 Subject: [PATCH 3/9] refactor(dataset): raise if user tries to materialize a dataset --- ibis/backends/duckdb/__init__.py | 4 +-- ibis/backends/tests/test_client.py | 21 ++++++++++++- ibis/formats/pyarrow.py | 48 +++++++++++++++++++----------- 3 files changed, 52 insertions(+), 21 deletions(-) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 7b17773631e5..b1f5a1f07453 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -1609,8 +1609,8 @@ def _in_memory_table_exists(self, name: str) -> bool: return True def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: - if hasattr(op.data, "to_pyarrow_lazy"): - self.con.register(op.name, op.data.to_pyarrow_lazy(op.schema)) + if hasattr(op.data, "to_pyarrow_dataset"): + self.con.register(op.name, op.data.to_pyarrow_dataset(op.schema)) else: self.con.register(op.name, op.data.to_pyarrow(op.schema)) 
diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py
index 4b0b2d6ab65b..652592642911 100644
--- a/ibis/backends/tests/test_client.py
+++ b/ibis/backends/tests/test_client.py
@@ -929,7 +929,26 @@ def test_self_join_memory_table(backend, con, monkeypatch):
             lambda: ds.dataset(pa.table({"a": ["a"], "b": [1]})),
             "df_arrow_dataset",
             marks=[
-                pytest.mark.notimpl(["polars"]),
+                pytest.mark.notimpl(
+                    [
+                        "bigquery",
+                        "clickhouse",
+                        "exasol",
+                        "impala",
+                        "mssql",
+                        "mysql",
+                        "oracle",
+                        "postgres",
+                        "pyspark",
+                        "risingwave",
+                        "snowflake",
+                        "sqlite",
+                        "trino",
+                    ],
+                    raises=com.UnsupportedOperationError,
+                    reason="we don't materialize datasets to avoid perf footguns",
+                ),
+                pytest.mark.notimpl(["polars"], raises=NotImplementedError),
             ],
             id="pyarrow dataset",
         ),
diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py
index 53d1e728d9e8..38e671df6024 100644
--- a/ibis/formats/pyarrow.py
+++ b/ibis/formats/pyarrow.py
@@ -1,11 +1,11 @@
 from __future__ import annotations
 
-from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
 import pyarrow as pa
 import pyarrow_hotfix  # noqa: F401
 
+import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
 from ibis.expr.schema import Schema
 from ibis.formats import DataMapper, SchemaMapper, TableProxy, TypeMapper
@@ -349,7 +349,7 @@ def to_polars(self, schema: Schema) -> pl.DataFrame:
 
 class PyArrowDatasetProxy(TableProxy):
-    __slots__ = ("obj", "__dict__")
+    __slots__ = ("obj",)
     obj: V
 
     def __init__(self, obj: V):
         self.obj = obj
@@ -362,30 +362,42 @@ def __len__(self):
     def __hash__(self):
         return hash(self.obj)
 
-    @cached_property
-    def _cache(self):
-        return self.obj.to_table()
-
     def to_frame(self) -> pd.DataFrame:
-        """Convert this input to a pandas DataFrame."""
-        return self._cache.to_pandas()
+        raise com.UnsupportedOperationError(
+            """You are trying to use a PyArrow Dataset with a backend
+            that will require materializing the entire dataset in local
+            memory.
+
+            If you would like to materialize this dataset, please construct the
+            memtable directly by running `ibis.memtable(my_dataset.to_table())`
+            """
+        )
 
     def to_pyarrow(self, schema: Schema) -> pa.Table:
-        """Convert this input to a PyArrow Table."""
-        return self._cache
+        raise com.UnsupportedOperationError(
+            """You are trying to use a PyArrow Dataset with a backend
+            that will require materializing the entire dataset in local
+            memory.
 
-    def to_pyarrow_lazy(self, schema: Schema) -> ds.Dataset:
+            If you would like to materialize this dataset, please construct the
+            memtable directly by running `ibis.memtable(my_dataset.to_table())`
+            """
+        )
+
+    def to_pyarrow_dataset(self, schema: Schema) -> ds.Dataset:
        """Return the dataset object itself.
 
         Use with backends that can perform pushdowns into dataset objects.
         """
         return self.obj
 
-    def to_polars(self, schema: Schema) -> pl.DataFrame:
-        """Convert this input to a Polars DataFrame."""
-        import polars as pl
-
-        from ibis.formats.polars import PolarsData
+    def to_polars(self, schema: Schema) -> pl.DataFrame:
+        raise com.UnsupportedOperationError(
+            """You are trying to use a PyArrow Dataset with a backend
+            that will require materializing the entire dataset in local
+            memory.
-        df = pl.from_arrow(self._cache)
-        return PolarsData.convert_table(df, schema)
+
+            If you would like to materialize this dataset, please construct the
+            memtable directly by running `ibis.memtable(my_dataset.to_table())`
+            """
+        )

From 478c8b58812a2dea867e8dcb6b38096a23943786 Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 26 Sep 2024 05:37:24 -0400
Subject: [PATCH 4/9] chore: remove `__len__` implementation

---
 ibis/formats/pyarrow.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py
index 38e671df6024..405a9a9fc15c 100644
--- a/ibis/formats/pyarrow.py
+++ b/ibis/formats/pyarrow.py
@@ -355,9 +355,6 @@ class PyArrowDatasetProxy(TableProxy):
     def __init__(self, obj: V):
         self.obj = obj
 
-    def __len__(self):
-        return self.obj.count_rows()
-
     # pyarrow datasets are hashable, so we override the hash from TableProxy
     def __hash__(self):
         return hash(self.obj)

From a2083cd63682329d15a7d86c902b2f1e6fce27fb Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 26 Sep 2024 05:41:39 -0400
Subject: [PATCH 5/9] chore: catch `AttributeError` instead of checking

---
 ibis/backends/duckdb/__init__.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py
index b1f5a1f07453..09d1f22fab74 100644
--- a/ibis/backends/duckdb/__init__.py
+++ b/ibis/backends/duckdb/__init__.py
@@ -1609,10 +1609,15 @@ def _in_memory_table_exists(self, name: str) -> bool:
         return True
 
     def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
-        if hasattr(op.data, "to_pyarrow_dataset"):
-            self.con.register(op.name, op.data.to_pyarrow_dataset(op.schema))
-        else:
-            self.con.register(op.name, op.data.to_pyarrow(op.schema))
+        data = op.data
+        schema = op.schema
+
+        try:
+            obj = data.to_pyarrow_dataset(schema)
+        except AttributeError:
+            obj = data.to_pyarrow(schema)
+
+        self.con.register(op.name, obj)
 
     def _finalize_memtable(self, name: str) -> None:
         # if we don't aggressively unregister tables duckdb will keep a

From 91bed2f56072b045cc6015861bbf694779b69319 Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 26 Sep 2024 05:46:52 -0400
Subject: [PATCH 6/9] chore: remove `__len__` from `TableProxy`

---
 ibis/formats/__init__.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/ibis/formats/__init__.py b/ibis/formats/__init__.py
index d8c3b24669c7..0e76b1db735e 100644
--- a/ibis/formats/__init__.py
+++ b/ibis/formats/__init__.py
@@ -239,9 +239,6 @@ def __repr__(self) -> str:
         data_repr = indent(repr(self.obj), spaces=2)
         return f"{self.__class__.__name__}:\n{data_repr}"
 
-    def __len__(self) -> int:
-        return len(self.obj)
-
     @abstractmethod
     def to_frame(self) -> pd.DataFrame:  # pragma: no cover
         """Convert this input to a pandas DataFrame."""

From 78a94627861f602e2543e238b139d428f8e4aaca Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 26 Sep 2024 05:47:21 -0400
Subject: [PATCH 7/9] chore: consolidate error message

---
 ibis/formats/pyarrow.py | 41 ++++++++---------------------------------
 1 file changed, 8 insertions(+), 33 deletions(-)

diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py
index 405a9a9fc15c..f91390b9f961 100644
--- a/ibis/formats/pyarrow.py
+++ b/ibis/formats/pyarrow.py
@@ -17,8 +17,6 @@
     import polars as pl
     import pyarrow.dataset as ds
 
-    from ibis.util import V
-
 
 _from_pyarrow_types = {
     pa.int8(): dt.Int8,
@@ -349,37 +347,22 @@ def to_polars(self, schema: Schema) -> pl.DataFrame:
 
 class PyArrowDatasetProxy(TableProxy):
-    __slots__ = ("obj",)
-    obj: V
+    ERROR_MESSAGE = """\
+You are trying to use a PyArrow Dataset with a backend that will require
+materializing the entire dataset in local memory.
 
-    def __init__(self, obj: V):
-        self.obj = obj
+If you would like to materialize this dataset, please construct the memtable
+directly by running `ibis.memtable(my_dataset.to_table())`."""
 
     # pyarrow datasets are hashable, so we override the hash from TableProxy
     def __hash__(self):
         return hash(self.obj)
 
     def to_frame(self) -> pd.DataFrame:
-        raise com.UnsupportedOperationError(
-            """You are trying to use a PyArrow Dataset with a backend
-            that will require materializing the entire dataset in local
-            memory.
-
-            If you would like to materialize this dataset, please construct the
-            memtable directly by running `ibis.memtable(my_dataset.to_table())`
-            """
-        )
+        raise com.UnsupportedOperationError(self.ERROR_MESSAGE)
 
     def to_pyarrow(self, schema: Schema) -> pa.Table:
-        raise com.UnsupportedOperationError(
-            """You are trying to use a PyArrow Dataset with a backend
-            that will require materializing the entire dataset in local
-            memory.
-
-            If you would like to materialize this dataset, please construct the
-            memtable directly by running `ibis.memtable(my_dataset.to_table())`
-            """
-        )
+        raise com.UnsupportedOperationError(self.ERROR_MESSAGE)
 
     def to_pyarrow_dataset(self, schema: Schema) -> ds.Dataset:
         """Return the dataset object itself.
 
         Use with backends that can perform pushdowns into dataset objects.
         """
         return self.obj
 
     def to_polars(self, schema: Schema) -> pl.DataFrame:
-        raise com.UnsupportedOperationError(
-            """You are trying to use a PyArrow Dataset with a backend
-            that will require materializing the entire dataset in local
-            memory.
-
-            If you would like to materialize this dataset, please construct the
-            memtable directly by running `ibis.memtable(my_dataset.to_table())`
-            """
-        )
+        raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

From b93858194e9da21fa1a6817fd1d98fa51c29c834 Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 26 Sep 2024 05:53:23 -0400
Subject: [PATCH 8/9] chore: revert dropping of slots

---
 ibis/formats/pyarrow.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py
index f91390b9f961..7496b52fce7a 100644
--- a/ibis/formats/pyarrow.py
+++ b/ibis/formats/pyarrow.py
@@ -9,6 +9,7 @@
 import ibis.expr.datatypes as dt
 from ibis.expr.schema import Schema
 from ibis.formats import DataMapper, SchemaMapper, TableProxy, TypeMapper
+from ibis.util import V
 
 if TYPE_CHECKING:
     from collections.abc import Sequence
@@ -330,7 +331,7 @@ def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
         return table
 
 
-class PyArrowTableProxy(TableProxy):
+class PyArrowTableProxy(TableProxy[V]):
     def to_frame(self):
         return self.obj.to_pandas()
 
@@ -346,7 +347,7 @@ def to_polars(self, schema: Schema) -> pl.DataFrame:
         return PolarsData.convert_table(df, schema)
 
 
-class PyArrowDatasetProxy(TableProxy):
+class PyArrowDatasetProxy(TableProxy[V]):
     ERROR_MESSAGE = """\
 You are trying to use a PyArrow Dataset with a backend that will require
 materializing the entire dataset in local memory.
@@ -354,6 +355,9 @@ class PyArrowDatasetProxy(TableProxy):
 If you would like to materialize this dataset, please construct the memtable
 directly by running `ibis.memtable(my_dataset.to_table())`."""
 
+    __slots__ = ("obj",)
+    obj: V
+
     # pyarrow datasets are hashable, so we override the hash from TableProxy
     def __hash__(self):
         return hash(self.obj)

From 926bd01267e74d06fb30b08287894695dd8cffae Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 26 Sep 2024 06:05:51 -0400
Subject: [PATCH 9/9] chore: restore explicit `__init__` constructor

---
 ibis/formats/pyarrow.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py
index 7496b52fce7a..93c41b7169aa 100644
--- a/ibis/formats/pyarrow.py
+++ b/ibis/formats/pyarrow.py
@@ -358,6 +358,9 @@ class PyArrowDatasetProxy(TableProxy):
     __slots__ = ("obj",)
     obj: V
 
+    def __init__(self, obj: V) -> None:
+        self.obj = obj
+
     # pyarrow datasets are hashable, so we override the hash from TableProxy
     def __hash__(self):
         return hash(self.obj)
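
Usage sketch (illustrative, not part of the patches; with these nine patches
applied, the inline dataset below stands in for any `pyarrow.dataset.Dataset`,
e.g. one backed by a directory of parquet files):

    import pyarrow as pa
    import pyarrow.dataset as ds

    import ibis

    # the series' own test constructs an in-memory dataset the same way
    dataset = ds.dataset(pa.table({"a": ["a", "b"], "b": [1, 2]}))

    t = ibis.memtable(dataset)

    # DuckDB registers the dataset unmaterialized (via `to_pyarrow_dataset`),
    # so this filter is eligible for pushdown into the dataset scan
    con = ibis.duckdb.connect()
    result = con.to_pyarrow(t.filter(t.b > 1))

    # backends without dataset support raise UnsupportedOperationError rather
    # than silently copying the whole dataset into local memory; the
    # documented workaround is to materialize explicitly
    t_eager = ibis.memtable(dataset.to_table())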