[FLINK-36415][table] ProjectWatermarkAssignerTransposeRule can generate extra casts which do not make sense

Co-authored-by: Dawid Wysakowicz <dwysakowicz@apache.org>
snuyanzin and dawidwys committed Oct 1, 2024
1 parent 6633719 commit 055c9a1
Showing 4 changed files with 56 additions and 6 deletions.
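
The issue, in short: when the planner rebuilds a nested field access whose parent row is nullable but whose field is declared NOT NULL, FlinkRexBuilder#makeFieldAccess (first file below) wraps the access in a cast to the nullable field type; rebuilding the same access again during nested-projection rewriting can then pile up casts that add nothing. A minimal, Calcite-only sketch of that starting situation (illustrative only, not part of the commit, assuming Calcite on the classpath and using made-up class and variable names):

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;

public class NullableRowFieldAccessSketch {
    public static void main(String[] args) {
        RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
        RexBuilder rexBuilder = new RexBuilder(typeFactory);

        // ROW<symbol VARCHAR NOT NULL>, with the row itself nullable: the same shape as
        // `meta` ROW<`symbol` STRING NOT NULL> inside the new `chart` column in the test.
        RelDataType rowType =
                typeFactory.builder()
                        .add("symbol", typeFactory.createSqlType(SqlTypeName.VARCHAR))
                        .build();
        RelDataType nullableRow = typeFactory.createTypeWithNullability(rowType, true);

        RexNode row = rexBuilder.makeInputRef(nullableRow, 0);
        RexNode symbol = rexBuilder.makeFieldAccess(row, "symbol", true);

        // If the field access keeps its declared NOT NULL type while the parent row is
        // nullable (the case guarded by the override in the first file), Flink's
        // FlinkRexBuilder casts it to the nullable variant of the field type.
        System.out.println("row nullable:   " + row.getType().isNullable());
        System.out.println("field nullable: " + symbol.getType().isNullable());
    }
}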
File 1 of 4: FlinkRexBuilder (Java)
@@ -28,6 +28,8 @@
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.runtime.FlatLists;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.tools.RelBuilder;
@@ -56,8 +58,15 @@ public FlinkRexBuilder(RelDataTypeFactory typeFactory) {
     public RexNode makeFieldAccess(RexNode expr, String fieldName, boolean caseSensitive) {
         RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive);
         if (expr.getType().isNullable() && !field.getType().isNullable()) {
-            return makeCast(
-                    typeFactory.createTypeWithNullability(field.getType(), true), field, true);
+            SqlOperator sqlOperator = ((RexCall) expr).getOperator();
+            if (sqlOperator.getKind() == SqlKind.CAST) {
+                return field; // the parent expression is already a cast; do not stack another one
+            }
+            field =
+                    makeCast(
+                            typeFactory.createTypeWithNullability(field.getType(), true),
+                            field,
+                            true);
         }
 
         return field;
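The SqlKind.CAST check added above is there to avoid piling a second nullability cast on top of a parent expression that is already a cast; the bulk of the fix, dropping the wrapper when field accesses are rebuilt during nested-projection pushdown, lives in the Scala rewriter below.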
File 2 of 4: NestedSchemaRewriter (Scala)
@@ -22,6 +22,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 
 import java.util.{LinkedHashMap => JLinkedHashMap, LinkedList => JLinkedList, List => JList}

@@ -222,10 +223,10 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
       if (parent.isLeaf) {
         (
           Some(
-            builder.makeFieldAccess(
+            copyFieldAccess(
               new RexInputRef(parent.indexOfLeafInNewSchema, parent.originFieldType),
               fieldAccess.getField.getName,
-              true)),
+              builder)),
           parent)
       } else {
         val child = parent.children.get(fieldAccess.getField.getName)
@@ -238,7 +239,7 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
       case acc: RexFieldAccess =>
         val (field, parent) = traverse(acc)
         if (field.isDefined) {
-          (Some(builder.makeFieldAccess(field.get, fieldAccess.getField.getName, true)), parent)
+          (Some(copyFieldAccess(field.get, fieldAccess.getField.getName, builder)), parent)
         } else {
           val child = parent.children.get(fieldAccess.getField.getName)
           if (child.isLeaf) {
@@ -251,7 +252,21 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
         // rewrite operands of the expression
         val newExpr = expr.accept(this)
         // rebuild FieldAccess
-        (Some(builder.makeFieldAccess(newExpr, fieldAccess.getField.getName, true)), null)
+        (Some(copyFieldAccess(newExpr, fieldAccess.getField.getName, builder)), null)
     }
   }
+
+  private def copyFieldAccess(
+      newExpr: RexNode,
+      fieldName: String,
+      rexBuilder: RexBuilder): RexNode = {
+    // rebuild the field access; strip the cast the builder may have wrapped around it
+    val fieldAccess = rexBuilder.makeFieldAccess(newExpr, fieldName, true)
+    fieldAccess match {
+      case call: RexCall =>
+        call.getOperands.get(0)
+      case _ =>
+        fieldAccess
+    }
+  }
 }
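For readers following along on the Java side of the planner, here is a hypothetical Java rendering of the copyFieldAccess helper introduced above (illustrative only, not part of the commit): when the RexBuilder in play is Flink's FlinkRexBuilder, the rebuilt field access may come back wrapped in a nullability CAST, and the rewriter keeps only the underlying access.

import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;

final class CopyFieldAccessSketch {

    private CopyFieldAccessSketch() {}

    /**
     * Mirrors the Scala copyFieldAccess: rebuild a field access and, if the builder wrapped
     * it in a CAST (as FlinkRexBuilder#makeFieldAccess may do), keep only the inner access.
     */
    static RexNode copyFieldAccess(RexBuilder rexBuilder, RexNode newExpr, String fieldName) {
        RexNode fieldAccess = rexBuilder.makeFieldAccess(newExpr, fieldName, true);
        if (fieldAccess instanceof RexCall) {
            return ((RexCall) fieldAccess).getOperands().get(0);
        }
        return fieldAccess;
    }
}

The plan XML added in the last file shows the effect: the casts in the optimized plan are the same ones that already appear in the AST, and project=[chart] is pushed into the table scan without the rewrite introducing additional casts.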
File 3 of 4: planner plan test (Java)
@@ -175,6 +175,10 @@ public void setup() {
                 + " `data_map` MAP<STRING, ROW<`value` BIGINT>>>,\n"
                 + " `outer_array` ARRAY<INT>,\n"
                 + " `outer_map` MAP<STRING, STRING>,\n"
+                + " `chart` ROW<"
+                + " `result` ARRAY<ROW<`meta` ROW<"
+                + " `symbol` STRING NOT NULL> NOT NULL> NOT NULL>"
+                + " NOT NULL>,\n"
                 + " WATERMARK FOR `Timestamp` AS `Timestamp`\n"
                 + ") WITH (\n"
                 + " 'connector' = 'values',\n"
@@ -297,6 +301,11 @@ void testNestedProjectFieldAccessWithITEMWithConstantIndex() {
                         + "FROM NestedItemTable");
     }
 
+    @Test
+    void testNestedProjectFieldAccessWithNestedArrayAndRows() {
+        util.verifyRelPlan("SELECT `chart`.`result`[1].`meta`.`symbol` FROM ItemTable");
+    }
+
     @Test
     void testNestedProjectFieldAccessWithITEMContainsTopLevelAccess() {
         util.verifyRelPlan(
File 4 of 4: plan test resources (XML)
@@ -186,6 +186,23 @@ LogicalProject(EXPR$0=[ITEM($0, $2).value], EXPR$1=[ITEM($1, _UTF-16LE'item').va
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testNestedProjectFieldAccessWithNestedArrayAndRows">
+    <Resource name="sql">
+      <![CDATA[SELECT `chart`.`result`[1].`meta`.`symbol` FROM ItemTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[CAST(CAST(ITEM($5.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, ItemTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(EXPR$0=[CAST(CAST(ITEM($0.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart], metadata=[]]])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testNestedProjectFieldAccessWithITEMContainsTopLevelAccess">
     <Resource name="sql">
       <![CDATA[SELECT `Result`.`Mid`.data_arr[2].`value`, `Result`.`Mid`.data_arr[ID].`value`, `Result`.`Mid`.data_map['item'].`value`, `Result`.`Mid` FROM NestedItemTable]]>
