From a1bdacafb5723edafcb653296523bf8fa55fcf86 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Sat, 2 Mar 2024 16:49:43 +0100 Subject: [PATCH] init --- .../operations/DeletePushDownUtils.java | 6 +- .../logical/SimplifyFilterConditionRule.java | 151 ++++++++++++++++++ .../plan/rules/FlinkBatchRuleSets.scala | 4 +- .../plan/rules/FlinkStreamRuleSets.scala | 4 +- .../logical/SimplifyFilterConditionRule.scala | 98 ------------ .../FlinkJoinPushExpressionsRuleTest.scala | 2 +- .../SimplifyFilterConditionRuleTest.scala | 3 +- 7 files changed, 161 insertions(+), 107 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java index d5270cb89b7c2f..e92190c4d2db2d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java @@ -146,7 +146,7 @@ private static Filter prepareFilter(Filter filter) { // we try to reduce and simplify the filter ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE; SimplifyFilterConditionRule simplifyFilterConditionRule = - SimplifyFilterConditionRule.INSTANCE(); + SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.INSTANCE; // max iteration num for reducing and simplifying filter, // we use 5 as the max iteration num which is same with the iteration num in Flink's plan // optimizing. @@ -169,9 +169,9 @@ private static Filter prepareFilter(Filter filter) { // create a new filter filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); // then apply the rule to simplify filter - Option changedFilter = + Optional changedFilter = simplifyFilterConditionRule.simplify(filter, new boolean[] {false}); - if (changedFilter.isDefined()) { + if (changedFilter.isPresent()) { filter = changedFilter.get(); changed = true; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java new file mode 100644 index 00000000000000..837a8a90aec3a8 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexUtil; +import org.immutables.value.Value; + +import java.util.Optional; + +/** + * Planner rule that apply various simplifying transformations on filter condition. + * + *

if `simplifySubQuery` is true, this rule will also simplify the filter condition in + * [[RexSubQuery]]. + */ +@Value.Enclosing +public class SimplifyFilterConditionRule + extends RelRule { + /** + * Creates a RelRule. + * + * @param config + */ + protected SimplifyFilterConditionRule(SimplifyFilterConditionRuleConfig config) { + super(config); + } + /* + ( + simplifySubQuery: Boolean, description: String) + extends RelOptRule(operand(classOf[Filter], any()), description) { + */ + + public void onMatch(RelOptRuleCall call) { + Filter filter = call.rel(0); + boolean[] changed = new boolean[] {false}; + Optional newFilter = simplify(filter, changed); + if (newFilter.isPresent()) { + call.transformTo(newFilter.get()); + call.getPlanner().prune(filter); + } + } + + public Optional simplify(Filter filter, boolean[] changed) { + RexNode condition = + // config.isSimplifySubQuery + simplifyFilterConditionInSubQuery(filter.getCondition(), changed); + // : filter.getCondition(); + + RexBuilder rexBuilder = filter.getCluster().getRexBuilder(); + RexNode simplifiedCondition = + FlinkRexUtil.simplify( + rexBuilder, condition, filter.getCluster().getPlanner().getExecutor()); + RexNode newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition); + + if (!changed[0] && !condition.equals(newCondition)) { + changed[0] = true; + } + + // just replace modified RexNode + return changed[0] + ? Optional.of(filter.copy(filter.getTraitSet(), filter.getInput(), newCondition)) + : Optional.empty(); + } + + private RexNode simplifyFilterConditionInSubQuery(RexNode condition, boolean[] changed) { + return condition.accept( + new RexShuttle() { + public RexNode visitSubQuery(RexSubQuery subQuery) { + RelNode newRel = + subQuery.rel.accept( + new RelShuttleImpl() { + public RelNode visit(LogicalFilter filter) { + return simplify(filter, changed).orElse(filter); + } + }); + if (changed[0]) { + return subQuery.clone(newRel); + } else { + return subQuery; + } + } + }); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + @Value.Style( + get = {"is*", "get*"}, + init = "with*", + defaults = @Value.Immutable(copy = false)) + public interface SimplifyFilterConditionRuleConfig extends RelRule.Config { + SimplifyFilterConditionRuleConfig DEFAULT = + ImmutableSimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.builder() + .build() + .withOperandSupplier(b0 -> b0.operand(Filter.class).anyInputs()) + .withDescription("SimplifyFilterConditionRule"); + + @Value.Default + default boolean isSimplifySubQuery() { + return false; + } + + /** Sets {@link #isSimplifySubQuery()}. */ + SimplifyFilterConditionRuleConfig withIsSimplifySubQuery(boolean simplifySubQuery); + + @Override + default SimplifyFilterConditionRule toRule() { + return new SimplifyFilterConditionRule(this); + } + } + + /** Holder for SimplifyFilterConditionRule. */ + public static class SimplifyFilterConditionRuleHolder { + public static final SimplifyFilterConditionRule INSTANCE = + SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT.toRule(); + public static final SimplifyFilterConditionRule EXTENDED = + SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT + .withDescription("SimplifyFilterConditionRule:simplifySubQuery") + .as(SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.class) + .withIsSimplifySubQuery(true) + .toRule(); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index d802156f4e1b96..94275f911dbff0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -32,7 +32,7 @@ import scala.collection.JavaConverters._ object FlinkBatchRuleSets { val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList( - SimplifyFilterConditionRule.EXTENDED, + SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED, FlinkRewriteSubQueryRule.FILTER, FlinkSubQueryRemoveRule.FILTER, JoinConditionTypeCoerceRule.INSTANCE, @@ -84,7 +84,7 @@ object FlinkBatchRuleSets { /** RuleSet to simplify predicate expressions in filters and joins */ private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList( - SimplifyFilterConditionRule.INSTANCE, + SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.INSTANCE, SimplifyJoinConditionRule.INSTANCE, JoinConditionTypeCoerceRule.INSTANCE, CoreRules.JOIN_PUSH_EXPRESSIONS diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index f079a8aca28730..b9ce10e60a98b1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -32,7 +32,7 @@ import scala.collection.JavaConverters._ object FlinkStreamRuleSets { val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList( - SimplifyFilterConditionRule.EXTENDED, + SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED, FlinkRewriteSubQueryRule.FILTER, FlinkSubQueryRemoveRule.FILTER, JoinConditionTypeCoerceRule.INSTANCE, @@ -88,7 +88,7 @@ object FlinkStreamRuleSets { /** RuleSet to simplify predicate expressions in filters and joins */ private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList( - SimplifyFilterConditionRule.INSTANCE, + SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.INSTANCE, SimplifyJoinConditionRule.INSTANCE, JoinConditionTypeCoerceRule.INSTANCE, CoreRules.JOIN_PUSH_EXPRESSIONS diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala deleted file mode 100644 index 79df7194245d9e..00000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.utils.FlinkRexUtil - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.{RelNode, RelShuttleImpl} -import org.apache.calcite.rel.core.Filter -import org.apache.calcite.rel.logical.LogicalFilter -import org.apache.calcite.rex._ - -/** - * Planner rule that apply various simplifying transformations on filter condition. - * - * if `simplifySubQuery` is true, this rule will also simplify the filter condition in - * [[RexSubQuery]]. - */ -class SimplifyFilterConditionRule(simplifySubQuery: Boolean, description: String) - extends RelOptRule(operand(classOf[Filter], any()), description) { - - override def onMatch(call: RelOptRuleCall): Unit = { - val filter: Filter = call.rel(0) - val changed = Array(false) - val newFilter = simplify(filter, changed) - newFilter match { - case Some(f) => - call.transformTo(f) - call.getPlanner.prune(filter) - case _ => // do nothing - } - } - - def simplify(filter: Filter, changed: Array[Boolean]): Option[Filter] = { - val condition = if (simplifySubQuery) { - simplifyFilterConditionInSubQuery(filter.getCondition, changed) - } else { - filter.getCondition - } - - val rexBuilder = filter.getCluster.getRexBuilder - val simplifiedCondition = - FlinkRexUtil.simplify(rexBuilder, condition, filter.getCluster.getPlanner.getExecutor) - val newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition) - - if (!changed.head && !condition.equals(newCondition)) { - changed(0) = true - } - - // just replace modified RexNode - if (changed.head) { - Some(filter.copy(filter.getTraitSet, filter.getInput, newCondition)) - } else { - None - } - } - - def simplifyFilterConditionInSubQuery(condition: RexNode, changed: Array[Boolean]): RexNode = { - condition.accept(new RexShuttle() { - override def visitSubQuery(subQuery: RexSubQuery): RexNode = { - val newRel = subQuery.rel.accept(new RelShuttleImpl() { - override def visit(filter: LogicalFilter): RelNode = { - simplify(filter, changed).getOrElse(filter) - } - }) - if (changed.head) { - subQuery.clone(newRel) - } else { - subQuery - } - } - }) - } - -} - -object SimplifyFilterConditionRule { - val INSTANCE = new SimplifyFilterConditionRule(false, "SimplifyFilterConditionRule") - - val EXTENDED = - new SimplifyFilterConditionRule(true, "SimplifyFilterConditionRule:simplifySubQuery") -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala index 8ae98ea755714e..958167f92701e6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala @@ -40,7 +40,7 @@ class FlinkJoinPushExpressionsRuleTest extends TableTestBase { .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(RuleSets.ofList( - SimplifyFilterConditionRule.EXTENDED, + SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED, FlinkRewriteSubQueryRule.FILTER, FlinkSubQueryRemoveRule.FILTER, JoinConditionTypeCoerceRule.INSTANCE, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala index fae4bb6d705177..c140cdd1eadff1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala @@ -39,7 +39,8 @@ class SimplifyFilterConditionRuleTest extends TableTestBase { FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) - .add(RuleSets.ofList(SimplifyFilterConditionRule.EXTENDED)) + .add( + RuleSets.ofList(SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED)) .build() ) util.replaceBatchProgram(programs)