diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py index 69059cba9c90..44e4cc08f6c7 100644 --- a/ibis/backends/flink/__init__.py +++ b/ibis/backends/flink/__init__.py @@ -6,6 +6,7 @@ import sqlglot as sg import sqlglot.expressions as sge +import ibis import ibis.common.exceptions as exc import ibis.expr.operations as ops import ibis.expr.schema as sch @@ -445,27 +446,21 @@ def create_table( # In-memory data is created as views in `pyflink` if obj is not None: - if isinstance(obj, pd.DataFrame): - dataframe = obj - - elif isinstance(obj, pa.Table): - dataframe = obj.to_pandas() - - elif isinstance(obj, ir.Table): - # Note (mehmet): If obj points to in-memory data, we create a view. - # Other cases are unsupported for now, e.g., obj is of UnboundTable. - # See TODO right below for more context on how we handle in-memory data. - op = obj.op() - if isinstance(op, ops.InMemoryTable): - dataframe = op.data.to_frame() - else: - raise exc.IbisError( - "`obj` is of type ibis.expr.types.Table but it is not in-memory. " - "Currently, only in-memory tables are supported. " - "See ibis.memtable() for info on creating in-memory table." - ) + if not isinstance(obj, ir.Table): + obj = ibis.memtable(obj) + + # Note (mehmet): If obj points to in-memory data, we create a view. + # Other cases are unsupported for now, e.g., obj is of UnboundTable. + # See TODO right below for more context on how we handle in-memory data. + op = obj.op() + if isinstance(op, ops.InMemoryTable): + dataframe = op.data.to_frame() else: - raise exc.IbisError(f"Unsupported `obj` type: {type(obj)}") + raise exc.IbisError( + "`obj` is of type ibis.expr.types.Table but it is not in-memory. " + "Currently, only in-memory tables are supported. " + "See ibis.memtable() for info on creating in-memory table." + ) # TODO (mehmet): Flink requires a source connector to create regular tables. # In-memory data can only be created as a view (virtual table). So we decided diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py index 44742bc7c821..c80ec2439f5e 100644 --- a/ibis/backends/flink/tests/test_ddl.py +++ b/ibis/backends/flink/tests/test_ddl.py @@ -8,6 +8,7 @@ import pandas.testing as tm import pyarrow as pa import pytest +from pytest import param import ibis import ibis.common.exceptions as exc @@ -62,6 +63,66 @@ def test_create_table(con, awards_players_schema, temp_table, csv_source_configs assert temp_table not in con.list_tables() +@pytest.mark.parametrize( + "obj, table_name", + [ + param(lambda: pa.table({"a": ["a"], "b": [1]}), "df_arrow", id="pyarrow table"), + param(lambda: pd.DataFrame({"a": ["a"], "b": [1]}), "df_pandas", id="pandas"), + param( + lambda: pytest.importorskip("polars").DataFrame({"a": ["a"], "b": [1]}), + "df_polars_eager", + id="polars dataframe", + ), + param( + lambda: pytest.importorskip("polars").LazyFrame({"a": ["a"], "b": [1]}), + "df_polars_lazy", + id="polars lazyframe", + ), + param( + lambda: ibis.memtable([("a", 1)], columns=["a", "b"]), + "memtable", + id="memtable_list", + ), + param( + lambda: ibis.memtable(pa.table({"a": ["a"], "b": [1]})), + "memtable_pa", + id="memtable pyarrow", + ), + param( + lambda: ibis.memtable(pd.DataFrame({"a": ["a"], "b": [1]})), + "memtable_pandas", + id="memtable pandas", + ), + param( + lambda: ibis.memtable( + pytest.importorskip("polars").DataFrame({"a": ["a"], "b": [1]}) + ), + "memtable_polars_eager", + id="memtable polars dataframe", + ), + param( + lambda: ibis.memtable( + pytest.importorskip("polars").LazyFrame({"a": ["a"], "b": [1]}) + ), + "memtable_polars_lazy", + id="memtable polars lazyframe", + ), + ], +) +def test_create_table_in_memory(con, obj, table_name, monkeypatch): + """Same as in ibis/backends/tests/test_client.py, with temp=True.""" + monkeypatch.setattr(ibis.options, "default_backend", con) + obj = obj() + t = con.create_table(table_name, obj, temp=True) + + result = pa.table({"a": ["a"], "b": [1]}) + assert table_name in con.list_tables() + + assert result.equals(t.to_pyarrow()) + + con.drop_table(table_name, force=True) + + def test_recreate_table_from_schema( con, awards_players_schema, temp_table, csv_source_configs ): @@ -362,10 +423,8 @@ def test_rename_table(con, awards_players_schema, temp_table, csv_source_configs @pytest.mark.parametrize( "obj", [ - pytest.param( - [("fred flintstone", 35, 1.28), ("barney rubble", 32, 2.32)], id="list" - ), - pytest.param( + param([("fred flintstone", 35, 1.28), ("barney rubble", 32, 2.32)], id="list"), + param( { "name": ["fred flintstone", "barney rubble"], "age": [35, 32], @@ -373,14 +432,14 @@ def test_rename_table(con, awards_players_schema, temp_table, csv_source_configs }, id="dict", ), - pytest.param( + param( pd.DataFrame( [("fred flintstone", 35, 1.28), ("barney rubble", 32, 2.32)], columns=["name", "age", "gpa"], ), id="pandas_dataframe", ), - pytest.param( + param( pa.Table.from_arrays( [ pa.array(["fred flintstone", "barney rubble"]), diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 1bbadd4c8957..1d7ea55c5168 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1732,6 +1732,9 @@ def test_schema_with_caching(alltypes): @pytest.mark.notyet( ["datafusion"], reason="Doesn't support table creation from records" ) +@pytest.mark.notimpl( + ["flink"], reason="Temp tables are implemented as views, which don't support insert" +) @pytest.mark.parametrize( "first_row, second_row", [