feat: add to_pyarrow and to_pyarrow_batches
Adds `to_pyarrow` and `to_pyarrow_batches` to the `BaseBackend`.

`to_pyarrow` returns pyarrow objects consistent with the dimension of
the output:
  - a table -> pa.Table
  - a column -> pa.Array
  - a scalar -> pa.Scalar
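
As a rough usage sketch (the connection, table name, and column below are
hypothetical; any backend that implements these methods should behave
similarly):

```python
import ibis

con = ibis.duckdb.connect()       # hypothetical in-memory connection
t = con.table("events")           # assumed to already exist, with a column `x`

tbl = con.to_pyarrow(t)           # pa.Table
col = con.to_pyarrow(t.x)         # pa.Array (chunks are combined)
val = con.to_pyarrow(t.x.sum())   # pa.Scalar
```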

`to_pyarrow_batches` returns a `pyarrow.RecordBatchReader` that yields
pyarrow record batches. It does not have the same dimension handling
because that is not expressible with a `RecordBatchReader`.
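
Consuming the reader might look like this (same hypothetical connection and
table as above):

```python
reader = con.to_pyarrow_batches(t, chunk_size=1_000_000)

for batch in reader:      # each item is a pa.RecordBatch
    print(batch.num_rows)

# alternatively, materialize everything at once:
# table = reader.read_all()   # pa.Table
```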

`to_pyarrow_batches` is implemented for `AlchemyBackend`, `datafusion`,
and `duckdb`.

The `pandas` backend implements `to_pyarrow` by executing the expression and
converting the resulting pandas object with pyarrow (`pa.Table.from_pandas`,
`pa.Array.from_pandas`, or `pa.scalar`).

Backends that do not already depend on pyarrow will only require it when the
`to_pyarrow*` methods are used.

These methods emit warnings indicating that they are experimental and may
break in the future irrespective of semantic versioning.

The DuckDB `to_pyarrow_batches` uses a proxy object to keep the `cursor` that
produced the record batches out of garbage collection, so the underlying
batches remain readable even after the point at which the cursor _would have_
been collected (it isn't, because the proxy object holds a reference to it).
gforsyth authored and kszucs committed Oct 5, 2022
1 parent 522db9c commit a059cf9
Showing 9 changed files with 542 additions and 25 deletions.
61 changes: 60 additions & 1 deletion ibis/backends/base/__init__.py
@@ -22,6 +22,7 @@

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa

import ibis
import ibis.common.exceptions as exc
@@ -206,7 +207,65 @@ def _ipython_key_completions_(self) -> list[str]:
return self._backend.list_tables()


class BaseBackend(abc.ABC):
# should have a better name
class ResultHandler:
@staticmethod
def _import_pyarrow():
try:
import pyarrow
except ImportError:
raise ModuleNotFoundError(
"Exporting to arrow formats requires `pyarrow` but it is not installed" # noqa: ignore
)
else:
return pyarrow

def to_pyarrow(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
**kwargs: Any,
) -> pa.Table:
pa = self._import_pyarrow()
try:
# Can't construct an array from record batches
# so construct at one column table (if applicable)
# then return the column _from_ the table
table = pa.Table.from_batches(
self.to_pyarrow_batches(
expr, params=params, limit=limit, **kwargs
)
)
if isinstance(expr, ir.Table):
return table
elif isinstance(expr, ir.Column):
# Column will be a ChunkedArray, `combine_chunks` will
# flatten it
return table.columns[0].combine_chunks()
elif isinstance(expr, ir.Scalar):
return table.columns[0].combine_chunks()[0]
else:
raise ValueError
except ValueError:
# The pyarrow batches iterator is empty so pass in an empty
# iterator and the pyarrow schema
return pa.Table.from_batches([], schema=expr.schema().to_pyarrow())

def to_pyarrow_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> pa.RecordBatchReader:
raise NotImplementedError


class BaseBackend(abc.ABC, ResultHandler):
"""Base backend class.
All Ibis backends must subclass this class and implement all the
56 changes: 53 additions & 3 deletions ibis/backends/base/sql/__init__.py
@@ -4,7 +4,7 @@
import contextlib
import os
from functools import lru_cache
from typing import Any, Mapping
from typing import TYPE_CHECKING, Any, Iterable, Mapping

import sqlalchemy as sa

@@ -17,6 +17,9 @@
from ibis.backends.base.sql.compiler import Compiler
from ibis.expr.typing import TimeContext

if TYPE_CHECKING:
import pyarrow as pa

__all__ = [
'BaseSQLBackend',
]
@@ -127,8 +130,6 @@ def raw_sql(self, query: str) -> Any:
Any
Backend cursor
"""
# TODO results is unused, it can be removed
# (requires updating Impala tests)
# TODO `self.con` is assumed to be defined in subclasses, but there
# is nothing that enforces it. We should find a way to make sure
# `self.con` is always a DBAPI2 connection, or raise an error
@@ -141,6 +142,55 @@ def raw_sql(self, query: str) -> Any:
def _safe_raw_sql(self, *args, **kwargs):
yield self.raw_sql(*args, **kwargs)

def _cursor_batches(
self,
expr: ir.Expr,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
) -> Iterable[list]:
query_ast = self.compiler.to_ast_ensure_limit(
expr, limit, params=params
)
sql = query_ast.compile()

with self._safe_raw_sql(sql) as cursor:
while batch := cursor.fetchmany(chunk_size):
yield batch

def to_pyarrow_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> pa.RecordBatchReader:
pa = self._import_pyarrow()

from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct

if hasattr(expr, "schema"):
schema = expr.schema()
else:
# ColumnExpr has no schema method, define single-column schema
schema = sch.schema([(expr.get_name(), expr.type())])

def _batches():
for batch in self._cursor_batches(
expr, params=params, limit=limit, chunk_size=chunk_size
):
struct_array = pa.array(
map(tuple, batch),
type=ibis_to_pyarrow_struct(schema),
)
yield pa.RecordBatch.from_struct_array(struct_array)

return pa.RecordBatchReader.from_batches(
schema.to_pyarrow(), _batches()
)

def execute(
self,
expr: ir.Expr,
57 changes: 45 additions & 12 deletions ibis/backends/datafusion/__init__.py
@@ -20,6 +20,8 @@
except ImportError:
from datafusion import SessionContext

import datafusion


def _to_pyarrow_table(frame):
batches = frame.collect()
@@ -155,23 +157,19 @@ def register_parquet(
"""
self._context.register_parquet(name, str(path))

def execute(
def _get_frame(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] = None,
limit: str = 'default',
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
**kwargs: Any,
):
) -> datafusion.DataFrame:
if isinstance(expr, ir.Table):
frame = self.compile(expr, params, **kwargs)
table = _to_pyarrow_table(frame)
return table.to_pandas()
return self.compile(expr, params, **kwargs)
elif isinstance(expr, ir.Column):
# expression must be named for the projection
expr = expr.name('tmp').to_projection()
frame = self.compile(expr, params, **kwargs)
table = _to_pyarrow_table(frame)
return table['tmp'].to_pandas()
return self.compile(expr, params, **kwargs)
elif isinstance(expr, ir.Scalar):
if an.find_immediate_parent_tables(expr.op()):
# there are associated datafusion tables so convert the expr
@@ -184,8 +182,43 @@ def execute(
# dummy datafusion table
compiled = self.compile(expr, params, **kwargs)
frame = self._context.empty_table().select(compiled)
table = _to_pyarrow_table(frame)
return table[0][0].as_py()
return frame
else:
raise com.IbisError(
f"Cannot execute expression of type: {type(expr)}"
)

def to_pyarrow_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
) -> pa.RecordBatchReader:
pa = self._import_pyarrow()
frame = self._get_frame(expr, params, limit, **kwargs)
return pa.RecordBatchReader.from_batches(
frame.schema(), frame.collect()
)

def execute(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] = None,
limit: int | str | None = "default",
**kwargs: Any,
):
output = self.to_pyarrow(expr, params=params, limit=limit, **kwargs)
if isinstance(expr, ir.Table):
return output.to_pandas()
elif isinstance(expr, ir.Column):
series = output.to_pandas()
series.name = "tmp"
return series
elif isinstance(expr, ir.Scalar):
return output.as_py()
else:
raise com.IbisError(
f"Cannot execute expression of type: {type(expr)}"
79 changes: 76 additions & 3 deletions ibis/backends/duckdb/__init__.py
@@ -7,9 +7,10 @@
import os
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, MutableMapping
from typing import TYPE_CHECKING, Any, Iterator, Mapping, MutableMapping

import pandas as pd
import pyarrow as pa
import pyarrow.types as pat
import sqlalchemy as sa
import toolz
@@ -19,10 +20,9 @@

if TYPE_CHECKING:
import duckdb
import ibis.expr.types as ir
import pyarrow as pa

import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis.backends.base.sql.alchemy import BaseAlchemyBackend
from ibis.backends.duckdb.compiler import DuckDBSQLCompiler
from ibis.backends.duckdb.datatypes import parse
@@ -213,6 +213,57 @@ def register(

return self.table(table_name)

def to_pyarrow_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
) -> Iterator[pa.RecordBatch]:
_ = self._import_pyarrow()
query_ast = self.compiler.to_ast_ensure_limit(
expr, limit, params=params
)
sql = query_ast.compile()

cursor = self.raw_sql(sql)

_reader = cursor.cursor.fetch_record_batch(chunk_size=chunk_size)
# Horrible hack to make sure cursor isn't garbage collected
# before batches are streamed out of the RecordBatchReader
batches = IbisRecordBatchReader(_reader, cursor)
return batches
# TODO: duckdb seems to not care about the `chunk_size` argument
# and returns batches in 1024 row chunks

def to_pyarrow(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
) -> pa.Table:
_ = self._import_pyarrow()
query_ast = self.compiler.to_ast_ensure_limit(
expr, limit, params=params
)
sql = query_ast.compile()

cursor = self.raw_sql(sql)
table = cursor.cursor.fetch_arrow_table()

if isinstance(expr, ir.Table):
return table
elif isinstance(expr, ir.Column):
# Column will be a ChunkedArray, `combine_chunks` will
# flatten it
return table.columns[0].combine_chunks()
elif isinstance(expr, ir.Scalar):
return table.columns[0].combine_chunks()[0]
else:
raise ValueError

def fetch_from_cursor(
self,
cursor: duckdb.DuckDBPyConnection,
@@ -294,3 +345,25 @@ def _get_temp_view_definition(
definition: sa.sql.compiler.Compiled,
) -> str:
return f"CREATE OR REPLACE TEMPORARY VIEW {name} AS {definition}"


class IbisRecordBatchReader(pa.RecordBatchReader):
def __init__(self, reader, cursor):
self.reader = reader
self.cursor = cursor

def close(self):
self.reader.close()
del self.cursor

def read_all(self):
return self.reader.read_all()

def read_next_batch(self):
return self.reader.read_next_batch()

def read_pandas(self):
return self.reader.read_pandas()

def schema(self):
return self.reader.schema
21 changes: 20 additions & 1 deletion ibis/backends/pandas/__init__.py
@@ -2,7 +2,7 @@

import importlib
from functools import lru_cache
from typing import Any, MutableMapping
from typing import TYPE_CHECKING, Any, Mapping, MutableMapping

import pandas as pd

@@ -18,6 +18,9 @@
ibis_schema_to_pandas,
)

if TYPE_CHECKING:
import pyarrow as pa


class BasePandasBackend(BaseBackend):
"""Base class for backends based on pandas."""
@@ -192,6 +195,22 @@ class Backend(BasePandasBackend):
database_class = PandasDatabase
table_class = PandasTable

def to_pyarrow(
self,
expr: ir.Expr,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
) -> pa.Table:
pa = self._import_pyarrow()
output = self.execute(expr, params=params, limit=limit)

if isinstance(output, pd.DataFrame):
return pa.Table.from_pandas(output)
elif isinstance(output, pd.Series):
return pa.Array.from_pandas(output)
else:
return pa.scalar(output)

def execute(self, query, params=None, limit='default', **kwargs):
from ibis.backends.pandas.core import execute_and_reset
