Skip to content

Commit

Permalink
feat(flink): implement windowed computations
Browse files (browse the repository at this point in the history)
  • Loading branch information
chloeh13q authored and cpcloud committed Nov 16, 2023
1 parent 751cfcf commit 256767f
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 20 deletions.
7 changes: 1 addition & 6 deletions ibis/backends/flink/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ def _format_window_frame(translator: ExprTranslator, func, frame):
components.append(f"PARTITION BY {partition_args}")

(order_by,) = frame.order_by
if order_by.descending is True:
raise com.UnsupportedOperationError(
"Flink only supports windows ordered in ASCENDING mode"
)
components.append(f"ORDER BY {translator.translate(order_by)}")

if frame.start is None and frame.end is None:
Expand Down Expand Up @@ -221,8 +217,7 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str:

if isinstance(func, (ops.RankBase, ops.NTile)):
return f"({result} - 1)"
else:
return result
return result


def _clip(translator: ExprTranslator, op: ops.Node) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT t0.`window_start`, t0.`window_end`, t0.`g`, avg(t0.`d`) AS `mean`
FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '15' MINUTE)) t0
GROUP BY t0.`window_start`, t0.`window_end`, t0.`g`
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
WITH t0 AS (
SELECT t2.`a`, t2.`b`, t2.`c`, t2.`d`, t2.`g`, t2.`window_start`,
t2.`window_end`
FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '10' MINUTE)) t2
)
SELECT t1.*
FROM (
SELECT t0.*,
(row_number() OVER (PARTITION BY t0.`window_start`, t0.`window_end` ORDER BY t0.`g` DESC) - 1) AS `rownum`
FROM t0
) t1
WHERE t1.`rownum` <= CAST(3 AS TINYINT)
28 changes: 27 additions & 1 deletion ibis/backends/flink/tests/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pytest import param

import ibis
from ibis.common.deferred import _


def test_sum(con, snapshot, simple_table):
Expand Down Expand Up @@ -137,9 +138,34 @@ def test_having(con, snapshot, simple_table):
),
],
)
def test_tvf(con, snapshot, simple_table, function_type, params):
def test_windowing_tvf(con, snapshot, simple_table, function_type, params):
expr = getattr(simple_table.window_by(time_col=simple_table.i), function_type)(
**params
)
result = con.compile(expr)
snapshot.assert_match(result, "out.sql")


def test_window_aggregation(con, snapshot, simple_table):
    """Snapshot the SQL for a mean aggregated over a tumbling window.

    The table is windowed on column ``i`` with a 15-minute tumbling
    window, grouped by the window bounds plus ``g``, and the mean of
    ``d`` is computed per group. The compiled SQL is compared against the
    stored ``out.sql`` snapshot.
    """
    windowed = simple_table.window_by(time_col=simple_table.i).tumble(
        window_size=ibis.interval(minutes=15)
    )
    grouped = windowed.group_by(["window_start", "window_end", "g"])
    aggregated = grouped.aggregate(mean=_.d.mean())

    snapshot.assert_match(con.compile(aggregated), "out.sql")


def test_window_topn(con, snapshot, simple_table):
    """Snapshot the SQL for a per-window top-N query.

    The table is windowed on column ``i`` with a 600-second tumbling
    window, a subset of columns is projected, rows are numbered within
    each window by descending ``g``, and only the leading rows are kept.
    The compiled SQL is compared against the stored ``out.sql`` snapshot.
    """
    # Subscripting with a tuple is exactly what ``table["a", "b", ...]`` does.
    selected = ("a", "b", "c", "d", "g", "window_start", "window_end")
    windowed = simple_table.window_by(time_col="i").tumble(
        window_size=ibis.interval(seconds=600),
    )[selected]

    ranking = ibis.row_number().over(
        group_by=["window_start", "window_end"], order_by=ibis.desc("g")
    )
    ranked = windowed.mutate(rownum=ranking)

    # NOTE: the expected SQL renders this as ``row_number() OVER (...) - 1``,
    # so ``rownum`` starts at zero and ``<= 3`` admits rownum values 0..3.
    top_rows = ranked[ranked.rownum <= 3]

    snapshot.assert_match(con.compile(top_rows), "out.sql")
13 changes: 0 additions & 13 deletions ibis/backends/flink/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,6 @@ def test_window_does_not_support_multiple_order_by(con, simple_table):
con.compile(expr)


def test_window_does_not_support_desc_order(con, simple_table):
    """Expect compilation to fail for a window ordered descending.

    Builds a sum of ``f`` over a row-based window (one row before to one
    row after), partitioned by ``g`` and ``a`` and ordered by ``f``
    descending, then asserts that compiling it raises
    ``UnsupportedOperationError`` with the ASCENDING-only message.
    """
    descending_window_sum = simple_table.f.sum().over(
        rows=(-1, 1),
        group_by=[simple_table.g, simple_table.a],
        order_by=[simple_table.f.desc()],
    )
    expected_message = "Flink only supports windows ordered in ASCENDING mode"

    with pytest.raises(UnsupportedOperationError, match=expected_message):
        con.compile(descending_window_sum)


@pytest.mark.parametrize(
("window", "err"),
[
Expand Down

0 comments on commit 256767f

Please sign in to comment.