# should have a better name
class ResultHandler:
    """Mixin that exports expression results as pyarrow objects.

    Backends supply :meth:`to_pyarrow_batches`; :meth:`to_pyarrow` is built
    on top of it.
    """

    @staticmethod
    def _import_pyarrow():
        """Import and return the ``pyarrow`` module.

        Raises
        ------
        ModuleNotFoundError
            If ``pyarrow`` cannot be imported. The original ``ImportError``
            is chained as the cause.
        """
        try:
            import pyarrow
        except ImportError as err:
            raise ModuleNotFoundError(
                "Exporting to arrow formats requires `pyarrow` "
                "but it is not installed"
            ) from err
        return pyarrow

    @staticmethod
    def _empty_result_schema(pa, expr):
        """Return the pyarrow schema to use when a query yields no batches.

        Parameters
        ----------
        pa
            The imported ``pyarrow`` module.
        expr
            The expression that was executed.
        """
        if hasattr(expr, "schema"):
            return expr.schema().to_pyarrow()
        # Expressions without a ``schema`` method (columns) are described as
        # a single-column table.
        # NOTE(review): assumes ``get_name()`` succeeds for the expression;
        # confirm for unnamed scalars.
        return pa.schema([(expr.get_name(), expr.type().to_pyarrow())])

    def to_pyarrow(
        self,
        expr: ir.Expr,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        limit: int | str | None = None,
        **kwargs: Any,
    ) -> pa.Table:
        """Execute `expr` and return the result as a pyarrow object.

        Parameters
        ----------
        expr
            Table, column or scalar expression to execute.
        params
            Mapping of scalar parameter expressions to value.
        limit
            Row limit forwarded to :meth:`to_pyarrow_batches`.
        kwargs
            Extra keyword arguments forwarded to :meth:`to_pyarrow_batches`.

        Returns
        -------
        Table
            A pyarrow table for table expressions, the single flattened
            column for column expressions, and the first element of that
            column for scalar expressions.

        Raises
        ------
        ModuleNotFoundError
            If ``pyarrow`` is not installed.
        ValueError
            If `expr` is not a table, column or scalar expression.
        """
        pa = self._import_pyarrow()

        # Validate up front: raising inside a ``try``/``except ValueError``
        # would swallow the error and return an empty table instead.
        if not isinstance(expr, (ir.Table, ir.Column, ir.Scalar)):
            raise ValueError(
                f"Cannot convert expression of type {type(expr)} to pyarrow"
            )

        # Materialize the batches so that emptiness is detected explicitly;
        # errors raised while reading batches propagate instead of being
        # mistaken for an empty result.
        batches = list(
            self.to_pyarrow_batches(
                expr, params=params, limit=limit, **kwargs
            )
        )
        # ``from_batches`` cannot infer a schema from zero batches.
        schema = None if batches else self._empty_result_schema(pa, expr)
        table = pa.Table.from_batches(batches, schema=schema)

        if isinstance(expr, ir.Table):
            return table

        # An array can't be constructed from record batches directly, so a
        # one-column table is built and the column is taken back out of it;
        # ``combine_chunks`` flattens the ChunkedArray.
        column = table.columns[0].combine_chunks()
        if isinstance(expr, ir.Column):
            return column
        return column[0]

    def to_pyarrow_batches(
        self,
        expr: ir.Expr,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        limit: int | str | None = None,
        chunk_size: int = 1_000_000,
        **kwargs: Any,
    ) -> pa.RecordBatchReader:
        """Execute `expr` and return its result as record batches.

        Backends must override this method; the base implementation always
        raises ``NotImplementedError``.
        """
        raise NotImplementedError
