feat(impala/pyspark): implement to_pyarrow
cpcloud authored and kszucs committed Oct 16, 2023
1 parent 2d36722 commit 6b33454
Showing 4 changed files with 106 additions and 68 deletions.
ibis/backends/impala/__init__.py (38 additions, 0 deletions)
@@ -52,9 +52,11 @@
 from ibis.formats.pandas import PandasData

 if TYPE_CHECKING:
+    from collections.abc import Mapping
     from pathlib import Path

     import pandas as pd
+    import pyarrow as pa


 __all__ = (
@@ -1378,3 +1380,39 @@ def write_dataframe(
"""
writer = DataFrameWriter(self, df)
return writer.write_csv(path)

def to_pyarrow(
self,
expr: ir.Expr,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
**kwargs: Any,
) -> pa.Table:
import pyarrow as pa

from ibis.formats.pyarrow import PyArrowData

table_expr = expr.as_table()
output = pa.Table.from_pandas(
self.execute(table_expr, params=params, limit=limit, **kwargs),
preserve_index=False,
)
table = PyArrowData.convert_table(output, table_expr.schema())
return expr.__pyarrow_result__(table)

def to_pyarrow_batches(
self,
expr: ir.Expr,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1000000,
**kwargs: Any,
) -> pa.ipc.RecordBatchReader:
pa = self._import_pyarrow()
pa_table = self.to_pyarrow(
expr.as_table(), params=params, limit=limit, **kwargs
)
return pa.RecordBatchReader.from_batches(
pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
)
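For context, a minimal usage sketch of the new Impala methods; the connection parameters and table name below are hypothetical and should be adjusted for your cluster:

import ibis

# Hypothetical connection details.
con = ibis.impala.connect(host="impala-host", port=21050, database="default")
t = con.table("functional_alltypes")

# Materialize the query result as a single pyarrow.Table.
arrow_table = t.limit(10).to_pyarrow()

# Or consume the result as a stream of record batches.
for batch in t.to_pyarrow_batches(chunk_size=100_000):
    ...  # each `batch` is a pyarrow.RecordBatch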
ibis/backends/pyspark/__init__.py (37 additions, 1 deletion)
@@ -33,7 +33,7 @@
 from ibis.backends.pyspark.datatypes import PySparkType

 if TYPE_CHECKING:
-    from collections.abc import Sequence
+    from collections.abc import Mapping, Sequence

     import pandas as pd
     import pyarrow as pa
@@ -816,3 +816,39 @@ def to_delta(
             PySpark Delta Lake table write arguments. https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.save.html
         """
         expr.compile().write.format("delta").save(os.fspath(path), **kwargs)
+
+    def to_pyarrow(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **kwargs: Any,
+    ) -> pa.Table:
+        import pyarrow as pa
+
+        from ibis.formats.pyarrow import PyArrowData
+
+        table_expr = expr.as_table()
+        output = pa.Table.from_pandas(
+            self.execute(table_expr, params=params, limit=limit, **kwargs),
+            preserve_index=False,
+        )
+        table = PyArrowData.convert_table(output, table_expr.schema())
+        return expr.__pyarrow_result__(table)
+
+    def to_pyarrow_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1000000,
+        **kwargs: Any,
+    ) -> pa.ipc.RecordBatchReader:
+        pa = self._import_pyarrow()
+        pa_table = self.to_pyarrow(
+            expr.as_table(), params=params, limit=limit, **kwargs
+        )
+        return pa.RecordBatchReader.from_batches(
+            pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
+        )
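Note that in both backends to_pyarrow_batches is layered on to_pyarrow: the whole result is materialized first, and the returned RecordBatchReader only re-chunks an in-memory table rather than streaming rows from the engine. A self-contained sketch of that re-chunking pattern:

import pyarrow as pa

table = pa.table({"x": list(range(10))})

# Wrap an already-materialized table in a reader that yields fixed-size batches.
reader = pa.RecordBatchReader.from_batches(
    table.schema, table.to_batches(max_chunksize=4)
)
assert [batch.num_rows for batch in reader] == [4, 4, 2]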
ibis/backends/tests/test_dataframe_interchange.py (4 additions, 20 deletions)
@@ -2,21 +2,14 @@

 import pyarrow as pa
 import pytest
+from packaging.version import parse as vparse

-pytestmark = [
-    pytest.mark.skipif(pa.__version__ < "12.", reason="pyarrow >= 12 required"),
-    pytest.mark.notimpl(
-        ["pyspark"],
-        raises=NotImplementedError,
-        reason="PySpark doesn't implement fetchmany",
-    ),
-]
+pytestmark = pytest.mark.skipif(
+    vparse(pa.__version__) < vparse("12"), reason="pyarrow >= 12 required"
+)


 @pytest.mark.notimpl(["druid"])
-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_no_execute(con, alltypes, mocker):
     t = alltypes.select("int_col", "double_col", "string_col")
     pa_df = t.to_pyarrow().__dataframe__()
@@ -60,9 +53,6 @@ def test_dataframe_interchange_no_execute(con, alltypes, mocker):
     assert not to_pyarrow.called


-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker):
     t = alltypes.select("int_col", "double_col", "string_col")
     pa_df = t.to_pyarrow().__dataframe__()
@@ -80,9 +70,6 @@ def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker):


@pytest.mark.notimpl(["druid"])
@pytest.mark.notimpl(
["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
)
def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker):
t = alltypes.select("int_col", "double_col", "string_col")
pa_df = t.to_pyarrow().__dataframe__()
@@ -111,9 +98,6 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker):
     assert col2.size() == pa_col2.size()


-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_select_after_execution_no_reexecute(
     con, alltypes, mocker
 ):
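The switch from a raw string comparison to packaging.version.parse in the test module's skip guard fixes a genuine bug: strings compare lexicographically, so the old check pa.__version__ < "12." gives the wrong answer for any single-digit major version. A quick illustration:

from packaging.version import parse as vparse

# Lexicographic string comparison mis-orders multi-digit versions:
assert not ("9.0.0" < "12.")  # False as strings, though 9 < 12 numerically

# Parsed versions compare numerically, as intended:
assert vparse("9.0.0") < vparse("12")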
