From c44506bcb288e01b2f619aefb23416ded553f4f0 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Sat, 2 Mar 2024 16:49:43 +0100 Subject: [PATCH] [FLINK-34597][table] Migrate SimplifyFilterConditionRule to java --- .../operations/DeletePushDownUtils.java | 6 +- .../logical/SimplifyFilterConditionRule.java | 139 ++++++++++++++++++ .../logical/SimplifyFilterConditionRule.scala | 98 ------------ 3 files changed, 142 insertions(+), 101 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java index d5270cb89b7c2f..c2e6bfb759c5f0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java @@ -146,7 +146,7 @@ private static Filter prepareFilter(Filter filter) { // we try to reduce and simplify the filter ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE; SimplifyFilterConditionRule simplifyFilterConditionRule = - SimplifyFilterConditionRule.INSTANCE(); + SimplifyFilterConditionRule.INSTANCE; // max iteration num for reducing and simplifying filter, // we use 5 as the max iteration num which is same with the iteration num in Flink's plan // optimizing. @@ -169,9 +169,9 @@ private static Filter prepareFilter(Filter filter) { // create a new filter filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); // then apply the rule to simplify filter - Option changedFilter = + Optional changedFilter = simplifyFilterConditionRule.simplify(filter, new boolean[] {false}); - if (changedFilter.isDefined()) { + if (changedFilter.isPresent()) { filter = changedFilter.get(); changed = true; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java new file mode 100644 index 00000000000000..4807ec020ca69d --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexUtil; +import org.immutables.value.Value; + +import java.util.Optional; + +/** + * Planner rule that apply various simplifying transformations on filter condition. + * + *

If `simplifySubQuery` is true, this rule will also simplify the filter condition in {@link + * RexSubQuery}. + */ +@Value.Enclosing +public class SimplifyFilterConditionRule + extends RelRule { + + public static final SimplifyFilterConditionRule INSTANCE = + SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT.toRule(); + public static final SimplifyFilterConditionRule EXTENDED = + SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT + .withDescription("SimplifyFilterConditionRule:simplifySubQuery") + .as(SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.class) + .withIsSimplifySubQuery(true) + .toRule(); + + protected SimplifyFilterConditionRule(SimplifyFilterConditionRuleConfig config) { + super(config); + } + + public void onMatch(RelOptRuleCall call) { + Filter filter = call.rel(0); + boolean[] changed = new boolean[] {false}; + Optional newFilter = simplify(filter, changed); + if (newFilter.isPresent()) { + call.transformTo(newFilter.get()); + call.getPlanner().prune(filter); + } + } + + public Optional simplify(Filter filter, boolean[] changed) { + RexNode condition = + config.isSimplifySubQuery() + ? simplifyFilterConditionInSubQuery(filter.getCondition(), changed) + : filter.getCondition(); + + RexBuilder rexBuilder = filter.getCluster().getRexBuilder(); + RexNode simplifiedCondition = + FlinkRexUtil.simplify( + rexBuilder, condition, filter.getCluster().getPlanner().getExecutor()); + RexNode newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition); + + if (!changed[0] && !condition.equals(newCondition)) { + changed[0] = true; + } + + // just replace modified RexNode + return changed[0] + ? Optional.of(filter.copy(filter.getTraitSet(), filter.getInput(), newCondition)) + : Optional.empty(); + } + + private RexNode simplifyFilterConditionInSubQuery(RexNode condition, boolean[] changed) { + return condition.accept( + new RexShuttle() { + public RexNode visitSubQuery(RexSubQuery subQuery) { + RelNode newRel = + subQuery.rel.accept( + new RelShuttleImpl() { + public RelNode visit(LogicalFilter filter) { + return simplify(filter, changed).orElse(filter); + } + }); + if (changed[0]) { + return subQuery.clone(newRel); + } else { + return subQuery; + } + } + }); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + @Value.Style( + get = {"is*", "get*"}, + init = "with*", + defaults = @Value.Immutable(copy = false)) + public interface SimplifyFilterConditionRuleConfig extends RelRule.Config { + SimplifyFilterConditionRuleConfig DEFAULT = + ImmutableSimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.builder() + .description("SimplifyFilterConditionRule") + .build() + .withOperandSupplier(b0 -> b0.operand(Filter.class).anyInputs()); + + @Value.Default + default boolean isSimplifySubQuery() { + return false; + } + + /** Sets {@link #isSimplifySubQuery()}. */ + SimplifyFilterConditionRuleConfig withIsSimplifySubQuery(boolean simplifySubQuery); + + @Override + default SimplifyFilterConditionRule toRule() { + return new SimplifyFilterConditionRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala deleted file mode 100644 index 79df7194245d9e..00000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.utils.FlinkRexUtil - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.{RelNode, RelShuttleImpl} -import org.apache.calcite.rel.core.Filter -import org.apache.calcite.rel.logical.LogicalFilter -import org.apache.calcite.rex._ - -/** - * Planner rule that apply various simplifying transformations on filter condition. - * - * if `simplifySubQuery` is true, this rule will also simplify the filter condition in - * [[RexSubQuery]]. - */ -class SimplifyFilterConditionRule(simplifySubQuery: Boolean, description: String) - extends RelOptRule(operand(classOf[Filter], any()), description) { - - override def onMatch(call: RelOptRuleCall): Unit = { - val filter: Filter = call.rel(0) - val changed = Array(false) - val newFilter = simplify(filter, changed) - newFilter match { - case Some(f) => - call.transformTo(f) - call.getPlanner.prune(filter) - case _ => // do nothing - } - } - - def simplify(filter: Filter, changed: Array[Boolean]): Option[Filter] = { - val condition = if (simplifySubQuery) { - simplifyFilterConditionInSubQuery(filter.getCondition, changed) - } else { - filter.getCondition - } - - val rexBuilder = filter.getCluster.getRexBuilder - val simplifiedCondition = - FlinkRexUtil.simplify(rexBuilder, condition, filter.getCluster.getPlanner.getExecutor) - val newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition) - - if (!changed.head && !condition.equals(newCondition)) { - changed(0) = true - } - - // just replace modified RexNode - if (changed.head) { - Some(filter.copy(filter.getTraitSet, filter.getInput, newCondition)) - } else { - None - } - } - - def simplifyFilterConditionInSubQuery(condition: RexNode, changed: Array[Boolean]): RexNode = { - condition.accept(new RexShuttle() { - override def visitSubQuery(subQuery: RexSubQuery): RexNode = { - val newRel = subQuery.rel.accept(new RelShuttleImpl() { - override def visit(filter: LogicalFilter): RelNode = { - simplify(filter, changed).getOrElse(filter) - } - }) - if (changed.head) { - subQuery.clone(newRel) - } else { - subQuery - } - } - }) - } - -} - -object SimplifyFilterConditionRule { - val INSTANCE = new SimplifyFilterConditionRule(false, "SimplifyFilterConditionRule") - - val EXTENDED = - new SimplifyFilterConditionRule(true, "SimplifyFilterConditionRule:simplifySubQuery") -}