From 256767f4ace61ea3a168bcce1c4c7715d9547a31 Mon Sep 17 00:00:00 2001 From: Chloe He Date: Thu, 16 Nov 2023 12:46:10 -0800 Subject: [PATCH] feat(flink): implement windowed computations --- ibis/backends/flink/registry.py | 7 +---- .../test_window_aggregation/out.sql | 3 ++ .../test_compiler/test_window_topn/out.sql | 12 ++++++++ .../cumulate_window/out.sql | 0 .../hop_window/out.sql | 0 .../tumble_window/out.sql | 0 ibis/backends/flink/tests/test_compiler.py | 28 ++++++++++++++++++- ibis/backends/flink/tests/test_window.py | 13 --------- 8 files changed, 43 insertions(+), 20 deletions(-) create mode 100644 ibis/backends/flink/tests/snapshots/test_compiler/test_window_aggregation/out.sql create mode 100644 ibis/backends/flink/tests/snapshots/test_compiler/test_window_topn/out.sql rename ibis/backends/flink/tests/snapshots/test_compiler/{test_tvf => test_windowing_tvf}/cumulate_window/out.sql (100%) rename ibis/backends/flink/tests/snapshots/test_compiler/{test_tvf => test_windowing_tvf}/hop_window/out.sql (100%) rename ibis/backends/flink/tests/snapshots/test_compiler/{test_tvf => test_windowing_tvf}/tumble_window/out.sql (100%) diff --git a/ibis/backends/flink/registry.py b/ibis/backends/flink/registry.py index 3f59f7e90237..147448b7510f 100644 --- a/ibis/backends/flink/registry.py +++ b/ibis/backends/flink/registry.py @@ -156,10 +156,6 @@ def _format_window_frame(translator: ExprTranslator, func, frame): components.append(f"PARTITION BY {partition_args}") (order_by,) = frame.order_by - if order_by.descending is True: - raise com.UnsupportedOperationError( - "Flink only supports windows ordered in ASCENDING mode" - ) components.append(f"ORDER BY {translator.translate(order_by)}") if frame.start is None and frame.end is None: @@ -221,8 +217,7 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str: if isinstance(func, (ops.RankBase, ops.NTile)): return f"({result} - 1)" - else: - return result + return result def _clip(translator: ExprTranslator, op: ops.Node) -> str: diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_window_aggregation/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_window_aggregation/out.sql new file mode 100644 index 000000000000..9867d27155f1 --- /dev/null +++ b/ibis/backends/flink/tests/snapshots/test_compiler/test_window_aggregation/out.sql @@ -0,0 +1,3 @@ +SELECT t0.`window_start`, t0.`window_end`, t0.`g`, avg(t0.`d`) AS `mean` +FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '15' MINUTE)) t0 +GROUP BY t0.`window_start`, t0.`window_end`, t0.`g` \ No newline at end of file diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_window_topn/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_window_topn/out.sql new file mode 100644 index 000000000000..92d4e692424f --- /dev/null +++ b/ibis/backends/flink/tests/snapshots/test_compiler/test_window_topn/out.sql @@ -0,0 +1,12 @@ +WITH t0 AS ( + SELECT t2.`a`, t2.`b`, t2.`c`, t2.`d`, t2.`g`, t2.`window_start`, + t2.`window_end` + FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '10' MINUTE)) t2 +) +SELECT t1.* +FROM ( + SELECT t0.*, + (row_number() OVER (PARTITION BY t0.`window_start`, t0.`window_end` ORDER BY t0.`g` DESC) - 1) AS `rownum` + FROM t0 +) t1 +WHERE t1.`rownum` <= CAST(3 AS TINYINT) \ No newline at end of file diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/cumulate_window/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_windowing_tvf/cumulate_window/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/cumulate_window/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_windowing_tvf/cumulate_window/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/hop_window/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_windowing_tvf/hop_window/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/hop_window/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_windowing_tvf/hop_window/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/tumble_window/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_windowing_tvf/tumble_window/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/tumble_window/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_windowing_tvf/tumble_window/out.sql diff --git a/ibis/backends/flink/tests/test_compiler.py b/ibis/backends/flink/tests/test_compiler.py index eaec12cdf89a..0c404c32e378 100644 --- a/ibis/backends/flink/tests/test_compiler.py +++ b/ibis/backends/flink/tests/test_compiler.py @@ -4,6 +4,7 @@ from pytest import param import ibis +from ibis.common.deferred import _ def test_sum(con, snapshot, simple_table): @@ -137,9 +138,34 @@ def test_having(con, snapshot, simple_table): ), ], ) -def test_tvf(con, snapshot, simple_table, function_type, params): +def test_windowing_tvf(con, snapshot, simple_table, function_type, params): expr = getattr(simple_table.window_by(time_col=simple_table.i), function_type)( **params ) result = con.compile(expr) snapshot.assert_match(result, "out.sql") + + +def test_window_aggregation(con, snapshot, simple_table): + expr = ( + simple_table.window_by(time_col=simple_table.i) + .tumble(window_size=ibis.interval(minutes=15)) + .group_by(["window_start", "window_end", "g"]) + .aggregate(mean=_.d.mean()) + ) + result = con.compile(expr) + snapshot.assert_match(result, "out.sql") + + +def test_window_topn(con, snapshot, simple_table): + expr = simple_table.window_by(time_col="i").tumble( + window_size=ibis.interval(seconds=600), + )["a", "b", "c", "d", "g", "window_start", "window_end"] + expr = expr.mutate( + rownum=ibis.row_number().over( + group_by=["window_start", "window_end"], order_by=ibis.desc("g") + ) + ) + expr = expr[expr.rownum <= 3] + result = con.compile(expr) + snapshot.assert_match(result, "out.sql") diff --git a/ibis/backends/flink/tests/test_window.py b/ibis/backends/flink/tests/test_window.py index e40ac8744252..ca67a317fa2e 100644 --- a/ibis/backends/flink/tests/test_window.py +++ b/ibis/backends/flink/tests/test_window.py @@ -29,19 +29,6 @@ def test_window_does_not_support_multiple_order_by(con, simple_table): con.compile(expr) -def test_window_does_not_support_desc_order(con, simple_table): - expr = simple_table.f.sum().over( - rows=(-1, 1), - group_by=[simple_table.g, simple_table.a], - order_by=[simple_table.f.desc()], - ) - with pytest.raises( - UnsupportedOperationError, - match="Flink only supports windows ordered in ASCENDING mode", - ): - con.compile(expr) - - @pytest.mark.parametrize( ("window", "err"), [