Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Mar 2, 2024
1 parent 46cbf22 commit a1bdaca
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private static Filter prepareFilter(Filter filter) {
// we try to reduce and simplify the filter
ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE;
SimplifyFilterConditionRule simplifyFilterConditionRule =
SimplifyFilterConditionRule.INSTANCE();
SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.INSTANCE;
// max iteration num for reducing and simplifying filter,
// we use 5 as the max iteration num which is same with the iteration num in Flink's plan
// optimizing.
Expand All @@ -169,9 +169,9 @@ private static Filter prepareFilter(Filter filter) {
// create a new filter
filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition);
// then apply the rule to simplify filter
Option<Filter> changedFilter =
Optional<Filter> changedFilter =
simplifyFilterConditionRule.simplify(filter, new boolean[] {false});
if (changedFilter.isDefined()) {
if (changedFilter.isPresent()) {
filter = changedFilter.get();
changed = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSubQuery;
import org.apache.calcite.rex.RexUtil;
import org.immutables.value.Value;

import java.util.Optional;

/**
* Planner rule that apply various simplifying transformations on filter condition.
*
* <p>if `simplifySubQuery` is true, this rule will also simplify the filter condition in
* [[RexSubQuery]].
*/
@Value.Enclosing
public class SimplifyFilterConditionRule
extends RelRule<SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig> {
/**
* Creates a RelRule.
*
* @param config
*/
protected SimplifyFilterConditionRule(SimplifyFilterConditionRuleConfig config) {
super(config);
}
/*
(
simplifySubQuery: Boolean, description: String)
extends RelOptRule(operand(classOf[Filter], any()), description) {
*/

public void onMatch(RelOptRuleCall call) {
Filter filter = call.rel(0);
boolean[] changed = new boolean[] {false};
Optional<Filter> newFilter = simplify(filter, changed);
if (newFilter.isPresent()) {
call.transformTo(newFilter.get());
call.getPlanner().prune(filter);
}
}

public Optional<Filter> simplify(Filter filter, boolean[] changed) {
RexNode condition =
// config.isSimplifySubQuery
simplifyFilterConditionInSubQuery(filter.getCondition(), changed);
// : filter.getCondition();

RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
RexNode simplifiedCondition =
FlinkRexUtil.simplify(
rexBuilder, condition, filter.getCluster().getPlanner().getExecutor());
RexNode newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition);

if (!changed[0] && !condition.equals(newCondition)) {
changed[0] = true;
}

// just replace modified RexNode
return changed[0]
? Optional.of(filter.copy(filter.getTraitSet(), filter.getInput(), newCondition))
: Optional.empty();
}

private RexNode simplifyFilterConditionInSubQuery(RexNode condition, boolean[] changed) {
return condition.accept(
new RexShuttle() {
public RexNode visitSubQuery(RexSubQuery subQuery) {
RelNode newRel =
subQuery.rel.accept(
new RelShuttleImpl() {
public RelNode visit(LogicalFilter filter) {
return simplify(filter, changed).orElse(filter);
}
});
if (changed[0]) {
return subQuery.clone(newRel);
} else {
return subQuery;
}
}
});
}

/** Rule configuration. */
@Value.Immutable(singleton = false)
@Value.Style(
get = {"is*", "get*"},
init = "with*",
defaults = @Value.Immutable(copy = false))
public interface SimplifyFilterConditionRuleConfig extends RelRule.Config {
SimplifyFilterConditionRuleConfig DEFAULT =
ImmutableSimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.builder()
.build()
.withOperandSupplier(b0 -> b0.operand(Filter.class).anyInputs())
.withDescription("SimplifyFilterConditionRule");

@Value.Default
default boolean isSimplifySubQuery() {
return false;
}

/** Sets {@link #isSimplifySubQuery()}. */
SimplifyFilterConditionRuleConfig withIsSimplifySubQuery(boolean simplifySubQuery);

@Override
default SimplifyFilterConditionRule toRule() {
return new SimplifyFilterConditionRule(this);
}
}

/** Holder for SimplifyFilterConditionRule. */
public static class SimplifyFilterConditionRuleHolder {
public static final SimplifyFilterConditionRule INSTANCE =
SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT.toRule();
public static final SimplifyFilterConditionRule EXTENDED =
SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT
.withDescription("SimplifyFilterConditionRule:simplifySubQuery")
.as(SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.class)
.withIsSimplifySubQuery(true)
.toRule();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.collection.JavaConverters._
object FlinkBatchRuleSets {

val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED,
FlinkRewriteSubQueryRule.FILTER,
FlinkSubQueryRemoveRule.FILTER,
JoinConditionTypeCoerceRule.INSTANCE,
Expand Down Expand Up @@ -84,7 +84,7 @@ object FlinkBatchRuleSets {

/** RuleSet to simplify predicate expressions in filters and joins */
private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.INSTANCE,
SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.INSTANCE,
SimplifyJoinConditionRule.INSTANCE,
JoinConditionTypeCoerceRule.INSTANCE,
CoreRules.JOIN_PUSH_EXPRESSIONS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.collection.JavaConverters._
object FlinkStreamRuleSets {

val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED,
FlinkRewriteSubQueryRule.FILTER,
FlinkSubQueryRemoveRule.FILTER,
JoinConditionTypeCoerceRule.INSTANCE,
Expand Down Expand Up @@ -88,7 +88,7 @@ object FlinkStreamRuleSets {

/** RuleSet to simplify predicate expressions in filters and joins */
private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.INSTANCE,
SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.INSTANCE,
SimplifyJoinConditionRule.INSTANCE,
JoinConditionTypeCoerceRule.INSTANCE,
CoreRules.JOIN_PUSH_EXPRESSIONS
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class FlinkJoinPushExpressionsRuleTest extends TableTestBase {
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED,
FlinkRewriteSubQueryRule.FILTER,
FlinkSubQueryRemoveRule.FILTER,
JoinConditionTypeCoerceRule.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class SimplifyFilterConditionRuleTest extends TableTestBase {
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(RuleSets.ofList(SimplifyFilterConditionRule.EXTENDED))
.add(
RuleSets.ofList(SimplifyFilterConditionRule.SimplifyFilterConditionRuleHolder.EXTENDED))
.build()
)
util.replaceBatchProgram(programs)
Expand Down

0 comments on commit a1bdaca

Please sign in to comment.