diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala index 64d426971d880f..2af4ed6596602a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala @@ -18,13 +18,16 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec +import org.apache.flink.table.planner.calcite.PreValidateReWriter.newValidationError import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlSelect, rewriteSqlValues} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.runtime.CalciteContextException import org.apache.calcite.sql.`type`.SqlTypeUtil import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlSelect} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.util.Static.RESOURCE import java.util import java.util.Collections @@ -36,8 +39,15 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) { select: SqlSelect, targetRowType: RelDataType, assignedFields: util.LinkedHashMap[Integer, SqlNode], - targetPosition: util.List[Int]): SqlCall = { - rewriteSqlSelect(validator, select, targetRowType, assignedFields, targetPosition) + targetPosition: util.List[Int], + exceptionSupplier: () => CalciteContextException): SqlCall = { + rewriteSqlSelect( + validator, + select, + targetRowType, + assignedFields, + targetPosition, + exceptionSupplier) } def rewriteValues( @@ -87,12 +97,16 @@ object SqlRewriterUtils { select: SqlSelect, targetRowType: RelDataType, assignedFields: util.LinkedHashMap[Integer, SqlNode], - targetPosition: util.List[Int]): SqlCall = { + targetPosition: util.List[Int], + exceptionSupplier: () => CalciteContextException): SqlCall = { // Expands the select list first in case there is a star(*). // Validates the select first to register the where scope. validator.validate(select) val sourceList = validator.expandStar(select.getSelectList, select, false).getList + if (sourceList.size() != targetPosition.size()) { + throw exceptionSupplier.apply() + } val fixedNodes = new util.ArrayList[SqlNode] val currentNodes = if (targetPosition.isEmpty) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java index bca80828e5bd10..bfe95aa2c72887 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java @@ -24,6 +24,8 @@ import org.apache.flink.table.planner.utils.PlannerMocks; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -37,6 +39,12 @@ class FlinkCalciteSqlValidatorTest { "t1", Schema.newBuilder().column("a", DataTypes.INT()).build()) .registerTemporaryTable( "t2", + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .build()) + .registerTemporaryTable( + "t2_copy", Schema.newBuilder() .column("a", DataTypes.INT()) .column("b", DataTypes.INT()) @@ -51,81 +59,45 @@ void testUpsertInto() { } @Test - void testInsertIntoShouldColumnMismatchWithValues() { - assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) - .isInstanceOf(ValidationException.class) - .hasMessageContaining(" Number of columns must match number of query columns"); - } - - @Test - void testInsertIntoShouldColumnMismatchWithSelect() { - assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) - .isInstanceOf(ValidationException.class) - .hasMessageContaining(" Number of columns must match number of query columns"); - } - - @Test - void testInsertIntoShouldColumnMismatchWithLastValue() { - assertThatThrownBy( - () -> - plannerMocks - .getParser() - .parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) - .isInstanceOf(ValidationException.class) - .hasMessageContaining(" Number of columns must match number of query columns"); - } - - @Test - void testInsertIntoShouldColumnMismatchWithFirstValue() { - assertThatThrownBy( - () -> - plannerMocks - .getParser() - .parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) + void testExplainUpsertInto() { + assertThatThrownBy(() -> plannerMocks.getParser().parse("EXPLAIN UPSERT INTO t1 VALUES(1)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining(" Number of columns must match number of query columns"); + .hasMessageContaining( + "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } - @Test - void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { - assertThatThrownBy( - () -> - plannerMocks - .getParser() - .parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) + @ParameterizedTest + @ValueSource( + strings = { + "INSERT INTO t2 (a, b) VALUES(1)", + "INSERT INTO t2 (a, b) VALUES (1, 2), (3)", + "INSERT INTO t2 (a, b) VALUES (1), (2, 3)", + "INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", + "INSERT INTO t2 (a, b) SELECT 1", + "INSERT INTO t2 (a, b) SELECT * FROM t1", + "INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", + "INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy" + }) + void testInvalidNumberOfColumnsWhileInsertInto(String sql) { + assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } - @Test - void testInsertIntoShouldNotColumnMismatchWithValues() { - assertDoesNotThrow( - () -> { - plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); - }); - } - - @Test - void testInsertIntoShouldNotColumnMismatchWithSelect() { + @ParameterizedTest + @ValueSource( + strings = { + "INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", + "INSERT INTO t2 (a) VALUES (1), (3)", + "INSERT INTO t2 (a, b) SELECT 1, 2", + "INSERT INTO t2 (a, b) SELECT * from t2_copy", + "INSERT INTO t2 (a, b) SELECT *, 42 from t1", + "INSERT INTO t2 (a, b) SELECT 42, * from t1" + }) + void validInsertIntoTest(final String sql) { assertDoesNotThrow( () -> { - plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); + plannerMocks.getParser().parse(sql); }); } - - @Test - void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { - assertDoesNotThrow( - () -> { - plannerMocks.getParser().parse("INSERT INTO t2 (a) VALUES (1), (3)"); - }); - } - - @Test - void testExplainUpsertInto() { - assertThatThrownBy(() -> plannerMocks.getParser().parse("EXPLAIN UPSERT INTO t1 VALUES(1)")) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); - } }