diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java index cff4fdc78fe59..8ffa0d2b8cb27 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java @@ -28,6 +28,8 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.runtime.FlatLists; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.tools.RelBuilder; @@ -56,8 +58,15 @@ public FlinkRexBuilder(RelDataTypeFactory typeFactory) { public RexNode makeFieldAccess(RexNode expr, String fieldName, boolean caseSensitive) { RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive); if (expr.getType().isNullable() && !field.getType().isNullable()) { - return makeCast( - typeFactory.createTypeWithNullability(field.getType(), true), field, true); + SqlOperator sqlOperator = ((RexCall) expr).getOperator(); + if (sqlOperator.getKind() == SqlKind.CAST) { + // return field; + } + field = + makeCast( + typeFactory.createTypeWithNullability(field.getType(), true), + field, + true); } return field; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala index d5123779f45a8..4b067e5358fb9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala @@ -22,6 +22,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushD import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ +import org.apache.calcite.sql.SqlKind import java.util.{LinkedHashMap => JLinkedHashMap, LinkedList => JLinkedList, List => JList} @@ -222,10 +223,10 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex if (parent.isLeaf) { ( Some( - builder.makeFieldAccess( + copyFieldAccess( new RexInputRef(parent.indexOfLeafInNewSchema, parent.originFieldType), fieldAccess.getField.getName, - true)), + builder)), parent) } else { val child = parent.children.get(fieldAccess.getField.getName) @@ -238,7 +239,7 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex case acc: RexFieldAccess => val (field, parent) = traverse(acc) if (field.isDefined) { - (Some(builder.makeFieldAccess(field.get, fieldAccess.getField.getName, true)), parent) + (Some(copyFieldAccess(field.get, fieldAccess.getField.getName, builder)), parent) } else { val child = parent.children.get(fieldAccess.getField.getName) if (child.isLeaf) { @@ -251,7 +252,21 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex // rewrite operands of the expression val newExpr = expr.accept(this) // rebuild FieldAccess - (Some(builder.makeFieldAccess(newExpr, fieldAccess.getField.getName, true)), null) + (Some(copyFieldAccess(newExpr, fieldAccess.getField.getName, builder)), null) + } + } + + private def copyFieldAccess( + newExpr: RexNode, + fieldName: String, + rexBuilder: RexBuilder): RexNode = { + // rebuild fieldAccess + val fieldAccess = rexBuilder.makeFieldAccess(newExpr, fieldName, true) + fieldAccess match { + case call: RexCall => + call.getOperands.get(0) + case _ => + fieldAccess } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java index 2a6c770cd6662..55cbdddcb354a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java @@ -175,6 +175,10 @@ public void setup() { + " `data_map` MAP>>,\n" + " `outer_array` ARRAY,\n" + " `outer_map` MAP,\n" + + " `chart` ROW<" + + " `result` ARRAY NOT NULL> NOT NULL>" + + " NOT NULL>,\n" + " WATERMARK FOR `Timestamp` AS `Timestamp`\n" + ") WITH (\n" + " 'connector' = 'values',\n" @@ -297,6 +301,11 @@ void testNestedProjectFieldAccessWithITEMWithConstantIndex() { + "FROM NestedItemTable"); } + @Test + void testNestedProjectFieldAccessWithNestedArrayAndRows() { + util.verifyRelPlan("SELECT `chart`.`result`[1].`meta`.`symbol` FROM ItemTable"); + } + @Test void testNestedProjectFieldAccessWithITEMContainsTopLevelAccess() { util.verifyRelPlan( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml index 3c90cf08bcfe3..2550377ba6954 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml @@ -186,6 +186,23 @@ LogicalProject(EXPR$0=[ITEM($0, $2).value], EXPR$1=[ITEM($1, _UTF-16LE'item').va ]]> + + + + + + + + + + +