diff --git a/ibis/backends/base/sql/compiler/select_builder.py b/ibis/backends/base/sql/compiler/select_builder.py index b88dd76aa2e4..41abc75b4a33 100644 --- a/ibis/backends/base/sql/compiler/select_builder.py +++ b/ibis/backends/base/sql/compiler/select_builder.py @@ -127,6 +127,8 @@ def _collect(self, op, toplevel=False): self._collect_PhysicalTable(op, toplevel=toplevel) elif isinstance(op, ops.Join): self._collect_Join(op, toplevel=toplevel) + elif isinstance(op, ops.WindowingTVF): + self._collect_WindowingTVF(op, toplevel=toplevel) else: raise NotImplementedError(type(op)) @@ -231,6 +233,11 @@ def _collect_SelfReference(self, op, toplevel=False): if toplevel: self._collect(op.table, toplevel=toplevel) + def _collect_WindowingTVF(self, op, toplevel=False): + if toplevel: + self.table_set = op + self.select_set = [op] + # -------------------------------------------------------------------- # Subquery analysis / extraction diff --git a/ibis/backends/flink/compiler/__init__.py b/ibis/backends/flink/compiler/__init__.py index 001d85010c48..e69de29bb2d1 100644 --- a/ibis/backends/flink/compiler/__init__.py +++ b/ibis/backends/flink/compiler/__init__.py @@ -1,9 +0,0 @@ -from __future__ import annotations - -from public import public - -from ibis.backends.flink.compiler.core import translate - -public( - translate=translate, -) diff --git a/ibis/backends/flink/compiler/core.py b/ibis/backends/flink/compiler/core.py index d746725cdae2..6be22d372888 100644 --- a/ibis/backends/flink/compiler/core.py +++ b/ibis/backends/flink/compiler/core.py @@ -6,12 +6,14 @@ import ibis.common.exceptions as com import ibis.expr.operations as ops +import ibis.expr.types as ir from ibis.backends.base.sql.compiler import ( Compiler, Select, SelectBuilder, TableSetFormatter, ) +from ibis.backends.base.sql.registry import quote_identifier from ibis.backends.flink.translator import FlinkExprTranslator @@ -28,8 +30,22 @@ def _format_in_memory_table(self, op): rows = ", ".join(f"({raw_row})" for raw_row in raw_rows) return f"(VALUES {rows})" + def _format_window_tvf(self, op) -> str: + if isinstance(op, ops.TumbleWindowingTVF): + function_type = "TUMBLE" + elif isinstance(op, ops.HopWindowingTVF): + function_type = "HOP" + elif isinstance(op, ops.CumulateWindowingTVF): + function_type = "CUMULATE" + return f"TABLE({function_type}({format_windowing_tvf_params(op, self)}))" + def _format_table(self, op) -> str: - result = super()._format_table(op) + ctx = self.context + if isinstance(op, ops.WindowingTVF): + formatted_table = self._format_window_tvf(op) + return f"{formatted_table} {ctx.get_ref(op)}" + else: + result = super()._format_table(op) ref_op = op if isinstance(op, ops.SelfReference): @@ -77,25 +93,72 @@ class FlinkCompiler(Compiler): cheap_in_memory_tables = True + @classmethod + def to_sql(cls, node, context=None, params=None): + if isinstance(node, ir.Expr): + node = node.op() -def translate(op: ops.TableNode) -> str: - return translate_op(op) - - -@functools.singledispatch -def translate_op(op: ops.TableNode) -> str: - raise com.OperationNotDefinedError(f"No translation rule for {type(op)}") - + if isinstance(node, ops.Literal): + from ibis.backends.flink.utils import translate_literal -@translate_op.register(ops.Literal) -def _literal(op: ops.Literal) -> str: - from ibis.backends.flink.utils import translate_literal + return translate_literal(node) - return translate_literal(op) + return super().to_sql(node, context, params) -@translate_op.register(ops.Selection) -@translate_op.register(ops.Aggregation) -@translate_op.register(ops.Limit) -def _(op: ops.Selection | ops.Aggregation | ops.Limit) -> str: - return FlinkCompiler.to_sql(op) # to_sql uses to_ast, which builds a select tree +@functools.singledispatch +def format_windowing_tvf_params( + op: ops.WindowingTVF, formatter: TableSetFormatter +) -> str: + raise com.OperationNotDefinedError(f"No formatting rule for {type(op)}") + + +@format_windowing_tvf_params.register(ops.TumbleWindowingTVF) +def _tumble_window_params( + op: ops.TumbleWindowingTVF, formatter: TableSetFormatter +) -> str: + return ", ".join( + filter( + None, + [ + f"TABLE {quote_identifier(op.table.name)}", + f"DESCRIPTOR({formatter._translate(op.time_col)})", + formatter._translate(op.window_size), + formatter._translate(op.offset) if op.offset else None, + ], + ) + ) + + +@format_windowing_tvf_params.register(ops.HopWindowingTVF) +def _hop_window_params(op: ops.HopWindowingTVF, formatter: TableSetFormatter) -> str: + return ", ".join( + filter( + None, + [ + f"TABLE {quote_identifier(op.table.name)}", + f"DESCRIPTOR({formatter._translate(op.time_col)})", + formatter._translate(op.window_slide), + formatter._translate(op.window_size), + formatter._translate(op.offset) if op.offset else None, + ], + ) + ) + + +@format_windowing_tvf_params.register(ops.CumulateWindowingTVF) +def _cumulate_window_params( + op: ops.CumulateWindowingTVF, formatter: TableSetFormatter +) -> str: + return ", ".join( + filter( + None, + [ + f"TABLE {quote_identifier(op.table.name)}", + f"DESCRIPTOR({formatter._translate(op.time_col)})", + formatter._translate(op.window_step), + formatter._translate(op.window_size), + formatter._translate(op.offset) if op.offset else None, + ], + ) + ) diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_complex_filtered_agg/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_complex_filtered_agg/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_complex_filtered_agg/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_complex_filtered_agg/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_complex_groupby_aggregation/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_complex_groupby_aggregation/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_complex_groupby_aggregation/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_complex_groupby_aggregation/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_complex_projections/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_complex_projections/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_complex_projections/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_complex_projections/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_count_star/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_count_star/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_count_star/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_count_star/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/day/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/day/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/day/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/day/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/day_of_year/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/day_of_year/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/day_of_year/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/day_of_year/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/hour/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/hour/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/hour/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/hour/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/minute/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/minute/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/minute/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/minute/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/month/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/month/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/month/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/month/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/quarter/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/quarter/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/quarter/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/quarter/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/second/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/second/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/second/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/second/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/week_of_year/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/week_of_year/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/week_of_year/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/week_of_year/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/year/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/year/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_extract_fields/year/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_extract_fields/year/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_filter/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_filter/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_filter/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_filter/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_having/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_having/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_having/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_having/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_simple_filtered_agg/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_simple_filtered_agg/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_simple_filtered_agg/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_simple_filtered_agg/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_sum/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_sum/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_sum/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_sum/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_timestamp_from_unix/timestamp_ms/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_timestamp_from_unix/timestamp_ms/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_timestamp_from_unix/timestamp_ms/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_timestamp_from_unix/timestamp_ms/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_timestamp_from_unix/timestamp_s/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_timestamp_from_unix/timestamp_s/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_timestamp_from_unix/timestamp_s/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_timestamp_from_unix/timestamp_s/out.sql diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/cumulate_window/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/cumulate_window/out.sql new file mode 100644 index 000000000000..522c6d576e1e --- /dev/null +++ b/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/cumulate_window/out.sql @@ -0,0 +1,2 @@ +SELECT t0.* +FROM TABLE(CUMULATE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '10' SECOND, INTERVAL '1' MINUTE)) t0 \ No newline at end of file diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/hop_window/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/hop_window/out.sql new file mode 100644 index 000000000000..38376568cba9 --- /dev/null +++ b/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/hop_window/out.sql @@ -0,0 +1,2 @@ +SELECT t0.* +FROM TABLE(HOP(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)) t0 \ No newline at end of file diff --git a/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/tumble_window/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/tumble_window/out.sql new file mode 100644 index 000000000000..d46ca32d3d23 --- /dev/null +++ b/ibis/backends/flink/tests/snapshots/test_compiler/test_tvf/tumble_window/out.sql @@ -0,0 +1,2 @@ +SELECT t0.* +FROM TABLE(TUMBLE(TABLE `table`, DESCRIPTOR(`i`), INTERVAL '15' MINUTE)) t0 \ No newline at end of file diff --git a/ibis/backends/flink/tests/snapshots/test_translator/test_translate_value_counts/out.sql b/ibis/backends/flink/tests/snapshots/test_compiler/test_value_counts/out.sql similarity index 100% rename from ibis/backends/flink/tests/snapshots/test_translator/test_translate_value_counts/out.sql rename to ibis/backends/flink/tests/snapshots/test_compiler/test_value_counts/out.sql diff --git a/ibis/backends/flink/tests/test_translator.py b/ibis/backends/flink/tests/test_compiler.py similarity index 54% rename from ibis/backends/flink/tests/test_translator.py rename to ibis/backends/flink/tests/test_compiler.py index 27d4d2df917e..eaec12cdf89a 100644 --- a/ibis/backends/flink/tests/test_translator.py +++ b/ibis/backends/flink/tests/test_compiler.py @@ -3,18 +3,18 @@ import pytest from pytest import param -from ibis.backends.flink.compiler.core import translate +import ibis -def test_translate_sum(snapshot, simple_table): +def test_sum(con, snapshot, simple_table): expr = simple_table.a.sum() - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(str(result), "out.sql") -def test_translate_count_star(snapshot, simple_table): +def test_count_star(con, snapshot, simple_table): expr = simple_table.group_by(simple_table.i).size() - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(str(result), "out.sql") @@ -25,28 +25,28 @@ def test_translate_count_star(snapshot, simple_table): param("s", id="timestamp_s"), ], ) -def test_translate_timestamp_from_unix(snapshot, simple_table, unit): +def test_timestamp_from_unix(con, snapshot, simple_table, unit): expr = simple_table.d.to_timestamp(unit=unit) - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_complex_projections(snapshot, simple_table): +def test_complex_projections(con, snapshot, simple_table): expr = ( simple_table.group_by(["a", "c"]) .aggregate(the_sum=simple_table.b.sum()) .group_by("a") .aggregate(mad=lambda x: x.the_sum.abs().mean()) ) - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_filter(snapshot, simple_table): +def test_filter(con, snapshot, simple_table): expr = simple_table[ ((simple_table.c > 0) | (simple_table.c < 0)) & simple_table.g.isin(["A", "B"]) ] - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") @@ -64,50 +64,82 @@ def test_translate_filter(snapshot, simple_table): "second", ], ) -def test_translate_extract_fields(snapshot, simple_table, kind): +def test_extract_fields(con, snapshot, simple_table, kind): expr = getattr(simple_table.i, kind)().name("tmp") - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_complex_groupby_aggregation(snapshot, simple_table): +def test_complex_groupby_aggregation(con, snapshot, simple_table): keys = [simple_table.i.year().name("year"), simple_table.i.month().name("month")] b_unique = simple_table.b.nunique() expr = simple_table.group_by(keys).aggregate( total=simple_table.count(), b_unique=b_unique ) - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_simple_filtered_agg(snapshot, simple_table): +def test_simple_filtered_agg(con, snapshot, simple_table): expr = simple_table.b.nunique(where=simple_table.g == "A") - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_complex_filtered_agg(snapshot, simple_table): +def test_complex_filtered_agg(con, snapshot, simple_table): expr = simple_table.group_by("b").aggregate( total=simple_table.count(), avg_a=simple_table.a.mean(), avg_a_A=simple_table.a.mean(where=simple_table.g == "A"), avg_a_B=simple_table.a.mean(where=simple_table.g == "B"), ) - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_value_counts(snapshot, simple_table): +def test_value_counts(con, snapshot, simple_table): expr = simple_table.i.year().value_counts() - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_translate_having(snapshot, simple_table): +def test_having(con, snapshot, simple_table): expr = ( simple_table.group_by("g") .having(simple_table.count() >= 1000) .aggregate(simple_table.b.sum().name("b_sum")) ) - result = translate(expr.as_table().op()) + result = con.compile(expr) + snapshot.assert_match(result, "out.sql") + + +@pytest.mark.parametrize( + "function_type,params", + [ + pytest.param( + "tumble", {"window_size": ibis.interval(minutes=15)}, id="tumble_window" + ), + pytest.param( + "hop", + { + "window_size": ibis.interval(minutes=15), + "window_slide": ibis.interval(minutes=1), + }, + id="hop_window", + ), + pytest.param( + "cumulate", + { + "window_size": ibis.interval(minutes=1), + "window_step": ibis.interval(seconds=10), + }, + id="cumulate_window", + ), + ], +) +def test_tvf(con, snapshot, simple_table, function_type, params): + expr = getattr(simple_table.window_by(time_col=simple_table.i), function_type)( + **params + ) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") diff --git a/ibis/backends/flink/tests/test_literals.py b/ibis/backends/flink/tests/test_literals.py index ce6cd134ed01..091405d142d0 100644 --- a/ibis/backends/flink/tests/test_literals.py +++ b/ibis/backends/flink/tests/test_literals.py @@ -8,7 +8,6 @@ import ibis import ibis.expr.datatypes as dt -from ibis.backends.flink.compiler.core import translate @pytest.mark.parametrize( @@ -20,9 +19,9 @@ param(False, "FALSE", id="false"), ], ) -def test_simple_literals(value, expected): +def test_simple_literals(con, value, expected): expr = ibis.literal(value) - result = translate(expr.op()) + result = con.compile(expr) assert result == expected @@ -34,9 +33,9 @@ def test_simple_literals(value, expected): param('An "escape"', """'An "escape"'""", id="nested_token"), ], ) -def test_string_literals(value, expected): +def test_string_literals(con, value, expected): expr = ibis.literal(value) - result = translate(expr.op()) + result = con.compile(expr) assert result == expected @@ -54,9 +53,9 @@ def test_string_literals(value, expected): param(ibis.interval(seconds=5), "INTERVAL '5' SECOND", id="5seconds"), ], ) -def test_translate_interval_literal(value, expected): +def test_translate_interval_literal(con, value, expected): expr = ibis.literal(value) - result = translate(expr.op()) + result = con.compile(expr) assert result == expected @@ -75,7 +74,7 @@ def test_translate_interval_literal(value, expected): param("04:55:59", dt.time, id="string_time"), ], ) -def test_literal_timestamp_or_time(snapshot, case, dtype): +def test_literal_timestamp_or_time(con, snapshot, case, dtype): expr = ibis.literal(case, type=dtype) - result = translate(expr.op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") diff --git a/ibis/backends/flink/tests/test_window.py b/ibis/backends/flink/tests/test_window.py index 6a2631578de8..e40ac8744252 100644 --- a/ibis/backends/flink/tests/test_window.py +++ b/ibis/backends/flink/tests/test_window.py @@ -4,20 +4,19 @@ from pytest import param import ibis -from ibis.backends.flink.compiler.core import translate from ibis.common.exceptions import UnsupportedOperationError -def test_window_requires_order_by(simple_table): +def test_window_requires_order_by(con, simple_table): expr = simple_table.mutate(simple_table.c - simple_table.c.mean()) with pytest.raises( UnsupportedOperationError, match="Flink engine does not support generic window clause with no order by", ): - translate(expr.as_table().op()) + con.compile(expr) -def test_window_does_not_support_multiple_order_by(simple_table): +def test_window_does_not_support_multiple_order_by(con, simple_table): expr = simple_table.f.sum().over( rows=(-1, 1), group_by=[simple_table.g, simple_table.a], @@ -27,10 +26,10 @@ def test_window_does_not_support_multiple_order_by(simple_table): UnsupportedOperationError, match="Windows in Flink can only be ordered by a single time column", ): - translate(expr.as_table().op()) + con.compile(expr) -def test_window_does_not_support_desc_order(simple_table): +def test_window_does_not_support_desc_order(con, simple_table): expr = simple_table.f.sum().over( rows=(-1, 1), group_by=[simple_table.g, simple_table.a], @@ -40,7 +39,7 @@ def test_window_does_not_support_desc_order(simple_table): UnsupportedOperationError, match="Flink only supports windows ordered in ASCENDING mode", ): - translate(expr.as_table().op()) + con.compile(expr) @pytest.mark.parametrize( @@ -68,21 +67,21 @@ def test_window_does_not_support_desc_order(simple_table): ), ], ) -def test_window_invalid_start_end(simple_table, window, err): +def test_window_invalid_start_end(con, simple_table, window, err): expr = simple_table.f.sum().over(**window, order_by=simple_table.f) with pytest.raises(UnsupportedOperationError, match=err): - translate(expr.as_table().op()) + con.compile(expr) -def test_range_window(snapshot, simple_table): +def test_range_window(con, snapshot, simple_table): expr = simple_table.f.sum().over( range=(-ibis.interval(minutes=500), 0), order_by=simple_table.f ) - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") -def test_rows_window(snapshot, simple_table): +def test_rows_window(con, snapshot, simple_table): expr = simple_table.f.sum().over(rows=(-1000, 0), order_by=simple_table.f) - result = translate(expr.as_table().op()) + result = con.compile(expr) snapshot.assert_match(result, "out.sql") diff --git a/ibis/expr/format.py b/ibis/expr/format.py index 2c4028214b3f..3c85af889153 100644 --- a/ibis/expr/format.py +++ b/ibis/expr/format.py @@ -193,6 +193,7 @@ def fmt(op, **kwargs): @fmt.register(ops.Relation) @fmt.register(ops.DummyTable) +@fmt.register(ops.WindowingTVF) def _relation(op, **kwargs): schema = render_schema(op.schema, indent_level=1) return f"{op.__class__.__name__}\n{schema}" diff --git a/ibis/expr/operations/__init__.py b/ibis/expr/operations/__init__.py index f0ef10889bda..ebdcd931fe88 100644 --- a/ibis/expr/operations/__init__.py +++ b/ibis/expr/operations/__init__.py @@ -16,6 +16,7 @@ from ibis.expr.operations.strings import * # noqa: F403 from ibis.expr.operations.structs import * # noqa: F403 from ibis.expr.operations.temporal import * # noqa: F403 +from ibis.expr.operations.temporal_windows import * # noqa: F403 from ibis.expr.operations.udf import * # noqa: F403 from ibis.expr.operations.vectorized import * # noqa: F403 from ibis.expr.operations.window import * # noqa: F403 diff --git a/ibis/expr/operations/temporal_windows.py b/ibis/expr/operations/temporal_windows.py new file mode 100644 index 000000000000..415f0b026fd2 --- /dev/null +++ b/ibis/expr/operations/temporal_windows.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import Optional + +from public import public + +import ibis.expr.datatypes as dt +from ibis.expr.operations.core import Column, Scalar # noqa: TCH001 +from ibis.expr.operations.relations import Relation +from ibis.expr.schema import Schema + + +@public +class WindowingTVF(Relation): + """Generic windowing table-valued function.""" + + table: Relation + time_col: Column[dt.Timestamp] # enforce timestamp column type here + + @property + def schema(self): + names = list(self.table.schema.names) + types = list(self.table.schema.types) + + # The return value of windowing TVF is a new relation that includes all columns + # of original relation as well as additional 3 columns named “window_start”, + # “window_end”, “window_time” to indicate the assigned window + + names.extend(["window_start", "window_end", "window_time"]) + # window_start, window_end, window_time have type TIMESTAMP(3) in Flink + types.extend([dt.timestamp(scale=3)] * 3) + + return Schema.from_tuples(list(zip(names, types))) + + +@public +class TumbleWindowingTVF(WindowingTVF): + """TUMBLE window table-valued function.""" + + window_size: Scalar[dt.Interval] + offset: Optional[Scalar[dt.Interval]] = None + + +@public +class HopWindowingTVF(WindowingTVF): + """HOP window table-valued function.""" + + window_size: Scalar[dt.Interval] + window_slide: Scalar[dt.Interval] + offset: Optional[Scalar[dt.Interval]] = None + + +@public +class CumulateWindowingTVF(WindowingTVF): + """CUMULATE window table-valued function.""" + + window_size: Scalar[dt.Interval] + window_step: Scalar[dt.Interval] + offset: Optional[Scalar[dt.Interval]] = None diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py index df12ffe735db..548c1a080abb 100644 --- a/ibis/expr/types/relations.py +++ b/ibis/expr/types/relations.py @@ -30,6 +30,7 @@ import ibis.selectors as s from ibis.common.typing import SupportsSchema from ibis.expr.types.groupby import GroupedTable + from ibis.expr.types.tvf import WindowedTable from ibis.selectors import IfAnyAll, Selector _ALIASES = (f"_ibis_view_{n:d}" for n in itertools.count()) @@ -4220,6 +4221,26 @@ def relocate( return relocated + def window_by(self, time_col: ir.Value) -> WindowedTable: + """Create a windowing table-valued function (TVF) expression. + + Windowing table-valued functions (TVF) assign rows of a table to windows + based on a time attribute column in the table. + + Parameters + ---------- + time_col + Column of the table that will be mapped to windows. + + Returns + ------- + WindowedTable + WindowedTable expression. + """ + from ibis.expr.types.temporal_windows import WindowedTable + + return WindowedTable(self, time_col) + @public class CachedTable(Table): @@ -4241,7 +4262,7 @@ def _resolve_predicates( table: Table, predicates ) -> tuple[list[ir.BooleanValue], list[tuple[ir.BooleanValue, ir.Table]]]: import ibis.expr.types as ir - from ibis.expr.analysis import p, flatten_predicate, _ + from ibis.expr.analysis import _, flatten_predicate, p # TODO(kszucs): clean this up, too much flattening and resolving happens here predicates = [ diff --git a/ibis/expr/types/temporal_windows.py b/ibis/expr/types/temporal_windows.py new file mode 100644 index 000000000000..d357c127b15a --- /dev/null +++ b/ibis/expr/types/temporal_windows.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from public import public + +import ibis.common.exceptions as com +import ibis.expr.analysis as an +import ibis.expr.operations as ops +import ibis.expr.types as ir +from ibis.common.deferred import Deferred +from ibis.selectors import Selector + +if TYPE_CHECKING: + from ibis.expr.types import Table + + +def _get_window_by_key(table, value): + if isinstance(value, str): + return table[value] + elif isinstance(value, Deferred): + return value.resolve(table) + elif isinstance(value, Selector): + matches = value.expand(table) + if len(matches) != 1: + raise com.IbisInputError( + "Multiple columns match the selector; only 1 is expected" + ) + return next(iter(matches)) + elif isinstance(value, ir.Expr): + return an.sub_immediate_parents(value.op(), table.op()).to_expr() + else: + return value + + +@public +class WindowedTable: + """An intermediate table expression to hold windowing information.""" + + def __init__(self, table: ir.Table, time_col: ir.Value): + self.table = table + self.time_col = _get_window_by_key(table, time_col) + + if self.time_col is None: + raise com.IbisInputError( + "Window aggregations require `time_col` as an argument" + ) + + def tumble( + self, + window_size: ir.IntervalScalar, + offset: ir.IntervalScalar | None = None, + ) -> Table: + """Compute a tumble table valued function. + + Tumbling windows have a fixed size and do not overlap. The size of the windows is + determined by `window_size`, optionally shifted by a duration specified by `offset`. + + Parameters + ---------- + window_size + Width of the tumbling windows. + offset + An optional parameter to specify the offset which window start should be shifted by. + + Returns + ------- + Table + Table expression after applying tumbling table-valued function. + """ + return ops.TumbleWindowingTVF( + table=self.table, + time_col=_get_window_by_key(self.table, self.time_col), + window_size=window_size, + offset=offset, + ).to_expr() + + def hop( + self, + window_size: ir.IntervalScalar, + window_slide: ir.IntervalScalar, + offset: ir.IntervalScalar | None = None, + ): + """Compute a hop table valued function. + + Hopping windows have a fixed size and can be overlapping if the slide is smaller than the + window size (in which case elements can be assigned to multiple windows). Hopping windows + are also known as sliding windows. The size of the windows is determined by `window_size`, + how frequently a hopping window is started is determined by `window_slide`, and windows can + be optionally shifted by a duration specified by `offset`. + + For example, you could have windows of size 10 minutes that slides by 5 minutes. With this, + you get every 5 minutes a window that contains the events that arrived during the last 10 minutes. + + Parameters + ---------- + window_size + Width of the hopping windows. + window_slide + The duration between the start of sequential hopping windows. + offset + An optional parameter to specify the offset which window start should be shifted by. + + Returns + ------- + Table + Table expression after applying hopping table-valued function. + """ + return ops.HopWindowingTVF( + table=self.table, + time_col=_get_window_by_key(self.table, self.time_col), + window_size=window_size, + window_slide=window_slide, + offset=offset, + ).to_expr() + + def cumulate( + self, + window_size: ir.IntervalScalar, + window_step: ir.IntervalScalar, + offset: ir.IntervalScalar | None = None, + ): + """Compute a cumulate table valued function. + + Cumulate windows don't have a fixed size and do overlap. Cumulate windows assign elements to windows + that cover rows within an initial interval of step size and expand to one more step size (keep window + start fixed) every step until the max window size. + + For example, you could have a cumulating window for 1 hour step and 1 day max size, and you will get + windows: [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) for every day. + + Parameters + ---------- + window_size + Max width of the cumulating windows. + window_step + A duration specifying the increased window size between the end of sequential cumulating windows. + offset + An optional parameter to specify the offset which window start should be shifted by. + + Returns + ------- + Table + Table expression after applying cumulate table-valued function. + """ + return ops.CumulateWindowingTVF( + table=self.table, + time_col=_get_window_by_key(self.table, self.time_col), + window_size=window_size, + window_step=window_step, + offset=offset, + ).to_expr() diff --git a/ibis/tests/expr/test_temporal_windows.py b/ibis/tests/expr/test_temporal_windows.py new file mode 100644 index 000000000000..928c26726d84 --- /dev/null +++ b/ibis/tests/expr/test_temporal_windows.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import datetime + +import pytest + +import ibis +import ibis.common.exceptions as com +import ibis.expr.datatypes as dt +import ibis.expr.operations as ops +from ibis import selectors as s +from ibis.common.annotations import ValidationError +from ibis.common.deferred import _ + + +def test_tumble_tvf_schema(schema, table): + expr = table.window_by(time_col=table.i).tumble( + window_size=ibis.interval(minutes=15) + ) + expected_schema = ibis.schema( + schema + + [ + ("window_start", dt.Timestamp(scale=3)), + ("window_end", dt.Timestamp(scale=3)), + ("window_time", dt.Timestamp(scale=3)), + ] + ) + assert expr.schema() == expected_schema + + +@pytest.mark.parametrize("wrong_type_window_size", ["60", 60]) +def test_create_tumble_tvf_with_wrong_scalar_type(table, wrong_type_window_size): + with pytest.raises(ValidationError, match=".* is not coercible to a .*"): + table.window_by(time_col=table.i).tumble(window_size=wrong_type_window_size) + + +def test_create_tumble_tvf_with_nonexistent_time_col(table): + with pytest.raises(com.IbisTypeError, match="Column .* is not found in table"): + table.window_by(time_col=table["nonexistent"]).tumble( + window_size=datetime.timedelta(seconds=60) + ) + + +def test_create_tumble_tvf_with_nonscalar_window_size(schema): + schema.append(("l", "interval")) + table = ibis.table(schema, name="table") + with pytest.raises(ValidationError, match=".* is not coercible to a .*"): + table.window_by(time_col=table.i).tumble(window_size=table.l) + + +def test_create_tumble_tvf_with_non_timestamp_time_col(table): + with pytest.raises(ValidationError, match=".* is not coercible to a .*"): + table.window_by(time_col=table.e).tumble(window_size=ibis.interval(minutes=15)) + + +def test_create_tumble_tvf_with_str_time_col(table): + expr = table.window_by(time_col="i").tumble(window_size=ibis.interval(minutes=15)) + assert isinstance(expr.op(), ops.TumbleWindowingTVF) + assert expr.op().time_col == table.i.op() + + +@pytest.mark.parametrize("deferred", [_["i"], _.i]) +def test_create_tumble_tvf_with_deferred_time_col(table, deferred): + expr = table.window_by(time_col=deferred.resolve(table)).tumble( + window_size=ibis.interval(minutes=15) + ) + assert isinstance(expr.op(), ops.TumbleWindowingTVF) + assert expr.op().time_col == table.i.op() + + +def test_create_tumble_tvf_with_selector_time_col(table): + expr = table.window_by(time_col=s.c("i")).tumble( + window_size=ibis.interval(minutes=15) + ) + assert isinstance(expr.op(), ops.TumbleWindowingTVF) + assert expr.op().time_col == table.i.op()