Skip to content

Commit

Permalink
[FLINK-34702][table] Do not deduplicate GroupAggregate and some joins…
Browse files Browse the repository at this point in the history
… for rank
  • Loading branch information
snuyanzin committed Sep 21, 2024
1 parent 52e23d5 commit 0bcb921
Show file tree
Hide file tree
Showing 3 changed files with 640 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalD
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange}

import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.RelCollation
import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel}
import org.apache.calcite.rel.core.{Aggregate, Join, JoinRelType, SetOp}
import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, RexNode, RexUtil}
import org.apache.calcite.sql.SqlKind

Expand Down Expand Up @@ -354,7 +356,61 @@ object RankUtil {
val inputRowType = rank.getInput.getRowType
val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation, inputRowType)

!rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
val isInputConvertableToDeduplicate = canConvertInputToDeduplicate(rank)

!rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType && isInputConvertableToDeduplicate
}

private def canConvertInputToDeduplicate(rank: RelNode): Boolean = {
rank match {
case singleRel: SingleRel =>
return canConvertInputToDeduplicate(singleRel.getInput)

case setOp: SetOp =>
for (relNode <- setOp.getInputs) {
if (!canConvertInputToDeduplicate(relNode)) {
return false
}
}
return true

case subset: RelSubset =>
subset.getOriginal match {
case agg: Aggregate =>
return false

case rel: SingleRel =>
return canConvertInputToDeduplicate(rel)

case setOp: SetOp =>
return canConvertInputToDeduplicate(setOp)

case join: Join =>
join.getJoinType match {
case JoinRelType.LEFT =>
return false
case JoinRelType.RIGHT =>
return false
case JoinRelType.FULL =>
return false
case JoinRelType.SEMI =>
return false
case JoinRelType.ANTI =>
return false
case _ =>
return canConvertInputToDeduplicate(join.getLeft) && canConvertInputToDeduplicate(
join.getRight)
}

case _ =>
return true

}

case _ =>
return true
}
return true
}

private def sortOnTimeAttribute(
Expand Down
Loading

0 comments on commit 0bcb921

Please sign in to comment.