Skip to content

Commit

Permalink
feat(flink): create views from more mem data types (#9622)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman authored Jul 17, 2024
1 parent 16998df commit b83fc2b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 26 deletions.
35 changes: 15 additions & 20 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sqlglot as sg
import sqlglot.expressions as sge

import ibis
import ibis.common.exceptions as exc
import ibis.expr.operations as ops
import ibis.expr.schema as sch
Expand Down Expand Up @@ -445,27 +446,21 @@ def create_table(

# In-memory data is created as views in `pyflink`
if obj is not None:
if isinstance(obj, pd.DataFrame):
dataframe = obj

elif isinstance(obj, pa.Table):
dataframe = obj.to_pandas()

elif isinstance(obj, ir.Table):
# Note (mehmet): If obj points to in-memory data, we create a view.
# Other cases are unsupported for now, e.g., obj is of UnboundTable.
# See TODO right below for more context on how we handle in-memory data.
op = obj.op()
if isinstance(op, ops.InMemoryTable):
dataframe = op.data.to_frame()
else:
raise exc.IbisError(
"`obj` is of type ibis.expr.types.Table but it is not in-memory. "
"Currently, only in-memory tables are supported. "
"See ibis.memtable() for info on creating in-memory table."
)
if not isinstance(obj, ir.Table):
obj = ibis.memtable(obj)

# Note (mehmet): If obj points to in-memory data, we create a view.
# Other cases are unsupported for now, e.g., obj is of UnboundTable.
# See TODO right below for more context on how we handle in-memory data.
op = obj.op()
if isinstance(op, ops.InMemoryTable):
dataframe = op.data.to_frame()
else:
raise exc.IbisError(f"Unsupported `obj` type: {type(obj)}")
raise exc.IbisError(
"`obj` is of type ibis.expr.types.Table but it is not in-memory. "
"Currently, only in-memory tables are supported. "
"See ibis.memtable() for info on creating in-memory table."
)

# TODO (mehmet): Flink requires a source connector to create regular tables.
# In-memory data can only be created as a view (virtual table). So we decided
Expand Down
71 changes: 65 additions & 6 deletions ibis/backends/flink/tests/test_ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas.testing as tm
import pyarrow as pa
import pytest
from pytest import param

import ibis
import ibis.common.exceptions as exc
Expand Down Expand Up @@ -62,6 +63,66 @@ def test_create_table(con, awards_players_schema, temp_table, csv_source_configs
assert temp_table not in con.list_tables()


@pytest.mark.parametrize(
"obj, table_name",
[
param(lambda: pa.table({"a": ["a"], "b": [1]}), "df_arrow", id="pyarrow table"),
param(lambda: pd.DataFrame({"a": ["a"], "b": [1]}), "df_pandas", id="pandas"),
param(
lambda: pytest.importorskip("polars").DataFrame({"a": ["a"], "b": [1]}),
"df_polars_eager",
id="polars dataframe",
),
param(
lambda: pytest.importorskip("polars").LazyFrame({"a": ["a"], "b": [1]}),
"df_polars_lazy",
id="polars lazyframe",
),
param(
lambda: ibis.memtable([("a", 1)], columns=["a", "b"]),
"memtable",
id="memtable_list",
),
param(
lambda: ibis.memtable(pa.table({"a": ["a"], "b": [1]})),
"memtable_pa",
id="memtable pyarrow",
),
param(
lambda: ibis.memtable(pd.DataFrame({"a": ["a"], "b": [1]})),
"memtable_pandas",
id="memtable pandas",
),
param(
lambda: ibis.memtable(
pytest.importorskip("polars").DataFrame({"a": ["a"], "b": [1]})
),
"memtable_polars_eager",
id="memtable polars dataframe",
),
param(
lambda: ibis.memtable(
pytest.importorskip("polars").LazyFrame({"a": ["a"], "b": [1]})
),
"memtable_polars_lazy",
id="memtable polars lazyframe",
),
],
)
def test_create_table_in_memory(con, obj, table_name, monkeypatch):
"""Same as in ibis/backends/tests/test_client.py, with temp=True."""
monkeypatch.setattr(ibis.options, "default_backend", con)
obj = obj()
t = con.create_table(table_name, obj, temp=True)

result = pa.table({"a": ["a"], "b": [1]})
assert table_name in con.list_tables()

assert result.equals(t.to_pyarrow())

con.drop_table(table_name, force=True)


def test_recreate_table_from_schema(
con, awards_players_schema, temp_table, csv_source_configs
):
Expand Down Expand Up @@ -362,25 +423,23 @@ def test_rename_table(con, awards_players_schema, temp_table, csv_source_configs
@pytest.mark.parametrize(
"obj",
[
pytest.param(
[("fred flintstone", 35, 1.28), ("barney rubble", 32, 2.32)], id="list"
),
pytest.param(
param([("fred flintstone", 35, 1.28), ("barney rubble", 32, 2.32)], id="list"),
param(
{
"name": ["fred flintstone", "barney rubble"],
"age": [35, 32],
"gpa": [1.28, 2.32],
},
id="dict",
),
pytest.param(
param(
pd.DataFrame(
[("fred flintstone", 35, 1.28), ("barney rubble", 32, 2.32)],
columns=["name", "age", "gpa"],
),
id="pandas_dataframe",
),
pytest.param(
param(
pa.Table.from_arrays(
[
pa.array(["fred flintstone", "barney rubble"]),
Expand Down
3 changes: 3 additions & 0 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,9 @@ def test_schema_with_caching(alltypes):
@pytest.mark.notyet(
["datafusion"], reason="Doesn't support table creation from records"
)
@pytest.mark.notimpl(
["flink"], reason="Temp tables are implemented as views, which don't support insert"
)
@pytest.mark.parametrize(
"first_row, second_row",
[
Expand Down

0 comments on commit b83fc2b

Please sign in to comment.