"""Base backend class. All Ibis backends must subclass this class and implement all the diff --git a/ibis/backends/base/sql/__init__.py b/ibis/backends/base/sql/__init__.py index d7cc0de4b7d2..d3f8e51d7b4b 100644 --- a/ibis/backends/base/sql/__init__.py +++ b/ibis/backends/base/sql/__init__.py @@ -4,7 +4,7 @@ import contextlib import os from functools import lru_cache -from typing import Any, Mapping +from typing import TYPE_CHECKING, Any, Iterable, Mapping import sqlalchemy as sa @@ -17,6 +17,9 @@ from ibis.backends.base.sql.compiler import Compiler from ibis.expr.typing import TimeContext +if TYPE_CHECKING: + import pyarrow as pa + __all__ = [ 'BaseSQLBackend', ] @@ -127,8 +130,6 @@ def raw_sql(self, query: str) -> Any: Any Backend cursor """ - # TODO results is unused, it can be removed - # (requires updating Impala tests) # TODO `self.con` is assumed to be defined in subclasses, but there # is nothing that enforces it. We should find a way to make sure # `self.con` is always a DBAPI2 connection, or raise an error @@ -141,6 +142,55 @@ def raw_sql(self, query: str) -> Any: def _safe_raw_sql(self, *args, **kwargs): yield self.raw_sql(*args, **kwargs) + def _cursor_batches( + self, + expr: ir.Expr, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + chunk_size: int = 1_000_000, + ) -> Iterable[list]: + query_ast = self.compiler.to_ast_ensure_limit( + expr, limit, params=params + ) + sql = query_ast.compile() + + with self._safe_raw_sql(sql) as cursor: + while batch := cursor.fetchmany(chunk_size): + yield batch + + def to_pyarrow_batches( + self, + expr: ir.Expr, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + chunk_size: int = 1_000_000, + **kwargs: Any, + ) -> pa.RecordBatchReader: + pa = self._import_pyarrow() + + from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct + + if hasattr(expr, "schema"): + schema = expr.schema() + else: + # ColumnExpr has no schema method, define 
def to_pyarrow_batches(
    self,
    expr: ir.Expr,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1_000_000,
    **kwargs: Any,
) -> pa.RecordBatchReader:
    """Execute `expr` and stream the result as pyarrow record batches.

    Parameters
    ----------
    expr
        Expression to execute.
    params
        Mapping of scalar parameter expressions to value.
    limit
        Row limit forwarded to ``_cursor_batches``.
    chunk_size
        Number of rows requested from the cursor per batch.
    kwargs
        Accepted but not used by this implementation.

    Returns
    -------
    RecordBatchReader
        A reader over batches converted lazily from the cursor rows.
    """
    pa = self._import_pyarrow()

    from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct

    # ColumnExpr has no schema method, define single-column schema
    schema = (
        expr.schema()
        if hasattr(expr, "schema")
        else sch.schema([(expr.get_name(), expr.type())])
    )

    target_schema = schema.to_pyarrow()
    # Each chunk of cursor rows becomes one struct array whose fields are
    # the result columns, which is then unpacked into a record batch.
    record_batches = (
        pa.RecordBatch.from_struct_array(
            pa.array(map(tuple, rows), type=ibis_to_pyarrow_struct(schema))
        )
        for rows in self._cursor_batches(
            expr, params=params, limit=limit, chunk_size=chunk_size
        )
    )
    return pa.RecordBatchReader.from_batches(target_schema, record_batches)
def to_pyarrow_batches(
    self,
    expr: ir.Expr,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    chunk_size: int = 1_000_000,
    **kwargs: Any,
) -> pa.RecordBatchReader:
    """Execute `expr` and return its result as a record batch reader.

    The frame is collected in full before the reader is built;
    `chunk_size` is not used by this implementation.
    """
    arrow = self._import_pyarrow()
    frame = self._get_frame(expr, params, limit, **kwargs)
    frame_schema = frame.schema()
    collected = frame.collect()
    return arrow.RecordBatchReader.from_batches(frame_schema, collected)


def execute(
    self,
    expr: ir.Expr,
    params: Mapping[ir.Expr, object] = None,
    limit: int | str | None = "default",
    **kwargs: Any,
):
    """Execute `expr` and convert the pyarrow result to a Python object.

    Table results go through ``to_pandas()``. Column results do too and
    are then named ``"tmp"``. Scalar results are converted with
    ``as_py()``.

    Raises
    ------
    IbisError
        If `expr` is not a table, column or scalar expression.
    """
    result = self.to_pyarrow(expr, params=params, limit=limit, **kwargs)
    if isinstance(expr, ir.Table):
        return result.to_pandas()
    if isinstance(expr, ir.Column):
        named = result.to_pandas()
        named.name = "tmp"
        return named
    if isinstance(expr, ir.Scalar):
        return result.as_py()
    raise com.IbisError(f"Cannot execute expression of type: {type(expr)}")
