Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(memtable): add pyarrow.dataset support #10206

Merged
merged 9 commits into from
Sep 26, 2024
10 changes: 9 additions & 1 deletion ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,15 @@ def _in_memory_table_exists(self, name: str) -> bool:
return True

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
    """Register an in-memory table with the DuckDB connection.

    Prefer handing DuckDB the data as a pyarrow dataset so it can push
    computation down into the dataset without materializing it; fall back
    to materializing a pyarrow Table for data proxies that do not
    implement ``to_pyarrow_dataset``.
    """
    data = op.data
    schema = op.schema

    try:
        # dataset-backed proxies return the dataset unmaterialized
        obj = data.to_pyarrow_dataset(schema)
    except AttributeError:
        # all other proxies only support materializing to a pyarrow Table
        obj = data.to_pyarrow(schema)

    self.con.register(op.name, obj)

def _finalize_memtable(self, name: str) -> None:
# if we don't aggressively unregister tables duckdb will keep a
Expand Down
7 changes: 4 additions & 3 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,21 +933,22 @@ def test_self_join_memory_table(backend, con, monkeypatch):
[
"bigquery",
"clickhouse",
"duckdb",
"exasol",
"impala",
"mssql",
"mysql",
"oracle",
"polars",
"postgres",
"pyspark",
"risingwave",
"snowflake",
"sqlite",
"trino",
]
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
],
raises=com.UnsupportedOperationError,
reason="we don't materialize datasets to avoid perf footguns",
),
pytest.mark.notimpl(["polars"], raises=NotImplementedError),
],
id="pyarrow dataset",
),
Expand Down
18 changes: 18 additions & 0 deletions ibis/expr/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds

from ibis.expr.schema import SchemaLike

Expand Down Expand Up @@ -480,6 +481,23 @@ def _memtable_from_pyarrow_table(
).to_expr()


@_memtable.register("pyarrow.dataset.Dataset")
def _memtable_from_pyarrow_dataset(
    data: ds.Dataset,
    *,
    name: str | None = None,
    schema: SchemaLike | None = None,
    columns: Iterable[str] | None = None,
):
    """Wrap a pyarrow Dataset in an in-memory table expression.

    The dataset is NOT materialized: it is wrapped in a
    ``PyArrowDatasetProxy`` and its own schema is used for the table.

    Parameters
    ----------
    data
        The pyarrow dataset to wrap.
    name
        Optional table name; a unique name is generated when omitted.
    schema
        Accepted for signature compatibility with the other ``_memtable``
        overloads but ignored here — the dataset's own schema is always used.
    columns
        Accepted for signature compatibility but likewise ignored.
    """
    # local import to avoid importing pyarrow machinery at module import time
    from ibis.formats.pyarrow import PyArrowDatasetProxy

    return ops.InMemoryTable(
        name=name if name is not None else util.gen_name("pyarrow_memtable"),
        schema=Schema.from_pyarrow(data.schema),
        data=PyArrowDatasetProxy(data),
    ).to_expr()


@_memtable.register("polars.LazyFrame")
def _memtable_from_polars_lazyframe(data: pl.LazyFrame, **kwargs):
    """Materialize the lazy frame, then delegate to the DataFrame handler."""
    collected = data.collect()
    return _memtable_from_polars_dataframe(collected, **kwargs)
Expand Down
3 changes: 0 additions & 3 deletions ibis/formats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,6 @@ def __repr__(self) -> str:
data_repr = indent(repr(self.obj), spaces=2)
return f"{self.__class__.__name__}:\n{data_repr}"

def __len__(self) -> int:
    """Delegate ``len()`` to the wrapped object."""
    return len(self.obj)

@abstractmethod
def to_frame(self) -> pd.DataFrame: # pragma: no cover
"""Convert this input to a pandas DataFrame."""
Expand Down
41 changes: 40 additions & 1 deletion ibis/formats/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import pyarrow as pa
import pyarrow_hotfix # noqa: F401

import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
from ibis.expr.schema import Schema
from ibis.formats import DataMapper, SchemaMapper, TableProxy, TypeMapper
from ibis.util import V

if TYPE_CHECKING:
from collections.abc import Sequence

import pandas as pd
import polars as pl
import pyarrow.dataset as ds


_from_pyarrow_types = {
Expand Down Expand Up @@ -327,7 +331,7 @@
return table


class PyArrowTableProxy(TableProxy):
class PyArrowTableProxy(TableProxy[V]):
def to_frame(self):
return self.obj.to_pandas()

Expand All @@ -341,3 +345,38 @@

df = pl.from_arrow(self.obj)
return PolarsData.convert_table(df, schema)


class PyArrowDatasetProxy(TableProxy[V]):
    """Table proxy over a pyarrow Dataset that refuses to materialize.

    Backends that can push computation into a dataset should call
    ``to_pyarrow_dataset``; every materializing conversion (pandas,
    pyarrow Table, polars) raises ``UnsupportedOperationError`` instead of
    silently loading the entire dataset into memory.
    """

    ERROR_MESSAGE = """\
You are trying to use a PyArrow Dataset with a backend that will require
materializing the entire dataset in local memory.

If you would like to materialize this dataset, please construct the memtable
directly by running `ibis.memtable(my_dataset.to_table())`."""

    __slots__ = ("obj",)
    # the wrapped pyarrow dataset instance
    obj: V

    def __init__(self, obj: V) -> None:
        self.obj = obj

    # pyarrow datasets are hashable, so we override the hash from TableProxy
    def __hash__(self):
        return hash(self.obj)

    def to_frame(self) -> pd.DataFrame:
        """Raise: converting to pandas would materialize the full dataset."""
        raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

    def to_pyarrow(self, schema: Schema) -> pa.Table:
        """Raise: converting to a pyarrow Table would materialize the full dataset."""
        raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

    def to_pyarrow_dataset(self, schema: Schema) -> ds.Dataset:
        """Return the dataset object itself.

        Use with backends that can perform pushdowns into dataset objects.
        """
        return self.obj

    def to_polars(self, schema: Schema) -> pl.DataFrame:
        """Raise: converting to polars would materialize the full dataset."""
        raise com.UnsupportedOperationError(self.ERROR_MESSAGE)

Check warning on line 382 in ibis/formats/pyarrow.py

View check run for this annotation

Codecov / codecov/patch

ibis/formats/pyarrow.py#L382

Added line #L382 was not covered by tests