From 6b334545e6b9e0cddd119f2911c55b4b02aa941d Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Sat, 14 Oct 2023 06:06:46 -0400
Subject: [PATCH] feat(impala/pyspark): implement `to_pyarrow`

---
 ibis/backends/impala/__init__.py        | 38 ++++++++++
 ibis/backends/pyspark/__init__.py       | 38 +++++++++-
 .../tests/test_dataframe_interchange.py | 24 +-----
 ibis/backends/tests/test_export.py      | 74 +++++++------------
 4 files changed, 106 insertions(+), 68 deletions(-)

diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py
index e92f06dc44bd..e465b33cae3d 100644
--- a/ibis/backends/impala/__init__.py
+++ b/ibis/backends/impala/__init__.py
@@ -52,9 +52,11 @@
 from ibis.formats.pandas import PandasData
 
 if TYPE_CHECKING:
+    from collections.abc import Mapping
     from pathlib import Path
 
     import pandas as pd
+    import pyarrow as pa
 
 
 __all__ = (
@@ -1378,3 +1380,39 @@ def write_dataframe(
         """
         writer = DataFrameWriter(self, df)
         return writer.write_csv(path)
+
+    def to_pyarrow(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **kwargs: Any,
+    ) -> pa.Table:
+        import pyarrow as pa
+
+        from ibis.formats.pyarrow import PyArrowData
+
+        table_expr = expr.as_table()
+        output = pa.Table.from_pandas(
+            self.execute(table_expr, params=params, limit=limit, **kwargs),
+            preserve_index=False,
+        )
+        table = PyArrowData.convert_table(output, table_expr.schema())
+        return expr.__pyarrow_result__(table)
+
+    def to_pyarrow_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1000000,
+        **kwargs: Any,
+    ) -> pa.ipc.RecordBatchReader:
+        pa = self._import_pyarrow()
+        pa_table = self.to_pyarrow(
+            expr.as_table(), params=params, limit=limit, **kwargs
+        )
+        return pa.RecordBatchReader.from_batches(
+            pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
+        )
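
The new Impala `to_pyarrow` goes through `execute()`, converts the resulting pandas
DataFrame with `pa.Table.from_pandas`, and coerces it to the expression's schema via
`PyArrowData.convert_table`, so table, column, and scalar expressions all come back as
the matching Arrow object. A minimal usage sketch follows; the connection parameters
and table name are illustrative, not taken from this patch:

    import ibis

    con = ibis.impala.connect(host="impala-host", port=21050)  # hypothetical cluster
    t = con.table("functional_alltypes")

    tbl = con.to_pyarrow(t.limit(10))  # pyarrow.Table matching the expression's schema
    col = t.int_col.to_pyarrow()       # column expressions yield an Arrow array
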
diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py
index 9599fde7a73b..7d65da4c2f03 100644
--- a/ibis/backends/pyspark/__init__.py
+++ b/ibis/backends/pyspark/__init__.py
@@ -33,7 +33,7 @@
 from ibis.backends.pyspark.datatypes import PySparkType
 
 if TYPE_CHECKING:
-    from collections.abc import Sequence
+    from collections.abc import Mapping, Sequence
 
     import pandas as pd
     import pyarrow as pa
@@ -816,3 +816,39 @@ def to_delta(
             PySpark Delta Lake table write arguments. https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.save.html
         """
         expr.compile().write.format("delta").save(os.fspath(path), **kwargs)
+
+    def to_pyarrow(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **kwargs: Any,
+    ) -> pa.Table:
+        import pyarrow as pa
+
+        from ibis.formats.pyarrow import PyArrowData
+
+        table_expr = expr.as_table()
+        output = pa.Table.from_pandas(
+            self.execute(table_expr, params=params, limit=limit, **kwargs),
+            preserve_index=False,
+        )
+        table = PyArrowData.convert_table(output, table_expr.schema())
+        return expr.__pyarrow_result__(table)
+
+    def to_pyarrow_batches(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        chunk_size: int = 1000000,
+        **kwargs: Any,
+    ) -> pa.ipc.RecordBatchReader:
+        pa = self._import_pyarrow()
+        pa_table = self.to_pyarrow(
+            expr.as_table(), params=params, limit=limit, **kwargs
+        )
+        return pa.RecordBatchReader.from_batches(
+            pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
+        )
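
Both backends implement `to_pyarrow_batches` by first materializing the full result with
`to_pyarrow` and then wrapping it in a `RecordBatchReader` via
`Table.to_batches(max_chunksize=chunk_size)`, so the reader streams from an in-memory
table rather than a live cursor, and `chunk_size` is only an upper bound on batch length.
A consumption sketch, reusing `con` and `t` from the sketch above (`process` is a
hypothetical per-batch handler, not part of the patch):

    reader = con.to_pyarrow_batches(t, chunk_size=10_000)  # pa.ipc.RecordBatchReader
    for batch in reader:  # each item is a pyarrow.RecordBatch
        process(batch)    # hypothetical handler
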
diff --git a/ibis/backends/tests/test_dataframe_interchange.py b/ibis/backends/tests/test_dataframe_interchange.py
index 5631d550045e..8ecbc32ba4ee 100644
--- a/ibis/backends/tests/test_dataframe_interchange.py
+++ b/ibis/backends/tests/test_dataframe_interchange.py
@@ -2,21 +2,14 @@
 
 import pyarrow as pa
 import pytest
+from packaging.version import parse as vparse
 
-pytestmark = [
-    pytest.mark.skipif(pa.__version__ < "12.", reason="pyarrow >= 12 required"),
-    pytest.mark.notimpl(
-        ["pyspark"],
-        raises=NotImplementedError,
-        reason="PySpark doesn't implement fetchmany",
-    ),
-]
+pytestmark = pytest.mark.skipif(
+    vparse(pa.__version__) < vparse("12"), reason="pyarrow >= 12 required"
+)
 
 
 @pytest.mark.notimpl(["druid"])
-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_no_execute(con, alltypes, mocker):
     t = alltypes.select("int_col", "double_col", "string_col")
     pa_df = t.to_pyarrow().__dataframe__()
@@ -60,9 +53,6 @@ def test_dataframe_interchange_no_execute(con, alltypes, mocker):
     assert not to_pyarrow.called
 
 
-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker):
     t = alltypes.select("int_col", "double_col", "string_col")
     pa_df = t.to_pyarrow().__dataframe__()
@@ -80,9 +70,6 @@ def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker):
 
 
 @pytest.mark.notimpl(["druid"])
-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker):
     t = alltypes.select("int_col", "double_col", "string_col")
     pa_df = t.to_pyarrow().__dataframe__()
@@ -111,9 +98,6 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker):
     assert col2.size() == pa_col2.size()
 
 
-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_dataframe_interchange_select_after_execution_no_reexecute(
     con, alltypes, mocker
 ):
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index b397dd464117..662f8c0e9468 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -5,7 +5,6 @@
 import pyarrow as pa
 import pytest
 import sqlalchemy as sa
-from packaging.version import parse as vparse
 from pytest import param
 
 import ibis
@@ -14,15 +13,16 @@
 from ibis.formats.pyarrow import PyArrowType
 
 try:
-    from pyspark.sql.utils import ParseException
+    from pyspark.sql.utils import AnalysisException
 except ImportError:
-    ParseException = None
+    AnalysisException = None
 
 try:
     from deltalake import PyDeltaTableError
 except ImportError:
     PyDeltaTableError = None
 
+
 limit = [
     param(
         42,
@@ -33,7 +33,6 @@
                     # limit not implemented for pandas backend execution
                     "dask",
                     "datafusion",
-                    "impala",
                     "pandas",
                     "pyspark",
                 ]
@@ -42,9 +41,7 @@
     ),
 ]
 
-no_limit = [
-    param(None, id="nolimit", marks=[pytest.mark.notimpl(["impala", "pyspark"])])
-]
+no_limit = [param(None, id="nolimit")]
 
 
 limit_no_limit = limit + no_limit
@@ -108,19 +105,12 @@ def test_scalar_to_pyarrow_scalar(limit, awards_players):
     assert isinstance(scalar, pa.Scalar)
 
 
-ARROW_STRING_TYPE = {
-    # duckdb changed to pa.large_string in 0.8.0 and then reverted that change
-    # back to pa.string in 0.8.1
-    ("duckdb", vparse("0.8.0")): pa.large_string()
-}
-
-
-@pytest.mark.notimpl(["impala", "pyspark", "druid"])
-def test_table_to_pyarrow_table_schema(con, awards_players):
+@pytest.mark.notimpl(["druid"])
+def test_table_to_pyarrow_table_schema(awards_players):
     table = awards_players.to_pyarrow()
     assert isinstance(table, pa.Table)
 
-    string = ARROW_STRING_TYPE.get((con.name, vparse(con.version)), pa.string())
+    string = pa.string()
     expected_schema = pa.schema(
         [
             pa.field("playerID", string),
@@ -134,7 +124,6 @@ def test_table_to_pyarrow_table_schema(con, awards_players):
     assert table.schema == expected_schema
 
 
-@pytest.mark.notimpl(["impala", "pyspark"])
 def test_column_to_pyarrow_table_schema(awards_players):
     expr = awards_players.awardID
     array = expr.to_pyarrow()
@@ -142,7 +131,7 @@ def test_column_to_pyarrow_table_schema(awards_players):
     assert array.type == pa.string() or array.type == pa.large_string()
 
 
-@pytest.mark.notimpl(["pandas", "dask", "impala", "pyspark", "datafusion"])
+@pytest.mark.notimpl(["pandas", "dask", "datafusion"])
 @pytest.mark.notyet(
     ["clickhouse"],
     raises=AssertionError,
@@ -157,7 +146,7 @@ def test_table_pyarrow_batch_chunk_size(awards_players):
     util.consume(batch_reader)
 
 
-@pytest.mark.notimpl(["pandas", "dask", "impala", "pyspark", "datafusion"])
+@pytest.mark.notimpl(["pandas", "dask", "datafusion"])
 @pytest.mark.notyet(
     ["clickhouse"],
     raises=AssertionError,
@@ -174,7 +163,10 @@ def test_column_pyarrow_batch_chunk_size(awards_players):
     util.consume(batch_reader)
 
 
-@pytest.mark.notimpl(["pandas", "dask", "impala", "pyspark", "datafusion"])
+@pytest.mark.notimpl(["pandas", "dask", "datafusion"])
+@pytest.mark.broken(
+    ["pyspark"], raises=AssertionError, reason="chunk_size isn't respected"
+)
 @pytest.mark.broken(
     ["sqlite"],
     raises=pa.ArrowException,
@@ -191,7 +183,6 @@ def test_to_pyarrow_batches_borked_types(batting):
     util.consume(batch_reader)
 
 
-@pytest.mark.notimpl(["impala", "pyspark"])
 def test_to_pyarrow_memtable(con):
     expr = ibis.memtable({"x": [1, 2, 3]})
     table = con.to_pyarrow(expr)
@@ -199,7 +190,6 @@ def test_to_pyarrow_memtable(con):
     assert len(table) == 3
 
 
-@pytest.mark.notimpl(["impala", "pyspark"])
 def test_to_pyarrow_batches_memtable(con):
     expr = ibis.memtable({"x": [1, 2, 3]})
     n = 0
@@ -210,7 +200,6 @@ def test_to_pyarrow_batches_memtable(con):
     assert n == 3
 
 
-@pytest.mark.notimpl(["impala", "pyspark"])
 def test_table_to_parquet(tmp_path, backend, awards_players):
     outparquet = tmp_path / "out.parquet"
     awards_players.to_parquet(outparquet)
@@ -224,22 +213,23 @@ def test_table_to_parquet(tmp_path, backend, awards_players):
     [
         "bigquery",
         "clickhouse",
+        "dask",
         "datafusion",
+        "impala",
         "mssql",
         "mysql",
         "oracle",
         "pandas",
         "polars",
         "postgres",
+        "pyspark",
         "snowflake",
         "sqlite",
         "trino",
     ],
     reason="no partitioning support",
 )
-@pytest.mark.notimpl(
-    ["dask", "impala", "pyspark", "druid"], reason="No to_parquet support"
-)
+@pytest.mark.notimpl(["druid"], reason="No to_parquet support")
 def test_roundtrip_partitioned_parquet(tmp_path, con, backend, awards_players):
     outparquet = tmp_path / "outhive.parquet"
     awards_players.to_parquet(outparquet, partition_by="yearID")
@@ -263,7 +253,6 @@ def test_roundtrip_partitioned_parquet(tmp_path, con, backend, awards_players):
     backend.assert_frame_equal(reingest.to_pandas(), awards_players.to_pandas())
 
 
-@pytest.mark.notimpl(["impala", "pyspark"], reason="No support for exporting files")
 @pytest.mark.parametrize("ftype", ["csv", "parquet"])
 def test_memtable_to_file(tmp_path, con, ftype, monkeypatch):
     """
@@ -284,7 +273,6 @@ def test_memtable_to_file(tmp_path, con, ftype, monkeypatch):
     assert outfile.is_file()
 
 
-@pytest.mark.notimpl(["impala", "pyspark"])
 def test_table_to_csv(tmp_path, backend, awards_players):
     outcsv = tmp_path / "out.csv"
 
@@ -305,13 +293,7 @@ def test_table_to_csv(tmp_path, backend, awards_players):
             dt.Decimal(38, 9),
             pa.Decimal128Type,
             id="decimal128",
-            marks=[
-                pytest.mark.broken(
-                    ["impala"], raises=AttributeError, reason="fetchmany doesn't exist"
-                ),
-                pytest.mark.notyet(["druid"], raises=sa.exc.ProgrammingError),
-                pytest.mark.notyet(["pyspark"], raises=NotImplementedError),
-            ],
+            marks=[pytest.mark.notyet(["druid"], raises=sa.exc.ProgrammingError)],
         ),
         param(
             dt.Decimal(76, 38),
@@ -325,7 +307,11 @@ def test_table_to_csv(tmp_path, backend, awards_players):
                 ),
                 pytest.mark.notyet(["oracle"], raises=sa.exc.DatabaseError),
                 pytest.mark.notyet(["mssql", "mysql"], raises=sa.exc.OperationalError),
-                pytest.mark.notyet(["pyspark"], raises=ParseException),
+                pytest.mark.notyet(
+                    ["pyspark"],
+                    raises=AnalysisException,
+                    reason="precision is out of range",
+                ),
             ],
         ),
     ],
@@ -388,12 +374,6 @@ def test_roundtrip_delta(con, alltypes, tmp_path, monkeypatch):
     ["druid"], raises=AttributeError, reason="string type is used for timestamp_col"
 )
 @pytest.mark.notimpl(["mssql"], raises=pa.ArrowTypeError)
-@pytest.mark.notimpl(
-    ["pyspark"], raises=NotImplementedError, reason="fetchmany not implemented"
-)
-@pytest.mark.notimpl(
-    ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor"
-)
 def test_arrow_timestamp_with_time_zone(alltypes):
     t = alltypes.select(
         tz=alltypes.timestamp_col.cast(
@@ -440,7 +420,7 @@ def test_empty_memtable(backend, con):
     backend.assert_frame_equal(result, expected)
 
 
-@pytest.mark.notimpl(["flink", "impala", "pyspark"])
+@pytest.mark.notimpl(["flink"])
 def test_to_pandas_batches_empty_table(backend, con):
     t = backend.functional_alltypes.limit(0)
     n = t.count().execute()
@@ -449,7 +429,7 @@ def test_to_pandas_batches_empty_table(backend, con):
     assert sum(map(len, t.to_pandas_batches())) == n
 
 
-@pytest.mark.notimpl(["druid", "flink", "impala", "pyspark"])
+@pytest.mark.notimpl(["druid", "flink"])
 @pytest.mark.parametrize("n", [None, 1])
 def test_to_pandas_batches_nonempty_table(backend, con, n):
     t = backend.functional_alltypes.limit(n)
@@ -459,7 +439,7 @@ def test_to_pandas_batches_nonempty_table(backend, con, n):
     assert sum(map(len, t.to_pandas_batches())) == n
 
 
-@pytest.mark.notimpl(["flink", "impala", "pyspark"])
+@pytest.mark.notimpl(["flink"])
 @pytest.mark.parametrize("n", [None, 0, 1, 2])
 def test_to_pandas_batches_column(backend, con, n):
     t = backend.functional_alltypes.limit(n).timestamp_col
@@ -469,7 +449,7 @@ def test_to_pandas_batches_column(backend, con, n):
     assert sum(map(len, t.to_pandas_batches())) == n
 
 
-@pytest.mark.notimpl(["druid", "flink", "impala", "pyspark"])
+@pytest.mark.notimpl(["druid", "flink"])
 def test_to_pandas_batches_scalar(backend, con):
     t = backend.functional_alltypes.timestamp_col.max()
     expected = t.execute()
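
The version gate in ibis/backends/tests/test_dataframe_interchange.py now parses
`pa.__version__` with packaging instead of comparing strings, because lexicographic
comparison misorders multi-digit versions. A tiny illustration (plain Python, not part of
the patch):

    from packaging.version import parse as vparse

    "9.0.0" < "12.0"                  # False: "9" sorts after "1" as a string
    vparse("9.0.0") < vparse("12.0")  # True: compared as version numbers
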