class IbisRecordBatchReader(pa.RecordBatchReader):
    """Record batch reader that holds on to the cursor that produced it.

    Every read is delegated to the wrapped reader. The cursor is stored
    only so that it isn't garbage collected before the batches have been
    streamed out.
    """

    def __init__(self, reader, cursor):
        # The base-class constructor is intentionally not called; this
        # object only wraps `reader`.
        self.reader = reader
        self.cursor = cursor

    def close(self):
        """Close the wrapped reader and drop the cursor reference."""
        self.reader.close()
        # ``pop`` rather than ``del`` so that calling ``close`` a second
        # time doesn't fail with AttributeError on this side.
        self.__dict__.pop("cursor", None)

    def read_all(self):
        """Read all remaining batches via the wrapped reader."""
        return self.reader.read_all()

    def read_next_batch(self):
        """Read the next batch via the wrapped reader."""
        return self.reader.read_next_batch()

    def read_pandas(self):
        """Read the remaining batches via the wrapped reader's ``read_pandas``."""
        return self.reader.read_pandas()

    @property
    def schema(self):
        """Schema of the wrapped reader.

        Exposed as a property because ``RecordBatchReader.schema`` is an
        attribute; as a plain method, ``reader.schema`` would return a
        bound method instead of the schema.
        """
        return self.reader.schema
class PackageDiscarder:
    """Meta path finder that makes selected packages unimportable.

    Insert an instance at the front of ``sys.meta_path``; any import whose
    name is listed in ``pkgnames``, or is a submodule of a listed name,
    fails with ``ImportError``.

    Parameters
    ----------
    pkgnames
        Optional initial package names to block. Defaults to none; names
        can also be appended to ``self.pkgnames`` afterwards.
    """

    def __init__(self, pkgnames=None):
        self.pkgnames = [] if pkgnames is None else list(pkgnames)

    def _is_blocked(self, fullname):
        """Return whether `fullname` is a blocked package or a submodule of one."""
        return any(
            fullname == name or fullname.startswith(name + ".")
            for name in self.pkgnames
        )

    def find_spec(self, fullname, path, target=None):
        """Raise ``ImportError`` for blocked names, else defer to other finders.

        Returning ``None`` tells the import system to try the next finder
        on ``sys.meta_path``.
        """
        if self._is_blocked(fullname):
            raise ImportError(
                f"import of {fullname!r} is blocked by PackageDiscarder",
                name=fullname,
            )
        return None
def _first_batch(reader, expected_rows=None):
    """Check `reader` and its first batch; optionally check the batch length."""
    assert isinstance(reader, pa.RecordBatchReader)
    first = reader.read_next_batch()
    assert isinstance(first, pa.RecordBatch)
    if expected_rows is not None:
        assert len(first) == expected_rows


@pytest.mark.notyet(
    ["pandas"], reason="DataFrames have no option for outputting in batches"
)
@pytest.mark.parametrize("limit", limit_no_limit)
def test_table_to_pyarrow_batches(limit, backend, awards_players):
    _first_batch(awards_players.to_pyarrow_batches(limit=limit), limit)


@pytest.mark.notyet(
    ["pandas"], reason="DataFrames have no option for outputting in batches"
)
@pytest.mark.parametrize("limit", limit_no_limit)
def test_column_to_pyarrow_batches(limit, backend, awards_players):
    _first_batch(awards_players.awardID.to_pyarrow_batches(limit=limit), limit)


@pytest.mark.parametrize("limit", limit_no_limit)
def test_table_to_pyarrow_table(limit, backend, awards_players):
    result = awards_players.to_pyarrow(limit=limit)
    assert isinstance(result, pa.Table)
    if limit is not None:
        assert len(result) == limit


@pytest.mark.parametrize("limit", limit_no_limit)
def test_column_to_pyarrow_array(limit, backend, awards_players):
    result = awards_players.awardID.to_pyarrow(limit=limit)
    assert isinstance(result, pa.Array)
    if limit is not None:
        assert len(result) == limit


