Skip to content

Commit

Permalink
feat(flink): deep dive into test_window/timecontext.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas committed Jan 16, 2024
1 parent 175f141 commit 5a4299b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
11 changes: 11 additions & 0 deletions ibis/backends/flink/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ def _clip(translator: ExprTranslator, op: ops.Node) -> str:
return f"CAST({arg} AS {FlinkType.from_ibis(op.dtype)!s})"


def _ntile(translator: ExprTranslator, op: ops.NTile) -> str:
return f"NTILE({op.buckets.value})"

Check warning on line 253 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L253

Added line #L253 was not covered by tests


def _floor_divide(translator: ExprTranslator, op: ops.Node) -> str:
left = translator.translate(op.left)
right = translator.translate(op.right)
Expand Down Expand Up @@ -294,6 +298,11 @@ def _map_get(translator: ExprTranslator, op: ops.maps.MapGet) -> str:
return f"{map_} [ {key} ]"


def _struct_field(translator: ExprTranslator, op: ops.StructField) -> str:
arg = translator.translate(op.arg)
return f"{arg}.`{op.field}`"

Check warning on line 303 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L302-L303

Added lines #L302 - L303 were not covered by tests


def _day_of_week_index(
translator: ExprTranslator, op: ops.temporal.DayOfWeekIndex
) -> str:
Expand Down Expand Up @@ -439,6 +448,7 @@ def _struct_field(translator, op):
ops.IfElse: _filter,
ops.Window: _window,
ops.Clip: _clip,
ops.NTile: _ntile,
# Binary operations
ops.Power: fixed_arity("power", 2),
ops.FloorDivide: _floor_divide,
Expand All @@ -448,6 +458,7 @@ def _struct_field(translator, op):
ops.JSONGetItem: _json_get_item,
ops.Map: _map,
ops.MapGet: _map_get,
ops.StructField: _struct_field,
# Temporal functions
ops.DateAdd: _date_add,
ops.DateDiff: _date_diff,
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ def connect(*, tmpdir, worker_id, **kw: Any):
def _load_data(self, **_: Any) -> None:
import pandas as pd

from ibis.backends.tests.data import json_types, struct_types
from ibis.backends.tests.data import json_types, struct_types, win

for table_name in TEST_TABLES:
path = self.data_dir / "parquet" / f"{table_name}.parquet"
self.connection.create_table(table_name, pd.read_parquet(path), temp=True)

self.connection.create_table("json_t", json_types, temp=True)
self.connection.create_table("struct", struct_types, temp=True)
self.connection.create_table("win", win, temp=True)


class TestConfForStreaming(TestConf):
Expand Down
48 changes: 33 additions & 15 deletions ibis/backends/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ def calc_zscore(s):
raises=Exception,
reason="Exception: Internal error: Expects default value to have Int64 type.",
),
pytest.mark.notimpl(["flink"], raises=Py4JJavaError),
pytest.mark.notimpl(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions",
),
],
),
param(
Expand All @@ -112,7 +116,11 @@ def calc_zscore(s):
raises=BaseException,
),
pytest.mark.notimpl(["dask"], raises=NotImplementedError),
pytest.mark.notimpl(["flink"], raises=Py4JJavaError),
pytest.mark.notimpl(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions",
),
],
),
param(
Expand Down Expand Up @@ -218,7 +226,11 @@ def calc_zscore(s):
["impala", "mssql"], raises=com.OperationNotDefinedError
),
pytest.mark.notimpl(["dask"], raises=NotImplementedError),
pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError),
pytest.mark.notimpl(
["flink"],
raises=com.OperationNotDefinedError,
reason="No translation rule for <class 'ibis.expr.operations.analytic.NthValue'>",
),
],
),
param(
Expand Down Expand Up @@ -442,7 +454,11 @@ def test_ungrouped_bounded_expanding_window(
@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
@pytest.mark.notimpl(["pandas"], raises=AssertionError)
@pytest.mark.notimpl(["flink"], raises=com.UnsupportedOperationError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="OVER RANGE FOLLOWING windows are not supported in Flink yet",
)
def test_grouped_bounded_following_window(backend, alltypes, df, preceding, following):
window = ibis.window(
preceding=preceding,
Expand Down Expand Up @@ -621,7 +637,11 @@ def test_grouped_unbounded_window(
@pytest.mark.broken(["snowflake"], raises=AssertionError)
@pytest.mark.broken(["dask", "pandas", "mssql"], raises=AssertionError)
@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["flink"], raises=com.UnsupportedOperationError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="OVER RANGE FOLLOWING windows are not supported in Flink yet",
)
def test_simple_ungrouped_unbound_following_window(
backend, alltypes, ibis_method, pandas_fn
):
Expand Down Expand Up @@ -694,6 +714,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand All @@ -711,11 +732,6 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
raises=PySparkAnalysisException,
reason="pyspark requires CURRENT ROW",
),
pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: Argument to function 'NTILE' must be a literal",
),
],
),
param(
Expand Down Expand Up @@ -776,6 +792,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand Down Expand Up @@ -945,6 +962,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand Down Expand Up @@ -1129,17 +1147,17 @@ def test_mutate_window_filter(backend, alltypes):


@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["flink"],
raises=Exception,
reason="KeyError: Table with name win doesn't exist.",
)
@pytest.mark.broken(
["impala"],
reason="the database returns incorrect results",
raises=AssertionError,
)
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Windows in Flink can only be ordered by a single time column",
)
def test_first_last(backend):
t = backend.win
w = ibis.window(group_by=t.g, order_by=[t.x, t.y], preceding=1, following=0)
Expand Down

0 comments on commit 5a4299b

Please sign in to comment.