From 428d1a3c6c8d5e40f1379a4c0b3f82c65b96d29a Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Thu, 26 Sep 2024 06:30:26 -0400 Subject: [PATCH] feat(memtable): add pyarrow.dataset support (#10206) Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com> --- ibis/backends/duckdb/__init__.py | 10 +++++++- ibis/backends/tests/test_client.py | 7 ++--- ibis/expr/api.py | 18 +++++++++++++ ibis/formats/__init__.py | 3 --- ibis/formats/pyarrow.py | 41 +++++++++++++++++++++++++++++- 5 files changed, 71 insertions(+), 8 deletions(-) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 0bdaa57a0f66..09d1f22fab74 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -1609,7 +1609,15 @@ def _in_memory_table_exists(self, name: str) -> bool: return True def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: - self.con.register(op.name, op.data.to_pyarrow(op.schema)) + data = op.data + schema = op.schema + + try: + obj = data.to_pyarrow_dataset(schema) + except AttributeError: + obj = data.to_pyarrow(schema) + + self.con.register(op.name, obj) def _finalize_memtable(self, name: str) -> None: # if we don't aggressively unregister tables duckdb will keep a diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 7d1ae751e58b..652592642911 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -933,21 +933,22 @@ def test_self_join_memory_table(backend, con, monkeypatch): [ "bigquery", "clickhouse", - "duckdb", "exasol", "impala", "mssql", "mysql", "oracle", - "polars", "postgres", "pyspark", "risingwave", "snowflake", "sqlite", "trino", - ] + ], + raises=com.UnsupportedOperationError, + reason="we don't materialize datasets to avoid perf footguns", ), + pytest.mark.notimpl(["polars"], raises=NotImplementedError), ], id="pyarrow dataset", ), diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 
3ea297672c90..e89ed7b8ee99 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -50,6 +50,7 @@ import pandas as pd import polars as pl import pyarrow as pa + import pyarrow.dataset as ds from ibis.expr.schema import SchemaLike @@ -480,6 +481,23 @@ def _memtable_from_pyarrow_table( ).to_expr() +@_memtable.register("pyarrow.dataset.Dataset") +def _memtable_from_pyarrow_dataset( + data: ds.Dataset, + *, + name: str | None = None, + schema: SchemaLike | None = None, + columns: Iterable[str] | None = None, +): + from ibis.formats.pyarrow import PyArrowDatasetProxy + + return ops.InMemoryTable( + name=name if name is not None else util.gen_name("pyarrow_memtable"), + schema=Schema.from_pyarrow(data.schema), + data=PyArrowDatasetProxy(data), + ).to_expr() + + @_memtable.register("polars.LazyFrame") def _memtable_from_polars_lazyframe(data: pl.LazyFrame, **kwargs): return _memtable_from_polars_dataframe(data.collect(), **kwargs) diff --git a/ibis/formats/__init__.py b/ibis/formats/__init__.py index d8c3b24669c7..0e76b1db735e 100644 --- a/ibis/formats/__init__.py +++ b/ibis/formats/__init__.py @@ -239,9 +239,6 @@ def __repr__(self) -> str: data_repr = indent(repr(self.obj), spaces=2) return f"{self.__class__.__name__}:\n{data_repr}" - def __len__(self) -> int: - return len(self.obj) - @abstractmethod def to_frame(self) -> pd.DataFrame: # pragma: no cover """Convert this input to a pandas DataFrame.""" diff --git a/ibis/formats/pyarrow.py b/ibis/formats/pyarrow.py index 5791c4cc1b7a..93c41b7169aa 100644 --- a/ibis/formats/pyarrow.py +++ b/ibis/formats/pyarrow.py @@ -5,14 +5,18 @@ import pyarrow as pa import pyarrow_hotfix # noqa: F401 +import ibis.common.exceptions as com import ibis.expr.datatypes as dt from ibis.expr.schema import Schema from ibis.formats import DataMapper, SchemaMapper, TableProxy, TypeMapper +from ibis.util import V if TYPE_CHECKING: from collections.abc import Sequence + import pandas as pd import polars as pl + import pyarrow.dataset as ds 
_from_pyarrow_types = { @@ -327,7 +331,7 @@ def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table: return table -class PyArrowTableProxy(TableProxy): +class PyArrowTableProxy(TableProxy[V]): def to_frame(self): return self.obj.to_pandas() @@ -341,3 +345,38 @@ def to_polars(self, schema: Schema) -> pl.DataFrame: df = pl.from_arrow(self.obj) return PolarsData.convert_table(df, schema) + + +class PyArrowDatasetProxy(TableProxy[V]): + ERROR_MESSAGE = """\ +You are trying to use a PyArrow Dataset with a backend that will require +materializing the entire dataset in local memory. + +If you would like to materialize this dataset, please construct the memtable +directly by running `ibis.memtable(my_dataset.to_table())`.""" + + __slots__ = ("obj",) + obj: V + + def __init__(self, obj: V) -> None: + self.obj = obj + + # pyarrow datasets are hashable, so we override the hash from TableProxy + def __hash__(self): + return hash(self.obj) + + def to_frame(self) -> pd.DataFrame: + raise com.UnsupportedOperationError(self.ERROR_MESSAGE) + + def to_pyarrow(self, schema: Schema) -> pa.Table: + raise com.UnsupportedOperationError(self.ERROR_MESSAGE) + + def to_pyarrow_dataset(self, schema: Schema) -> ds.Dataset: + """Return the dataset object itself. + + Use with backends that can perform pushdowns into dataset objects. + """ + return self.obj + + def to_polars(self, schema: Schema) -> pl.DataFrame: + raise com.UnsupportedOperationError(self.ERROR_MESSAGE)