Skip to content

Commit

Permalink
[FLINK-36417][table] Add support for hints in WatermarkAssigner
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin authored Oct 3, 2024
1 parent 2cac884 commit 4101316
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ public RelBuilder windowAggregate(
public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
final RelNode input = build();
final RelNode relNode =
LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr);
LogicalWatermarkAssigner.create(
cluster, input, Collections.emptyList(), rowtimeFieldIndex, watermarkExpr);
return push(relNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.hint;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner;
import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;

Expand Down Expand Up @@ -119,7 +120,8 @@ public static Optional<String> getTableAlias(RelNode node) {
public static boolean canTransposeToTableScan(RelNode node) {
return node instanceof LogicalProject // computed column on table
|| node instanceof LogicalFilter
|| node instanceof LogicalSnapshot;
|| node instanceof LogicalSnapshot
|| node instanceof WatermarkAssigner;
}

/** Returns the qualified name of a table scan, otherwise returns empty. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ private RelNode transmitSnapshotRequirement(RelNode node) {
final RelNode newChild = transmitSnapshotRequirement(child);
if (newChild != child) {
return wma.copy(
wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr());
wma.getTraitSet(),
newChild,
wma.getHints(),
wma.rowtimeFieldIndex(),
wma.watermarkExpr());
}
return wma;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ private RelNode pushDownTransformedWatermark(
watermark.copy(
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
exchange.getInput(),
Collections.emptyList(),
newRowTimeFieldIndex,
newWatermarkExpr);
final RelNode newChangelogNormalize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package org.apache.flink.table.planner.plan.nodes.calcite

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

import java.util

/**
* Sub-class of [[WatermarkAssigner]] that is a relational operator which generates
* [[org.apache.flink.streaming.api.watermark.Watermark]]. This class corresponds to Calcite logical
Expand All @@ -30,16 +33,22 @@ final class LogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) {
extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) {

override def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
new LogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
new LogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark)
}

override def withHints(hintList: util.List[RelHint]): RelNode = {
new LogicalWatermarkAssigner(cluster, traits, input, hintList, rowtimeFieldIndex, watermarkExpr)
}
}

Expand All @@ -48,9 +57,10 @@ object LogicalWatermarkAssigner {
def create(
cluster: RelOptCluster,
input: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode): LogicalWatermarkAssigner = {
val traits = cluster.traitSetOf(Convention.NONE)
new LogicalWatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr)
new LogicalWatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.calcite

import org.apache.flink.table.planner.calcite.FlinkTypeFactory

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rel.hint.{Hintable, RelHint}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName

import java.util
import java.util.ArrayList

import scala.collection.JavaConversions._

Expand All @@ -34,9 +37,11 @@ abstract class WatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode,
val hints: util.List[RelHint],
val rowtimeFieldIndex: Int,
val watermarkExpr: RexNode)
extends SingleRel(cluster, traits, inputRel) {
extends SingleRel(cluster, traits, inputRel)
with Hintable {

override def deriveRowType(): RelDataType = {
val inputRowType = inputRel.getRowType
Expand Down Expand Up @@ -68,10 +73,21 @@ abstract class WatermarkAssigner(
}

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
copy(traitSet, inputs.get(0), hints, rowtimeFieldIndex, watermarkExpr)
}

/** Copies a new WatermarkAssigner. */
def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: RexNode): RelNode
def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode

override def getHints: ImmutableList[RelHint] = {
val arrayHints = hints.toArray(new Array[RelHint](0))
ImmutableList.copyOf(arrayHints)
}

def withHints(hintList: util.List[RelHint]): RelNode
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

import java.util
import java.util.Collections

/**
* Sub-class of [[WatermarkAssigner]] that is a relational operator which generates
* [[org.apache.flink.streaming.api.watermark.Watermark]].
Expand All @@ -34,20 +38,31 @@ class FlinkLogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr)
extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr)
with FlinkLogicalRel {

/** Copies a new WatermarkAssigner. */
override def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark)
}

override def withHints(hintList: util.List[RelHint]): RelNode = {
new FlinkLogicalWatermarkAssigner(
cluster,
traitSet,
input,
hints,
rowtimeFieldIndex,
watermarkExpr)
}
}

class FlinkLogicalWatermarkAssignerConverter(config: Config) extends ConverterRule(config) {
Expand Down Expand Up @@ -76,6 +91,12 @@ object FlinkLogicalWatermarkAssigner {
watermarkExpr: RexNode): FlinkLogicalWatermarkAssigner = {
val cluster = input.getCluster
val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeFieldIndex, watermarkExpr)
new FlinkLogicalWatermarkAssigner(
cluster,
traitSet,
input,
Collections.emptyList(),
rowtimeFieldIndex,
watermarkExpr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,33 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

import java.util

import scala.collection.JavaConversions._

/** Stream physical RelNode for [[WatermarkAssigner]]. */
class StreamPhysicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex, watermarkExpr)
extends WatermarkAssigner(cluster, traits, inputRel, hints, rowtimeFieldIndex, watermarkExpr)
with StreamPhysicalRel {

override def requireWatermark: Boolean = false

override def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark)
}

/** Fully override this method to have a better display name of this RelNode. */
Expand Down Expand Up @@ -75,4 +80,14 @@ class StreamPhysicalWatermarkAssigner(
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}

override def withHints(hintList: util.List[RelHint]): RelNode = {
new StreamPhysicalWatermarkAssigner(
cluster,
traitSet,
input,
hints,
rowtimeFieldIndex,
watermarkExpr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config

import java.util.Collections

/** Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamPhysicalWatermarkAssigner]]. */
class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(config) {

Expand All @@ -39,6 +41,7 @@ class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(
watermarkAssigner.getCluster,
traitSet,
convertInput,
Collections.emptyList(),
watermarkAssigner.rowtimeFieldIndex,
watermarkAssigner.watermarkExpr
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ void before() {
+ " 'connector' = 'values'\n"
+ ")");

util.tableEnv()
.executeSql(
"CREATE TABLE tableWithWatermark1 (\n"
+ " a INT,\n"
+ " b BIGINT,\n"
+ " c TIMESTAMP(3),"
+ " WATERMARK FOR c AS c"
+ ") WITH ("
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false'\n"
+ ")");

util.tableEnv()
.executeSql(
"CREATE TABLE tableWithWatermark2 ("
+ " a int,\n"
+ " b BIGINT,\n"
+ " c ROW<c1 TIMESTAMP(3)>,\n"
+ " d AS c.c1 + INTERVAL '5' SECOND,\n"
+ " WATERMARK FOR d as d - INTERVAL '5' second"
+ ") WITH ("
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false'\n"
+ ")");

util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as b4 from T3");

util.tableEnv()
Expand Down Expand Up @@ -297,6 +322,26 @@ void testAggStateTtlWithEmptyKV() {
"Invalid STATE_TTL hint, expecting at least one key-value options specified.");
}

@Test
void testWatermarkAssigner() {
String sql =
"\n"
+ "SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1\n"
+ "LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2\n"
+ "ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL";
verify(sql);
}

@Test
void testWatermarkAssignerWithAliases() {
String sql =
"\n"
+ "SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM tableWithWatermark1 tww1\n"
+ "LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2\n"
+ "ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL";
verify(sql);
}

private void verify(String sql) {
util.doVerifyPlan(
sql,
Expand Down
Loading

0 comments on commit 4101316

Please sign in to comment.