Skip to content

Commit

Permalink
feat(flink): deep dive into test_window/timecontext.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas committed Jan 16, 2024
1 parent c94c0df commit 51924c9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 16 deletions.
9 changes: 9 additions & 0 deletions ibis/backends/flink/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def _interval_subtract(
def _literal(translator: ExprTranslator, op: ops.Literal) -> str:
from ibis.backends.flink.utils import translate_literal

if op.dtype.is_numeric():
return f"{op.value}"

Check warning on line 102 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L102

Added line #L102 was not covered by tests

return translate_literal(op)


Expand Down Expand Up @@ -294,6 +297,11 @@ def _map_get(translator: ExprTranslator, op: ops.maps.MapGet) -> str:
return f"{map_} [ {key} ]"


def _struct_field(translator: ExprTranslator, op: ops.StructField) -> str:
arg = translator.translate(op.arg)
return f"{arg}.`{op.field}`"

Check warning on line 302 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L301-L302

Added lines #L301 - L302 were not covered by tests


def _day_of_week_index(
translator: ExprTranslator, op: ops.temporal.DayOfWeekIndex
) -> str:
Expand Down Expand Up @@ -448,6 +456,7 @@ def _struct_field(translator, op):
ops.JSONGetItem: _json_get_item,
ops.Map: _map,
ops.MapGet: _map_get,
ops.StructField: _struct_field,
# Temporal functions
ops.DateAdd: _date_add,
ops.DateDiff: _date_diff,
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ def connect(*, tmpdir, worker_id, **kw: Any):
def _load_data(self, **_: Any) -> None:
import pandas as pd

from ibis.backends.tests.data import json_types, struct_types
from ibis.backends.tests.data import json_types, struct_types, win

Check warning on line 52 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L52

Added line #L52 was not covered by tests

for table_name in TEST_TABLES:
path = self.data_dir / "parquet" / f"{table_name}.parquet"
self.connection.create_table(table_name, pd.read_parquet(path), temp=True)

self.connection.create_table("json_t", json_types, temp=True)
self.connection.create_table("struct", struct_types, temp=True)
self.connection.create_table("win", win, temp=True)

Check warning on line 60 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L60

Added line #L60 was not covered by tests


class TestConfForStreaming(TestConf):
Expand Down
48 changes: 33 additions & 15 deletions ibis/backends/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ def calc_zscore(s):
raises=Exception,
reason="Exception: Internal error: Expects default value to have Int64 type.",
),
pytest.mark.notimpl(["flink"], raises=Py4JJavaError),
pytest.mark.notimpl(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions",
),
],
),
param(
Expand All @@ -112,7 +116,11 @@ def calc_zscore(s):
raises=BaseException,
),
pytest.mark.notimpl(["dask"], raises=NotImplementedError),
pytest.mark.notimpl(["flink"], raises=Py4JJavaError),
pytest.mark.notimpl(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions",
),
],
),
param(
Expand Down Expand Up @@ -218,7 +226,11 @@ def calc_zscore(s):
["impala", "mssql"], raises=com.OperationNotDefinedError
),
pytest.mark.notimpl(["dask"], raises=NotImplementedError),
pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError),
pytest.mark.notimpl(
["flink"],
raises=com.OperationNotDefinedError,
reason="No translation rule for <class 'ibis.expr.operations.analytic.NthValue'>",
),
],
),
param(
Expand Down Expand Up @@ -442,7 +454,11 @@ def test_ungrouped_bounded_expanding_window(
@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
@pytest.mark.notimpl(["pandas"], raises=AssertionError)
@pytest.mark.notimpl(["flink"], raises=com.UnsupportedOperationError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="OVER RANGE FOLLOWING windows are not supported in Flink yet",
)
def test_grouped_bounded_following_window(backend, alltypes, df, preceding, following):
window = ibis.window(
preceding=preceding,
Expand Down Expand Up @@ -621,7 +637,11 @@ def test_grouped_unbounded_window(
@pytest.mark.broken(["snowflake"], raises=AssertionError)
@pytest.mark.broken(["dask", "pandas", "mssql"], raises=AssertionError)
@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["flink"], raises=com.UnsupportedOperationError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="OVER RANGE FOLLOWING windows are not supported in Flink yet",
)
def test_simple_ungrouped_unbound_following_window(
backend, alltypes, ibis_method, pandas_fn
):
Expand Down Expand Up @@ -694,6 +714,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand All @@ -711,11 +732,6 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
raises=PySparkAnalysisException,
reason="pyspark requires CURRENT ROW",
),
pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="CalciteContextException: Argument to function 'NTILE' must be a literal",
),
],
),
param(
Expand Down Expand Up @@ -776,6 +792,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand Down Expand Up @@ -945,6 +962,7 @@ def test_simple_ungrouped_window_with_scalar_order_by(alltypes):
pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Flink engine does not support generic window clause with no order by",
),
],
),
Expand Down Expand Up @@ -1129,17 +1147,17 @@ def test_mutate_window_filter(backend, alltypes):


@pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["flink"],
raises=Exception,
reason="KeyError: Table with name win doesn't exist.",
)
@pytest.mark.broken(
["impala"],
reason="the database returns incorrect results",
raises=AssertionError,
)
@pytest.mark.notimpl(["dask"], raises=NotImplementedError)
@pytest.mark.notimpl(
["flink"],
raises=com.UnsupportedOperationError,
reason="Windows in Flink can only be ordered by a single time column",
)
def test_first_last(backend):
t = backend.win
w = ibis.window(group_by=t.g, order_by=[t.x, t.y], preceding=1, following=0)
Expand Down

0 comments on commit 51924c9

Please sign in to comment.