Skip to content

Commit

Permalink
[FLINK-36266][table] Insert into as select * behaves incorrect
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Sep 11, 2024
1 parent ab230ad commit bd1b0ce
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
import org.apache.flink.table.planner.calcite.PreValidateReWriter.newValidationError
import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlSelect, rewriteSqlValues}

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.runtime.CalciteContextException
import org.apache.calcite.sql.`type`.SqlTypeUtil
import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlKind, SqlNode, SqlNodeList, SqlSelect}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.util.Static.RESOURCE

import java.util
import java.util.Collections
Expand All @@ -36,8 +39,15 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
select: SqlSelect,
targetRowType: RelDataType,
assignedFields: util.LinkedHashMap[Integer, SqlNode],
targetPosition: util.List[Int]): SqlCall = {
rewriteSqlSelect(validator, select, targetRowType, assignedFields, targetPosition)
targetPosition: util.List[Int],
exceptionSupplier: () => CalciteContextException): SqlCall = {
rewriteSqlSelect(
validator,
select,
targetRowType,
assignedFields,
targetPosition,
exceptionSupplier)
}

def rewriteValues(
Expand Down Expand Up @@ -87,12 +97,16 @@ object SqlRewriterUtils {
select: SqlSelect,
targetRowType: RelDataType,
assignedFields: util.LinkedHashMap[Integer, SqlNode],
targetPosition: util.List[Int]): SqlCall = {
targetPosition: util.List[Int],
exceptionSupplier: () => CalciteContextException): SqlCall = {
// Expands the select list first in case there is a star(*).
// Validates the select first to register the where scope.
validator.validate(select)
val sourceList = validator.expandStar(select.getSelectList, select, false).getList

if (sourceList.size() != targetPosition.size()) {
throw exceptionSupplier.apply()
}
val fixedNodes = new util.ArrayList[SqlNode]
val currentNodes =
if (targetPosition.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.table.planner.utils.PlannerMocks;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand All @@ -37,6 +39,12 @@ class FlinkCalciteSqlValidatorTest {
"t1", Schema.newBuilder().column("a", DataTypes.INT()).build())
.registerTemporaryTable(
"t2",
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.INT())
.build())
.registerTemporaryTable(
"t2_copy",
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.INT())
Expand All @@ -51,81 +59,45 @@ void testUpsertInto() {
}

@Test
void testInsertIntoShouldColumnMismatchWithValues() {
assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(" Number of columns must match number of query columns");
}

@Test
void testInsertIntoShouldColumnMismatchWithSelect() {
assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(" Number of columns must match number of query columns");
}

@Test
void testInsertIntoShouldColumnMismatchWithLastValue() {
assertThatThrownBy(
() ->
plannerMocks
.getParser()
.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(" Number of columns must match number of query columns");
}

@Test
void testInsertIntoShouldColumnMismatchWithFirstValue() {
assertThatThrownBy(
() ->
plannerMocks
.getParser()
.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)"))
void testExplainUpsertInto() {
assertThatThrownBy(() -> plannerMocks.getParser().parse("EXPLAIN UPSERT INTO t1 VALUES(1)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(" Number of columns must match number of query columns");
.hasMessageContaining(
"UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
}

@Test
void testInsertIntoShouldColumnMismatchWithMultiFieldValues() {
assertThatThrownBy(
() ->
plannerMocks
.getParser()
.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)"))
@ParameterizedTest
@ValueSource(
strings = {
"INSERT INTO t2 (a, b) VALUES(1)",
"INSERT INTO t2 (a, b) VALUES (1, 2), (3)",
"INSERT INTO t2 (a, b) VALUES (1), (2, 3)",
"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)",
"INSERT INTO t2 (a, b) SELECT 1",
"INSERT INTO t2 (a, b) SELECT * FROM t1",
"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy",
"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy"
})
void testInvalidNumberOfColumnsWhileInsertInto(String sql) {
assertThatThrownBy(() -> plannerMocks.getParser().parse(sql))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(" Number of columns must match number of query columns");
}

@Test
void testInsertIntoShouldNotColumnMismatchWithValues() {
assertDoesNotThrow(
() -> {
plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)");
});
}

@Test
void testInsertIntoShouldNotColumnMismatchWithSelect() {
@ParameterizedTest
@ValueSource(
strings = {
"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)",
"INSERT INTO t2 (a) VALUES (1), (3)",
"INSERT INTO t2 (a, b) SELECT 1, 2",
"INSERT INTO t2 (a, b) SELECT * from t2_copy",
"INSERT INTO t2 (a, b) SELECT *, 42 from t1",
"INSERT INTO t2 (a, b) SELECT 42, * from t1"
})
void validInsertIntoTest(final String sql) {
assertDoesNotThrow(
() -> {
plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2");
plannerMocks.getParser().parse(sql);
});
}

@Test
void testInsertIntoShouldNotColumnMismatchWithSingleColValues() {
assertDoesNotThrow(
() -> {
plannerMocks.getParser().parse("INSERT INTO t2 (a) VALUES (1), (3)");
});
}

@Test
void testExplainUpsertInto() {
assertThatThrownBy(() -> plannerMocks.getParser().parse("EXPLAIN UPSERT INTO t1 VALUES(1)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
}
}

0 comments on commit bd1b0ce

Please sign in to comment.