Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flink): implement struct field, clean up literal, and adjust timecontext test markers #7997

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ibis/backends/flink/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@
return f"CAST({arg} AS {FlinkType.from_ibis(op.dtype)!s})"


def _ntile(translator: ExprTranslator, op: ops.NTile) -> str:
return f"NTILE({op.buckets.value})"

Check warning on line 253 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L253

Added line #L253 was not covered by tests


def _floor_divide(translator: ExprTranslator, op: ops.Node) -> str:
left = translator.translate(op.left)
right = translator.translate(op.right)
Expand Down Expand Up @@ -294,6 +298,11 @@
return f"{map_} [ {key} ]"


def _struct_field(translator: ExprTranslator, op: ops.StructField) -> str:
arg = translator.translate(op.arg)
return f"{arg}.`{op.field}`"

Check warning on line 303 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L302-L303

Added lines #L302 - L303 were not covered by tests


def _day_of_week_index(
translator: ExprTranslator, op: ops.temporal.DayOfWeekIndex
) -> str:
Expand Down Expand Up @@ -439,6 +448,7 @@
ops.IfElse: _filter,
ops.Window: _window,
ops.Clip: _clip,
ops.NTile: _ntile,
# Binary operations
ops.Power: fixed_arity("power", 2),
ops.FloorDivide: _floor_divide,
Expand All @@ -448,6 +458,7 @@
ops.JSONGetItem: _json_get_item,
ops.Map: _map,
ops.MapGet: _map_get,
ops.StructField: _struct_field,
# Temporal functions
ops.DateAdd: _date_add,
ops.DateDiff: _date_diff,
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ def connect(*, tmpdir, worker_id, **kw: Any):
def _load_data(self, **_: Any) -> None:
import pandas as pd

from ibis.backends.tests.data import json_types, struct_types
from ibis.backends.tests.data import json_types, struct_types, win

for table_name in TEST_TABLES:
path = self.data_dir / "parquet" / f"{table_name}.parquet"
self.connection.create_table(table_name, pd.read_parquet(path), temp=True)

self.connection.create_table("json_t", json_types, temp=True)
self.connection.create_table("struct", struct_types, temp=True)
self.connection.create_table("win", win, temp=True)


class TestConfForStreaming(TestConf):
Expand Down
48 changes: 33 additions & 15 deletions ibis/backends/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ def calc_zscore(s):
raises=Exception,
reason="Exception: Internal error: Expects default value to have Int64 type.",
),
pytest.mark.notimpl(["flink"], raises=Py4JJavaError),
pytest.mark.notimpl(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions",
),
],
),
param(
Expand All @@ -112,7 +116,11 @@ def calc_zscore(s):
raises=BaseException,
),
pytest.mark.notimpl(["dask"], raises=NotImplementedError),
pytest.mark.notimpl(["flink"], raises=Py4JJavaError),
pytest.mark.notimpl(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions",
),
],
),
param(
Expand Down Expand Up @@ -218,7 +226,11 @@ def calc_zscore(s):
["impala", "mssql"], raises=com.OperationNotDefinedError
),
pytest.mark.notimpl(["dask"], raises=NotImplementedError),
pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError),
pytest.mark.notimpl(
["flink"],
raises=com.OperationNotDefinedError,
reason="No translation rule for <class 'ibis.expr.operations.analytic.NthValue'>",
),
],
),
param(
Expand Down Expand Up @@ -442,7 +454,11 @@ def test_ungrouped_bounded_expanding_window(
@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
@pytest.mark.notimpl(["pandas"], raises=AssertionError)
@pytest.mark.notimpl(["flink"], raises=com.UnsupportedOperationError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="OVER RANGE FOLLOWING windows are not supported in Flink yet",
)
def test_grouped_bounded_following_window(backend, alltypes, df, preceding, following):
window = ibis.window(
preceding=preceding,
Expand Down Expand Up @@ -621,7 +637,11 @@ def test_grouped_unbounded_window(
@pytest.mark.broken(["snowflake"], raises=AssertionError)
@pytest.mark.broken(["dask", "pandas", "mssql"], raises=AssertionError)
@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["flink"], raises=com.UnsupportedOperationError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="OVER RANGE FOLLOWING windows are not supported in Flink yet",
)
def test_simple_ungrouped_unbound_following_window(
backend, alltypes, ibis_method, pandas_fn
):
Expand Down Expand Up @@ -694,6 +714,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand All @@ -711,11 +732,6 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
raises=PySparkAnalysisException,
reason="pyspark requires CURRENT ROW",
),
pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: Argument to function 'NTILE' must be a literal",
),
],
),
param(
Expand Down Expand Up @@ -776,6 +792,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand Down Expand Up @@ -945,6 +962,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand Down Expand Up @@ -1129,17 +1147,17 @@ def test_mutate_window_filter(backend, alltypes):


@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["flink"],
raises=Exception,
reason="KeyError: Table with name win doesn't exist.",
)
@pytest.mark.broken(
["impala"],
reason="the database returns incorrect results",
raises=AssertionError,
)
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Windows in Flink can only be ordered by a single time column",
)
def test_first_last(backend):
t = backend.win
w = ibis.window(group_by=t.g, order_by=[t.x, t.y], preceding=1, following=0)
Expand Down
Loading