From 740778fafc78f291966f3951a1ce00531cacb0c1 Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Sat, 9 Sep 2023 05:53:14 -0400
Subject: [PATCH] feat(api): add `to_pandas_batches`

---
 ibis/backends/base/__init__.py      | 44 +++++++++++++++++++++++++++++
 ibis/backends/snowflake/__init__.py | 23 +++++++++++++++
 ibis/backends/tests/test_export.py  | 41 +++++++++++++++++++++++++++
 ibis/expr/types/core.py             | 40 +++++++++++++++++++++++++-
 4 files changed, 147 insertions(+), 1 deletion(-)

diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py
index ac3702776afe..5143a67e388d 100644
--- a/ibis/backends/base/__init__.py
+++ b/ibis/backends/base/__init__.py
@@ -22,6 +22,7 @@
 import ibis.expr.types as ir
 from ibis import util
 from ibis.common.caching import RefCountedCache
+from ibis.formats.pandas import PandasData
 
 if TYPE_CHECKING:
     from collections.abc import Iterable, Iterator, Mapping, MutableMapping
@@ -234,6 +235,49 @@ def _import_pyarrow():
         else:
             return pyarrow
 
+    def to_pandas_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1_000_000,
+        **kwargs: Any,
+    ) -> Iterator[pd.DataFrame | pd.Series | Any]:
+        """Execute an Ibis expression and return an iterator of pandas `DataFrame`s.
+
+        Parameters
+        ----------
+        expr
+            Ibis expression to execute
+        params
+            Mapping of scalar parameter expressions to values.
+        limit
+            An integer to effect a specific row limit. A value of `None` means
+            "no limit". The default is in `ibis/config.py`.
+        chunk_size
+            Maximum number of rows in each returned record batch. This may have
+            no effect depending on the backend.
+        kwargs
+            Keyword arguments
+
+        Returns
+        -------
+        Iterator[pd.DataFrame]
+            An iterator of pandas `DataFrame`s
+        """
+        orig_expr = expr
+        expr = expr.as_table()
+        schema = expr.schema()
+        yield from (
+            orig_expr.__pandas_result__(
+                PandasData.convert_table(batch.to_pandas(), schema)
+            )
+            for batch in self.to_pyarrow_batches(
+                expr, params=params, limit=limit, chunk_size=chunk_size, **kwargs
+            )
+        )
+
     @util.experimental
     def to_pyarrow(
         self,
diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py
index f875ef66cda2..9638e8a83321 100644
--- a/ibis/backends/snowflake/__init__.py
+++ b/ibis/backends/snowflake/__init__.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+import functools
 import glob
 import inspect
 import itertools
@@ -374,6 +375,28 @@ def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         df = table.to_pandas(timestamp_as_object=True)
         return SnowflakePandasData.convert_table(df, schema)
 
+    def to_pandas_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **_: Any,
+    ) -> Iterator[pd.DataFrame | pd.Series | Any]:
+        self._run_pre_execute_hooks(expr)
+        query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
+        sql = query_ast.compile()
+        target_schema = expr.as_table().schema()
+        converter = functools.partial(
+            SnowflakePandasData.convert_table, schema=target_schema
+        )
+
+        with self.begin() as con, contextlib.closing(con.execute(sql)) as cur:
+            yield from map(
+                expr.__pandas_result__,
+                map(converter, cur.cursor.fetch_pandas_batches()),
+            )
+
     def to_pyarrow_batches(
         self,
         expr: ir.Expr,
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index 7d8c4bbc63ab..a794c1ff88c8 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -445,3 +445,44 @@ def test_empty_memtable(backend, con):
     table = ibis.memtable(expected)
     result = con.execute(table)
     backend.assert_frame_equal(result, expected)
+
+
+@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"])
+def test_to_pandas_batches_empty_table(backend, con):
+    t = backend.functional_alltypes.limit(0)
+    n = t.count().execute()
+
+    assert sum(map(len, con.to_pandas_batches(t))) == n
+    assert sum(map(len, t.to_pandas_batches())) == n
+
+
+@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
+@pytest.mark.parametrize("n", [None, 1])
+def test_to_pandas_batches_nonempty_table(backend, con, n):
+    t = backend.functional_alltypes.limit(n)
+    n = t.count().execute()
+
+    assert sum(map(len, con.to_pandas_batches(t))) == n
+    assert sum(map(len, t.to_pandas_batches())) == n
+
+
+@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"])
+@pytest.mark.parametrize("n", [None, 0, 1, 2])
+def test_to_pandas_batches_column(backend, con, n):
+    t = backend.functional_alltypes.limit(n).timestamp_col
+    n = t.count().execute()
+
+    assert sum(map(len, con.to_pandas_batches(t))) == n
+    assert sum(map(len, t.to_pandas_batches())) == n
+
+
+@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
+def test_to_pandas_batches_scalar(backend, con):
+    t = backend.functional_alltypes.timestamp_col.max()
+    expected = t.execute()
+
+    result1 = list(con.to_pandas_batches(t))
+    assert result1 == [expected]
+
+    result2 = list(t.to_pandas_batches())
+    assert result2 == [expected]
diff --git a/ibis/expr/types/core.py b/ibis/expr/types/core.py
index 6f3612cbfd8f..5aafe5f87a86 100644
--- a/ibis/expr/types/core.py
+++ b/ibis/expr/types/core.py
@@ -3,7 +3,7 @@
 import contextlib
 import os
 import webbrowser
-from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Tuple
+from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Tuple, Iterator
 
 from public import public
 from rich.jupyter import JupyterMixin
@@ -422,6 +422,44 @@ def to_pyarrow(
             self, params=params, limit=limit, **kwargs
         )
 
+    @experimental
+    def to_pandas_batches(
+        self,
+        *,
+        limit: int | str | None = None,
+        params: Mapping[ir.Value, Any] | None = None,
+        chunk_size: int = 1_000_000,
+        **kwargs: Any,
+    ) -> Iterator[pd.DataFrame | pd.Series | Any]:
+        """Execute expression and return an iterator of pandas DataFrames.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        limit
+            An integer to effect a specific row limit. A value of `None` means
+            "no limit". The default is in `ibis/config.py`.
+        params
+            Mapping of scalar parameter expressions to values.
+        chunk_size
+            Maximum number of rows in each returned batch.
+        kwargs
+            Keyword arguments
+
+        Returns
+        -------
+        Iterator[pd.DataFrame]
+        """
+        return self._find_backend(use_default=True).to_pandas_batches(
+            self,
+            params=params,
+            limit=limit,
+            chunk_size=chunk_size,
+            **kwargs,
+        )
+
     @experimental
     def to_parquet(
     self,
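
For review context, here is a minimal usage sketch of the API this patch adds. It is not part of the patch; the DuckDB backend and the table contents are assumptions chosen for illustration, on the basis that any backend implementing `to_pyarrow_batches` inherits the base `to_pandas_batches` implementation:

```python
import ibis

# Hypothetical setup: an in-memory DuckDB connection with a tiny table.
con = ibis.duckdb.connect()
t = con.create_table("t", ibis.memtable({"a": list(range(10))}))

# Stream the result as pandas DataFrames. chunk_size caps the rows per
# batch, though per the docstring some backends may ignore it.
total = sum(len(df) for df in t.to_pandas_batches(chunk_size=4))
assert total == t.count().execute()

# The same entry point is available on the backend object itself.
total = sum(len(df) for df in con.to_pandas_batches(t, chunk_size=4))
assert total == 10
```

Column and scalar expressions go through the same path: because each batch is routed through `__pandas_result__`, a column expression should yield `pd.Series` batches and a scalar expression a single Python value, which is what the new `test_to_pandas_batches_column` and `test_to_pandas_batches_scalar` tests assert.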