@pytest.mark.notyet(
    ["datafusion"], reason="DataFusion backend doesn't support sum"
)
@pytest.mark.parametrize("limit", no_limit)
def test_scalar_to_pyarrow_scalar(limit, backend, awards_players):
    total = awards_players.yearID.sum()
    assert isinstance(total.to_pyarrow(limit=limit), pa.Scalar)


@pytest.mark.notimpl(["dask", "clickhouse", "impala", "pyspark"])
@pytest.mark.notyet(
    ["datafusion"],
    reason="""
    fields' nullability from frame.schema() is not always consistent with
    the first record batch's schema
""",
)
def test_table_to_pyarrow_table_schema(backend, awards_players):
    result = awards_players.to_pyarrow()
    assert isinstance(result, pa.Table)
    assert result.schema == awards_players.schema().to_pyarrow()


@pytest.mark.notimpl(["dask", "clickhouse", "impala", "pyspark"])
def test_column_to_pyarrow_table_schema(backend, awards_players):
    column = awards_players.awardID
    result = column.to_pyarrow()
    assert isinstance(result, pa.Array)
    assert result.type == column.type().to_pyarrow()


@pytest.mark.notimpl(
    ["pandas", "dask", "clickhouse", "impala", "pyspark", "datafusion"]
)
def test_table_pyarrow_batch_chunk_size(backend, awards_players):
    reader = awards_players.to_pyarrow_batches(limit=2050, chunk_size=2048)
    _first_batch(reader, 2048)


@pytest.mark.notimpl(
    ["pandas", "dask", "clickhouse", "impala", "pyspark", "datafusion"]
)
def test_column_pyarrow_batch_chunk_size(backend, awards_players):
    reader = awards_players.awardID.to_pyarrow_batches(
        limit=2050, chunk_size=2048
    )
    _first_batch(reader, 2048)


@pytest.mark.notimpl(
    ["pandas", "dask", "clickhouse", "impala", "pyspark", "datafusion"]
)
@pytest.mark.broken(
    ["sqlite"],
    raises=pa.ArrowException,
    reason="Test data has empty strings in columns typed as int64",
)
def test_to_pyarrow_batches_borked_types(backend, batting):
    """This is a temporary test to expose an(other) issue with sqlite typing
    shenanigans."""
    _first_batch(batting.to_pyarrow_batches(limit=42), 42)


def test_no_pyarrow_message(backend, awards_players, no_pyarrow):
    with pytest.raises(ModuleNotFoundError) as caught:
        awards_players.to_pyarrow()

    message = str(caught.value)
    assert "requires `pyarrow` but" in message
def to_pyarrow_batches(
    self,
    *,
    limit: int | str | None = None,
    params: Mapping[ir.Scalar, Any] | None = None,
    chunk_size: int = 1_000_000,
    **kwargs: Any,
) -> pa.RecordBatchReader:
    """Execute expression and return results as pyarrow record batches.

    **Warning**: This method is eager and will execute the associated
    expression immediately. This API is experimental and subject to change.

    Parameters
    ----------
    limit
        An integer to effect a specific row limit. A value of `None` means
        "no limit". The default is `None`.
    params
        Mapping of scalar parameter expressions to value.
    chunk_size
        Number of rows in each returned record batch.
    kwargs
        Extra keyword arguments forwarded to the backend.

    Returns
    -------
    RecordBatchReader
        A reader that yields pyarrow record batches.
    """
    return self._find_backend().to_pyarrow_batches(
        self,
        params=params,
        limit=limit,
        chunk_size=chunk_size,
        **kwargs,
    )


def to_pyarrow(
    self,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    **kwargs: Any,
) -> pa.Table:
    """Execute expression and return results as a pyarrow object.

    **Warning**: This method is eager and will execute the associated
    expression immediately. This API is experimental and subject to change.

    Parameters
    ----------
    params
        Mapping of scalar parameter expressions to value.
    limit
        An integer to effect a specific row limit. A value of `None` means
        "no limit". The default is `None`.
    kwargs
        Extra keyword arguments forwarded to the backend.

    Returns
    -------
    Table
        A pyarrow table holding the results of the executed expression.
        For column and scalar expressions the backend returns a pyarrow
        array or scalar instead.
    """
    return self._find_backend().to_pyarrow(
        self, params=params, limit=limit, **kwargs
    )