From 0bcb9216feab685ea4b7087c82f4c40f2466c86d Mon Sep 17 00:00:00 2001
From: snuyanzin
Date: Sun, 22 Sep 2024 01:53:26 +0200
Subject: [PATCH] [FLINK-34702][table] Do not deduplicate GroupAggregate and some joins for rank

---
 .../table/planner/plan/utils/RankUtil.scala   |  49 ++-
 .../plan/stream/sql/DeduplicateTest.xml       | 415 ++++++++++++++++++
 .../plan/stream/sql/DeduplicateTest.scala     | 167 +++++++
 3 files changed, 629 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index e4719f5d6c9b3..a595f55ee50c2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -26,8 +26,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalD
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange}
 
 import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel}
+import org.apache.calcite.rel.core.{Aggregate, Join, JoinRelType, SetOp}
 import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, RexNode, RexUtil}
 import org.apache.calcite.sql.SqlKind
 
@@ -354,7 +356,50 @@ object RankUtil {
     val inputRowType = rank.getInput.getRowType
     val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation, inputRowType)
 
-    !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
+    val isInputConvertableToDeduplicate = canConvertInputToDeduplicate(rank)
+
+    !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType &&
+    isInputConvertableToDeduplicate
+  }
+
+  /**
+   * Checks whether the plan below a rank allows the rank to be converted to a Deduplicate.
+   *
+   * A [[RelSubset]] is judged by the node returned by its `getOriginal`. [[SingleRel]]s and
+   * [[SetOp]]s are looked through. The plan is rejected as soon as an [[Aggregate]] or a LEFT,
+   * RIGHT, FULL, SEMI or ANTI [[Join]] is met; for any other join type both inputs are checked.
+   *
+   * @param rank
+   *   the rank node on the first call, afterwards any node of the plan below it
+   * @return
+   *   false if a rejected node is found, true otherwise (also for node types that are not
+   *   handled here)
+   */
+  private def canConvertInputToDeduplicate(rank: RelNode): Boolean = {
+    // Unwrap a subset first, so that a node is judged by the same rules whether or not the
+    // planner has wrapped it in a RelSubset.
+    val node = rank match {
+      case subset: RelSubset => subset.getOriginal
+      case other => other
+    }
+
+    node match {
+      // Must come before the SingleRel case because an Aggregate is a SingleRel as well.
+      case _: Aggregate => false
+      case join: Join =>
+        join.getJoinType match {
+          case JoinRelType.LEFT | JoinRelType.RIGHT | JoinRelType.FULL | JoinRelType.SEMI |
+              JoinRelType.ANTI =>
+            false
+          case _ =>
+            canConvertInputToDeduplicate(join.getLeft) &&
+            canConvertInputToDeduplicate(join.getRight)
+        }
+      case singleRel: SingleRel => canConvertInputToDeduplicate(singleRel.getInput)
+      case setOp: SetOp => setOp.getInputs.forall(canConvertInputToDeduplicate)
+      // Reached for leaf nodes, unhandled node types and a subset that yields no original node.
+      case _ => true
+    }
   }
 
   private def sortOnTimeAttribute(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
index e5a52edd29051..1db32253ea918 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
@@ -363,4 +363,419 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1 AS
 ]]>
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
index 093b3a12415d0..a891f28095cf0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
@@ -40,6 +40,14 @@ class DeduplicateTest extends TableTestBase {
       'c,
       'proctime.proctime,
       'rowtime.rowtime)
+    // MyTable2 (columns a, b, c, proctime, rowtime): second input for the join and set-op tests.
+    util.addDataStream[(Int, String, Long)](
+      "MyTable2",
+      'a,
+      'b,
+      'c,
+      'proctime.proctime,
+      'rowtime.rowtime)
   }
 
   @Test
@@ -272,4 +280,163 @@ class DeduplicateTest extends TableTestBase {
     util.verifyExecPlan(sqlQuery)
   }
 
+  @Test // First row per key by PROCTIME(); the rank input is a GROUP BY aggregate (COUNT).
+  def testRankConsumeChangelogGroupAggregate(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT a, COUNT(b) as b FROM MyTable GROUP BY a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // First row per key by PROCTIME(); the rank input is a SELECT DISTINCT.
+  def testRankConsumeChangelogDistinct(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT DISTINCT a, b FROM MyTable)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // First row per key by PROCTIME(); the rank input is a LEFT OUTER JOIN with MyTable2.
+  def testRankConsumeChangelogLeftOuterJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT m.a, m.b FROM MyTable m LEFT OUTER JOIN MyTable2 m2 ON m.a = m2.a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // First row per key by PROCTIME(); the rank input is a RIGHT OUTER JOIN with MyTable2.
+  def testRankConsumeChangelogRightOuterJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT m.a, m.b FROM MyTable m RIGHT OUTER JOIN MyTable2 m2 ON m.a = m2.a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // First row per key by PROCTIME(); the rank input is a FULL OUTER JOIN with MyTable2.
+  def testRankConsumeChangelogFullOuterJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT m.a, m.b FROM MyTable m FULL OUTER JOIN MyTable2 m2 ON m.a = m2.a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // Rank input is a UNION ALL of a SELECT DISTINCT and a GROUP BY aggregate.
+  def testRankConsumeChangelogUnionGroupAggregate(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT DISTINCT a, b FROM MyTable
+        | UNION ALL
+        | SELECT count(a), b FROM MyTable GROUP BY b)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // Rank input is an INTERSECT of a SELECT DISTINCT on MyTable and a scan of MyTable2.
+  def testRankConsumeChangelogIntersectSemiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT DISTINCT a, b FROM MyTable
+        | INTERSECT
+        | SELECT a, b FROM MyTable2)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // Rank input is an EXCEPT of a SELECT DISTINCT on MyTable and a scan of MyTable2.
+  def testRankConsumeChangelogExceptAntiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT DISTINCT a, b FROM MyTable
+        | EXCEPT
+        | SELECT a, b FROM MyTable2)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // Rank input is MyTable filtered by an uncorrelated NOT EXISTS subquery on MyTable2.
+  def testRankConsumeChangelogExistsAntiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT a, b FROM MyTable
+        | WHERE NOT EXISTS (
+        | SELECT a, b FROM MyTable2))
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test // Rank input is MyTable filtered by an uncorrelated EXISTS subquery on MyTable2.
+  def testRankConsumeChangelogExistsSemiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        | SELECT *,
+        | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        | FROM (SELECT a, b FROM MyTable
+        | WHERE EXISTS (
+        | SELECT a, b FROM MyTable2))
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
 }