Skip to content

Commit

Permalink
[FLINK-31836][table] Upgrade Calcite version to 1.34.0
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Feb 3, 2024
1 parent 79f89b5 commit 6650bc8
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 115 deletions.
4 changes: 2 additions & 2 deletions flink-table/flink-sql-parser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ under the License.
<version>${calcite.version}</version>
<exclusions>
<!--
"mvn dependency:tree" as of Calcite 1.33.0:
"mvn dependency:tree" as of Calcite 1.34.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.33.0:compile
[INFO] +- org.apache.calcite:calcite-core:jar:1.34.0:compile
[INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.23.0:compile
[INFO] | +- org.apiguardian:apiguardian-api:jar:1.1.2:compile
[INFO] | +- org.checkerframework:checker-qual:jar:3.12.0:compile
Expand Down
6 changes: 3 additions & 3 deletions flink-table/flink-table-calcite-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ under the License.
<version>${calcite.version}</version>
<exclusions>
<!--
"mvn dependency:tree" as of Calcite 1.33.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.33.0:compile
[INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.33.0:compile
"mvn dependency:tree" as of Calcite 1.34.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.34.0:compile
[INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.34.0:compile
[INFO] | +- org.locationtech.jts:jts-core:jar:1.19.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.15.3:compile
[INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.23.0:compile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,17 @@ private static void assertBasic(SqlTypeName typeName) {
resultType, nullCount > 0 || nullableCount > 0);
}
}

if (type.getSqlTypeName() == resultType.getSqlTypeName()
&& type.getSqlTypeName().allowsPrec()
&& type.getPrecision() != resultType.getPrecision()) {
final int precision =
SqlTypeUtil.maxPrecision(
resultType.getPrecision(), type.getPrecision());

resultType = createSqlType(type.getSqlTypeName(), precision);
}
} else {
// TODO: datetime precision details; for now we let
// leastRestrictiveByCast handle it
return null;
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1065,17 +1065,11 @@ private Frame decorrelateInputWithValueGenerator(RelNode rel, Frame frame) {

// can directly add positions into corDefOutputs since join
// does not change the output ordering from the inputs.
RelNode valueGen =
requireNonNull(
createValueGenerator(corVarList, leftInputOutputCount, corDefOutputs),
"createValueGenerator(...) is null");
final RelNode valueGen =
createValueGenerator(corVarList, leftInputOutputCount, corDefOutputs);
requireNonNull(valueGen, "valueGen");

RelNode join =
relBuilder
.push(frame.r)
.push(valueGen)
.join(JoinRelType.INNER, relBuilder.literal(true), ImmutableSet.of())
.build();
RelNode join = relBuilder.push(frame.r).push(valueGen).join(JoinRelType.INNER).build();

// Join or Filter does not change the old input ordering. All
// input fields from newLeftInput (i.e. the original input to the old
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexPatternFieldRef;
import org.apache.calcite.rex.RexRangeRef;
import org.apache.calcite.rex.RexShuttle;
Expand Down Expand Up @@ -225,6 +226,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.calcite.linq4j.Nullness.castNonNull;
import static org.apache.calcite.runtime.FlatLists.append;
import static org.apache.calcite.sql.SqlUtil.stripAs;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -780,6 +782,8 @@ protected void convertSelectImpl(final Blackboard bb, SqlSelect select) {
convertSelectList(bb, select, orderExprList);
}

convertQualify(bb, select.getQualify());

if (select.isDistinct()) {
distinctify(bb, true);
}
Expand Down Expand Up @@ -894,8 +898,7 @@ private void distinctify(Blackboard bb, boolean checkForDupExprs) {
}

assert rel != null : "rel must not be null, root = " + bb.root;
// Usual case: all of the expressions in the SELECT clause are
// different.
// Usual case: all expressions in the SELECT clause are different.
final ImmutableBitSet groupSet = ImmutableBitSet.range(rel.getRowType().getFieldCount());
rel = createAggregate(bb, groupSet, ImmutableList.of(groupSet), ImmutableList.of());

Expand Down Expand Up @@ -3368,8 +3371,8 @@ private void convertJoin(Blackboard bb, SqlJoin join) {
condition,
convertJoinType(join.getJoinType()));
relBuilder.push(joinRel);
final RelNode newProjectRel = relBuilder.project(relBuilder.fields()).build();
bb.setRoot(newProjectRel, false);
relBuilder.project(relBuilder.fields());
bb.setRoot(relBuilder.build(), false);
}

private RexNode convertNaturalCondition(
Expand All @@ -3387,8 +3390,8 @@ private RexNode convertUsingCondition(
SqlValidatorNamespace leftNamespace,
SqlValidatorNamespace rightNamespace) {
final SqlNodeList list =
(SqlNodeList)
requireNonNull(join.getCondition(), () -> "getCondition for join " + join);
requireNonNull(
(SqlNodeList) join.getCondition(), () -> "getCondition for join " + join);
return convertUsing(
leftNamespace,
rightNamespace,
Expand Down Expand Up @@ -4316,10 +4319,9 @@ protected void collectInsertTargets(

private RelNode convertDelete(SqlDelete call) {
RelOptTable targetTable = getTargetTable(call);
RelNode sourceRel =
convertSelect(
requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call),
false);
final SqlSelect sourceSelect =
requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call);
RelNode sourceRel = convertSelect(sourceSelect, false);
return LogicalTableModify.create(
targetTable,
catalogReader,
Expand All @@ -4331,11 +4333,9 @@ private RelNode convertDelete(SqlDelete call) {
}

private RelNode convertUpdate(SqlUpdate call) {
final SqlValidatorScope scope =
validator()
.getWhereScope(
requireNonNull(
call.getSourceSelect(), () -> "sourceSelect for " + call));
final SqlSelect sourceSelect =
requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call);
final SqlValidatorScope scope = validator().getWhereScope(sourceSelect);
Blackboard bb = createBlackboard(scope, null, false);

replaceSubQueries(bb, call, RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
Expand All @@ -4350,14 +4350,11 @@ private RelNode convertUpdate(SqlUpdate call) {
RelDataTypeField field =
SqlValidatorUtil.getTargetField(
targetRowType, typeFactory, id, catalogReader, targetTable);
assert field != null : "column " + id.toString() + " not found";
assert field != null : "column " + id + " not found";
targetColumnNameList.add(field.getName());
}

RelNode sourceRel =
convertSelect(
requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call),
false);
RelNode sourceRel = convertSelect(sourceSelect, false);

bb.setRoot(sourceRel, false);
ImmutableList.Builder<RexNode> rexNodeSourceExpressionListBuilder = ImmutableList.builder();
Expand Down Expand Up @@ -4403,10 +4400,9 @@ private RelNode convertMerge(SqlMerge call) {

// first, convert the merge's source select to construct the columns
// from the target table and the set expressions in the update call
RelNode mergeSourceRel =
convertSelect(
requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call),
false);
final SqlSelect sourceSelect =
requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call);
RelNode mergeSourceRel = convertSelect(sourceSelect, false);

// then, convert the insert statement so we can get the insert
// values expressions
Expand Down Expand Up @@ -4683,6 +4679,10 @@ private void convertSelectList(Blackboard bb, SqlSelect select, List<SqlNode> or
selectList = validator().expandStar(selectList, select, false);

replaceSubQueries(bb, selectList, RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
replaceSubQueries(
bb,
new SqlNodeList(orderList, SqlParserPos.ZERO),
RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);

List<String> fieldNames = new ArrayList<>();
final List<RexNode> exprs = new ArrayList<>();
Expand Down Expand Up @@ -4779,6 +4779,118 @@ private static String deriveAlias(
return alias;
}

private void convertQualify(Blackboard bb, @Nullable SqlNode qualify) {
if (qualify == null) {
return;
}

final LogicalProject projectionFromSelect =
requireNonNull((LogicalProject) bb.root, "root");

// Convert qualify SqlNode to a RexNode
replaceSubQueries(bb, qualify, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
final RelNode originalRoot = requireNonNull(bb.root, "root");
RexNode qualifyRexNode;
try {
// Set the root to the input of the project,
// since QUALIFY might have an expression in the OVER clause
// that references a column not in the SELECT.
bb.setRoot(projectionFromSelect.getInput(), false);
qualifyRexNode = bb.convertExpression(qualify);
} finally {
bb.setRoot(originalRoot, false);
}

// Check to see if the qualify expression has a referenced expression and
// do some referencing accordingly
final RexNode qualifyWithReferencesRexNode =
qualifyRexNode.accept(new DuplicateEliminator(projectionFromSelect.getProjects()));

// Create a Project with the QUALIFY expression
if (qualifyWithReferencesRexNode.equals(qualifyRexNode)) {
// The QUALIFY expression does not depend on any references like so:
//
// SELECT A, B
// FROM tbl
// QUALIFY WINDOW(C) = 1
//
// Meaning we should generate a plan like:
// Project(A, B, WINDOW(C) = 1 as QualifyExpression)
// TableScan(tbl)
//
relBuilder
.push(projectionFromSelect.getInput())
.project(
append(projectionFromSelect.getProjects(), qualifyRexNode),
append(
projectionFromSelect.getRowType().getFieldNames(),
"QualifyExpression"));
} else {
// The QUALIFY expression depended on a reference meaning
// we need to introduce an extra project like so:
//
// SELECT A, B, WINDOW(C) as window_val
// FROM tbl
// QUALIFY window_val = 1
//
// Meaning we should generate a plan like:
//
// Project($0, $1, $2, =($2, 1) as QualifyExpression)
// Project(A, B, WINDOW(C) as window_val)
// TableScan(tbl)
//
// This is a very specific application of Common Subexpression Elimination
// (CSE), since the window value pops up twice.
relBuilder
.push(requireNonNull(bb.root, "root"))
.project(
append(relBuilder.fields(), qualifyWithReferencesRexNode),
append(
relBuilder.peek().getRowType().getFieldNames(),
"QualifyExpression"));
}

// Filter on that extra column
relBuilder.filter(Util.last(relBuilder.fields()));

// Remove that extra column from the projection
relBuilder.project(
Util.first(relBuilder.fields(), projectionFromSelect.getProjects().size()));

// Update the root
bb.setRoot(relBuilder.build(), false);
}

/**
* Eliminates a common sub-expression by looking for a {@link RexNode} in the expressions of a
* {@link Project}; if found, returns a refIndex instead of the raw node.
*/
private static final class DuplicateEliminator extends RexShuttle {
private final List<RexNode> projects;

DuplicateEliminator(List<RexNode> projects) {
this.projects = projects;
}

@Override
public RexNode visitCall(RexCall call) {
final int i = projects.indexOf(call);
if (i >= 0) {
return new RexInputRef(i, projects.get(i).getType());
}
return super.visitCall(call);
}

@Override
public RexNode visitOver(RexOver over) {
final int i = projects.indexOf(over);
if (i >= 0) {
return new RexInputRef(i, projects.get(i).getType());
}
return over;
}
}

/** Converts a WITH sub-query into a relational expression. */
public RelRoot convertWith(SqlWith with, boolean top) {
return convertQuery(with.body, false, top);
Expand Down Expand Up @@ -6745,15 +6857,23 @@ default boolean isExplain() {
Config withExplain(boolean explain);

/**
* Returns the {@code expand} option. Controls whether to expand sub-queries. If false, each
* sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}.
* Returns the {@code expand} option. Controls whether to expand sub-queries. If false (the
* default), each sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}.
*
* <p>Setting {@code expand} to true is deprecated. Expansion still works, but there will be
* less development effort in that area.
*/
@Value.Default
default boolean isExpand() {
return true;
return false;
}

/** Sets {@link #isExpand()}. */
/**
* Sets {@link #isExpand()}.
*
* <p>Expansion is deprecated. We recommend that you do not call this method, and use the
* default value of {@link #isExpand()}, false.
*/
Config withExpand(boolean expand);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2288,7 +2288,7 @@ public RelBuilder projectNamed(
@Nullable Iterable<? extends @Nullable String> fieldNames,
boolean force,
Iterable<CorrelationId> variablesSet) {
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
final List<? extends RexNode> nodeList =
nodes instanceof List ? (List) nodes : ImmutableList.copyOf(nodes);
final List<@Nullable String> fieldNameList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ This project bundles the following dependencies under the Apache Software Licens

- com.google.guava:guava:32.1.3-jre
- com.google.guava:failureaccess:1.0.1
- org.apache.calcite:calcite-core:1.33.0
- org.apache.calcite:calcite-linq4j:1.33.0
- org.apache.calcite:calcite-core:1.34.0
- org.apache.calcite:calcite-linq4j:1.34.0
- org.apache.calcite.avatica:avatica-core:1.23.0
- org.apache.commons:commons-math3:3.6.1
- commons-codec:commons-codec:1.15
Expand Down
4 changes: 2 additions & 2 deletions flink-table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ under the License.
</dependencyManagement>

<properties>
<calcite.version>1.33.0</calcite.version>
<!-- Calcite 1.33.0 depends on 3.1.8,
<calcite.version>1.34.0</calcite.version>
<!-- Calcite 1.34.0 depends on 3.1.8,
at the same time minimum 3.1.x Janino version passing Flink tests without WAs is 3.1.10,
more details are in FLINK-27995 -->
<janino.version>3.1.10</janino.version>
Expand Down

0 comments on commit 6650bc8

Please sign in to comment.