feat(api): add to_pandas_batches
cpcloud committed Sep 28, 2023
1 parent be7e53a commit 740778f
Showing 4 changed files with 147 additions and 1 deletion.
44 changes: 44 additions & 0 deletions ibis/backends/base/__init__.py
@@ -22,6 +22,7 @@
import ibis.expr.types as ir
from ibis import util
from ibis.common.caching import RefCountedCache
from ibis.formats.pandas import PandasData

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Mapping, MutableMapping
@@ -234,6 +235,49 @@ def _import_pyarrow():
else:
return pyarrow

def to_pandas_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> Iterator[pd.DataFrame | pd.Series | Any]:
"""Execute an Ibis expression and return an iterator of pandas `DataFrame`s.
Parameters
----------
expr
Ibis expression to execute
params
Mapping of scalar parameter expressions to value.
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
chunk_size
Maximum number of rows in each returned record batch. This may have
no effect depending on the backend.
kwargs
Keyword arguments
Returns
-------
Iterator[pd.DataFrame]
An iterator pandas `DataFrame`s
"""
orig_expr = expr
expr = expr.as_table()
schema = expr.schema()
yield from (
orig_expr.__pandas_result__(
PandasData.convert_table(batch.to_pandas(), schema)
)
for batch in self.to_pyarrow_batches(
expr, params=params, limit=limit, chunk_size=chunk_size, **kwargs
)
)

@util.experimental
def to_pyarrow(
self,
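For context, a minimal usage sketch of the new backend-level API. The in-memory DuckDB connection and the tiny memtable below are assumptions for illustration; any backend that implements `to_pyarrow_batches` should work, and per the docstring `chunk_size` may be ignored by some backends.

import ibis

con = ibis.duckdb.connect()  # assumed: DuckDB backend installed
t = ibis.memtable({"v": list(range(10))})

total = 0
for df in con.to_pandas_batches(t, chunk_size=4):
    # each batch is a pandas DataFrame with at most `chunk_size` rows
    total += int(df["v"].sum())

print(total)  # 45
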
23 changes: 23 additions & 0 deletions ibis/backends/snowflake/__init__.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import functools
import glob
import inspect
import itertools
@@ -374,6 +375,28 @@ def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
df = table.to_pandas(timestamp_as_object=True)
return SnowflakePandasData.convert_table(df, schema)

def to_pandas_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
**_: Any,
) -> Iterator[pd.DataFrame | pd.Series | Any]:
self._run_pre_execute_hooks(expr)
query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
sql = query_ast.compile()
target_schema = expr.as_table().schema()
converter = functools.partial(
SnowflakePandasData.convert_table, schema=target_schema
)

with self.begin() as con, contextlib.closing(con.execute(sql)) as cur:
yield from map(
expr.__pandas_result__,
map(converter, cur.cursor.fetch_pandas_batches()),
)

def to_pyarrow_batches(
self,
expr: ir.Expr,
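The Snowflake override skips the PyArrow round trip and pulls pandas batches straight from the cursor, binding the target schema to a converter once with `functools.partial` and mapping it over the batches. Below is a backend-agnostic sketch of that conversion pattern; the `convert_table` helper and the sample schema are hypothetical stand-ins for `SnowflakePandasData.convert_table` and the expression's schema.

import functools

import pandas as pd

def convert_table(df: pd.DataFrame, schema: dict) -> pd.DataFrame:
    # stand-in converter: coerce each column to the dtype the schema expects
    return df.astype(schema)

target_schema = {"id": "int64", "name": "string"}
converter = functools.partial(convert_table, schema=target_schema)

raw_batches = [
    pd.DataFrame({"id": ["1", "2"], "name": ["a", "b"]}),
    pd.DataFrame({"id": ["3"], "name": ["c"]}),
]

for batch in map(converter, raw_batches):
    print(batch.dtypes.to_dict())  # e.g. id -> int64, name -> string
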
41 changes: 41 additions & 0 deletions ibis/backends/tests/test_export.py
@@ -445,3 +445,44 @@ def test_empty_memtable(backend, con):
table = ibis.memtable(expected)
result = con.execute(table)
backend.assert_frame_equal(result, expected)


@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"])
def test_to_pandas_batches_empty_table(backend, con):
t = backend.functional_alltypes.limit(0)
n = t.count().execute()

assert sum(map(len, con.to_pandas_batches(t))) == n
assert sum(map(len, t.to_pandas_batches())) == n


@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
@pytest.mark.parametrize("n", [None, 1])
def test_to_pandas_batches_nonempty_table(backend, con, n):
t = backend.functional_alltypes.limit(n)
n = t.count().execute()

assert sum(map(len, con.to_pandas_batches(t))) == n
assert sum(map(len, t.to_pandas_batches())) == n


@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"])
@pytest.mark.parametrize("n", [None, 0, 1, 2])
def test_to_pandas_batches_column(backend, con, n):
t = backend.functional_alltypes.limit(n).timestamp_col
n = t.count().execute()

assert sum(map(len, con.to_pandas_batches(t))) == n
assert sum(map(len, t.to_pandas_batches())) == n


@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"])
def test_to_pandas_batches_scalar(backend, con):
t = backend.functional_alltypes.timestamp_col.max()
expected = t.execute()

result1 = list(con.to_pandas_batches(t))
assert result1 == [expected]

result2 = list(t.to_pandas_batches())
assert result2 == [expected]
40 changes: 39 additions & 1 deletion ibis/expr/types/core.py
@@ -3,7 +3,7 @@
import contextlib
import os
import webbrowser
from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Tuple
from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Tuple, Iterator

from public import public
from rich.jupyter import JupyterMixin
@@ -422,6 +422,44 @@ def to_pyarrow(
self, params=params, limit=limit, **kwargs
)

@experimental
def to_pandas_batches(
self,
*,
limit: int | str | None = None,
params: Mapping[ir.Value, Any] | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> Iterator[pd.DataFrame | pd.Series | Any]:
"""Execute expression and return an iterator of pandas DataFrames.
This method is eager and will execute the associated expression
immediately.
Parameters
----------
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
params
Mapping of scalar parameter expressions to value.
chunk_size
Maximum number of rows in each returned batch.
kwargs
Keyword arguments
Returns
-------
Iterator[pd.DataFrame]
"""
return self._find_backend(use_default=True).to_pandas_batches(
self,
params=params,
limit=limit,
chunk_size=chunk_size,
**kwargs,
)

@experimental
def to_parquet(
self,
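At the expression level the new method simply dispatches to the bound backend resolved by `_find_backend(use_default=True)`. A minimal sketch, assuming DuckDB is installed and acting as the default backend:

import ibis

t = ibis.memtable({"x": list(range(5)), "y": list("abcde")})

for df in t.to_pandas_batches(chunk_size=2):
    # the default backend streams the result back in pandas chunks
    print(len(df), list(df.columns))
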
