From 2d36722709b902b17d9f806988d495ef8f4f4f7b Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Sat, 14 Oct 2023 06:00:04 -0400 Subject: [PATCH] feat(dask): enable pyarrow conversion --- ibis/backends/base/__init__.py | 2 +- ibis/backends/dask/__init__.py | 7 ++++ ibis/backends/pandas/__init__.py | 8 ++--- .../tests/test_dataframe_interchange.py | 6 ++-- ibis/backends/tests/test_export.py | 33 ++++++++----------- 5 files changed, 27 insertions(+), 29 deletions(-) diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py index 9e00231f4d95..39538bd7215a 100644 --- a/ibis/backends/base/__init__.py +++ b/ibis/backends/base/__init__.py @@ -945,7 +945,7 @@ def _run_pre_execute_hooks(self, expr: ir.Expr) -> None: self._register_in_memory_tables(expr) def _define_udf_translation_rules(self, expr): - if self.supports_in_memory_tables: + if self.supports_python_udfs: raise NotImplementedError(self.name) def compile( diff --git a/ibis/backends/dask/__init__.py b/ibis/backends/dask/__init__.py index 88ef89a9cdf6..6357ab44d5f4 100644 --- a/ibis/backends/dask/__init__.py +++ b/ibis/backends/dask/__init__.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from collections.abc import Mapping, MutableMapping + from pathlib import Path # Make sure that the pandas backend options have been loaded ibis.pandas # noqa: B018 @@ -29,6 +30,7 @@ class Backend(BasePandasBackend): name = "dask" backend_table_type = dd.DataFrame + supports_in_memory_tables = False def do_connect( self, @@ -133,3 +135,8 @@ def _convert_object(cls, obj: dd.DataFrame) -> dd.DataFrame: def _load_into_cache(self, name, expr): self.create_table(name, self.compile(expr).persist()) + + def read_delta( + self, source: str | Path, table_name: str | None = None, **kwargs: Any + ): + raise NotImplementedError(self.name) diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py index 97bf9f0961fa..a7d84f7e3d86 100644 --- a/ibis/backends/pandas/__init__.py +++ b/ibis/backends/pandas/__init__.py @@ -229,10 +229,6 @@ def has_operation(cls, operation: type[ops.Value]) -> bool: def _clean_up_cached_table(self, op): del self.dictionary[op.name] - -class Backend(BasePandasBackend): - name = "pandas" - def to_pyarrow( self, expr: ir.Expr, @@ -264,6 +260,10 @@ def to_pyarrow_batches( pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size) ) + +class Backend(BasePandasBackend): + name = "pandas" + def execute(self, query, params=None, limit="default", **kwargs): from ibis.backends.pandas.core import execute_and_reset diff --git a/ibis/backends/tests/test_dataframe_interchange.py b/ibis/backends/tests/test_dataframe_interchange.py index 6fbaaba9efa3..5631d550045e 100644 --- a/ibis/backends/tests/test_dataframe_interchange.py +++ b/ibis/backends/tests/test_dataframe_interchange.py @@ -13,7 +13,7 @@ ] -@pytest.mark.notimpl(["dask", "druid"]) +@pytest.mark.notimpl(["druid"]) @pytest.mark.notimpl( ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor" ) @@ -60,7 +60,6 @@ def test_dataframe_interchange_no_execute(con, alltypes, mocker): assert not to_pyarrow.called -@pytest.mark.notimpl(["dask"]) @pytest.mark.notimpl( ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor" ) @@ -80,7 +79,7 @@ def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker): assert to_pyarrow.call_count == 1 -@pytest.mark.notimpl(["dask", "druid"]) +@pytest.mark.notimpl(["druid"]) @pytest.mark.notimpl( ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor" ) @@ -112,7 +111,6 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker): assert col2.size() == pa_col2.size() -@pytest.mark.notimpl(["dask"]) @pytest.mark.notimpl( ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor" ) diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py index a794c1ff88c8..b397dd464117 100644 --- a/ibis/backends/tests/test_export.py +++ b/ibis/backends/tests/test_export.py @@ -43,9 +43,7 @@ ] no_limit = [ - param( - None, id="nolimit", marks=[pytest.mark.notimpl(["dask", "impala", "pyspark"])] - ) + param(None, id="nolimit", marks=[pytest.mark.notimpl(["impala", "pyspark"])]) ] limit_no_limit = limit + no_limit @@ -117,7 +115,7 @@ def test_scalar_to_pyarrow_scalar(limit, awards_players): } -@pytest.mark.notimpl(["dask", "impala", "pyspark", "druid"]) +@pytest.mark.notimpl(["impala", "pyspark", "druid"]) def test_table_to_pyarrow_table_schema(con, awards_players): table = awards_players.to_pyarrow() assert isinstance(table, pa.Table) @@ -136,7 +134,7 @@ def test_table_to_pyarrow_table_schema(con, awards_players): assert table.schema == expected_schema -@pytest.mark.notimpl(["dask", "impala", "pyspark"]) +@pytest.mark.notimpl(["impala", "pyspark"]) def test_column_to_pyarrow_table_schema(awards_players): expr = awards_players.awardID array = expr.to_pyarrow() @@ -193,7 +191,7 @@ def test_to_pyarrow_batches_borked_types(batting): util.consume(batch_reader) -@pytest.mark.notimpl(["dask", "impala", "pyspark"]) +@pytest.mark.notimpl(["impala", "pyspark"]) def test_to_pyarrow_memtable(con): expr = ibis.memtable({"x": [1, 2, 3]}) table = con.to_pyarrow(expr) @@ -201,7 +199,7 @@ def test_to_pyarrow_memtable(con): assert len(table) == 3 -@pytest.mark.notimpl(["dask", "impala", "pyspark"]) +@pytest.mark.notimpl(["impala", "pyspark"]) def test_to_pyarrow_batches_memtable(con): expr = ibis.memtable({"x": [1, 2, 3]}) n = 0 @@ -212,7 +210,7 @@ def test_to_pyarrow_batches_memtable(con): assert n == 3 -@pytest.mark.notimpl(["dask", "impala", "pyspark"]) +@pytest.mark.notimpl(["impala", "pyspark"]) def test_table_to_parquet(tmp_path, backend, awards_players): outparquet = tmp_path / "out.parquet" awards_players.to_parquet(outparquet) @@ -265,9 +263,7 @@ def test_roundtrip_partitioned_parquet(tmp_path, con, backend, awards_players): backend.assert_frame_equal(reingest.to_pandas(), awards_players.to_pandas()) -@pytest.mark.notimpl( - ["dask", "impala", "pyspark"], reason="No support for exporting files" -) +@pytest.mark.notimpl(["impala", "pyspark"], reason="No support for exporting files") @pytest.mark.parametrize("ftype", ["csv", "parquet"]) def test_memtable_to_file(tmp_path, con, ftype, monkeypatch): """ @@ -288,7 +284,7 @@ def test_memtable_to_file(tmp_path, con, ftype, monkeypatch): assert outfile.is_file() -@pytest.mark.notimpl(["dask", "impala", "pyspark"]) +@pytest.mark.notimpl(["impala", "pyspark"]) def test_table_to_csv(tmp_path, backend, awards_players): outcsv = tmp_path / "out.csv" @@ -314,7 +310,6 @@ def test_table_to_csv(tmp_path, backend, awards_players): ["impala"], raises=AttributeError, reason="fetchmany doesn't exist" ), pytest.mark.notyet(["druid"], raises=sa.exc.ProgrammingError), - pytest.mark.notyet(["dask"], raises=NotImplementedError), pytest.mark.notyet(["pyspark"], raises=NotImplementedError), ], ), @@ -329,7 +324,6 @@ def test_table_to_csv(tmp_path, backend, awards_players): ["druid", "snowflake", "trino"], raises=sa.exc.ProgrammingError ), pytest.mark.notyet(["oracle"], raises=sa.exc.DatabaseError), - pytest.mark.notyet(["dask"], raises=NotImplementedError), pytest.mark.notyet(["mssql", "mysql"], raises=sa.exc.OperationalError), pytest.mark.notyet(["pyspark"], raises=ParseException), ], @@ -390,7 +384,6 @@ def test_roundtrip_delta(con, alltypes, tmp_path, monkeypatch): @pytest.mark.xfail_version( duckdb=["duckdb<0.8.1"], raises=AssertionError, reason="bug in duckdb" ) -@pytest.mark.notimpl(["dask"], raises=NotImplementedError) @pytest.mark.notimpl( ["druid"], raises=AttributeError, reason="string type is used for timestamp_col" ) @@ -419,7 +412,7 @@ def test_arrow_timestamp_with_time_zone(alltypes): assert batch.schema.types == expected -@pytest.mark.notimpl(["dask", "druid"]) +@pytest.mark.notimpl(["druid"]) @pytest.mark.notimpl( ["impala"], raises=AttributeError, reason="missing `fetchmany` on the cursor" ) @@ -447,7 +440,7 @@ def test_empty_memtable(backend, con): backend.assert_frame_equal(result, expected) -@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"]) +@pytest.mark.notimpl(["flink", "impala", "pyspark"]) def test_to_pandas_batches_empty_table(backend, con): t = backend.functional_alltypes.limit(0) n = t.count().execute() @@ -456,7 +449,7 @@ def test_to_pandas_batches_empty_table(backend, con): assert sum(map(len, t.to_pandas_batches())) == n -@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"]) +@pytest.mark.notimpl(["druid", "flink", "impala", "pyspark"]) @pytest.mark.parametrize("n", [None, 1]) def test_to_pandas_batches_nonempty_table(backend, con, n): t = backend.functional_alltypes.limit(n) @@ -466,7 +459,7 @@ def test_to_pandas_batches_nonempty_table(backend, con, n): assert sum(map(len, t.to_pandas_batches())) == n -@pytest.mark.notimpl(["dask", "flink", "impala", "pyspark"]) +@pytest.mark.notimpl(["flink", "impala", "pyspark"]) @pytest.mark.parametrize("n", [None, 0, 1, 2]) def test_to_pandas_batches_column(backend, con, n): t = backend.functional_alltypes.limit(n).timestamp_col @@ -476,7 +469,7 @@ def test_to_pandas_batches_column(backend, con, n): assert sum(map(len, t.to_pandas_batches())) == n -@pytest.mark.notimpl(["dask", "druid", "flink", "impala", "pyspark"]) +@pytest.mark.notimpl(["druid", "flink", "impala", "pyspark"]) def test_to_pandas_batches_scalar(backend, con): t = backend.functional_alltypes.timestamp_col.max() expected = t.execute()