diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index 706d7a88624..9b06ddda967 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java @@ -434,7 +434,7 @@ private static void addPrimaryKeyColumnsToCondition( SeaTunnelRowType rowType, StringBuilder sql, String predicate) { for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + sql.append(quote(fieldNamesIt.next())).append(predicate); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java new file mode 100644 index 00000000000..ac039659002 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.debezium.relational.TableId; + +public class MySqlUtilsTest { + + @Test + public void testSplitScanQuery() { + String splitScanSQL = + MySqlUtils.buildSplitScanQuery( + TableId.parse("db1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + false); + Assertions.assertEquals( + "SELECT * FROM `db1`.`table1` WHERE `id` >= ? AND NOT (`id` = ?) AND `id` <= ?", + splitScanSQL); + + splitScanSQL = + MySqlUtils.buildSplitScanQuery( + TableId.parse("db1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + true); + Assertions.assertEquals("SELECT * FROM `db1`.`table1`", splitScanSQL); + + splitScanSQL = + MySqlUtils.buildSplitScanQuery( + TableId.parse("db1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + false); + Assertions.assertEquals( + "SELECT * FROM `db1`.`table1` WHERE `id` <= ? AND NOT (`id` = ?)", splitScanSQL); + + splitScanSQL = + MySqlUtils.buildSplitScanQuery( + TableId.parse("db1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + true); + Assertions.assertEquals("SELECT * FROM `db1`.`table1` WHERE `id` >= ?", splitScanSQL); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java index 89e843c393b..8d67c0f1412 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java @@ -439,7 +439,7 @@ private static void addPrimaryKeyColumnsToCondition( SeaTunnelRowType rowType, StringBuilder sql, String predicate) { for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + sql.append(quote(fieldNamesIt.next())).append(predicate); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java new file mode 100644 index 00000000000..10c253da83e --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.debezium.relational.TableId; + +public class OracleUtilsTest { + @Test + public void testSplitScanQuery() { + String splitScanSQL = + OracleUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + false); + Assertions.assertEquals( + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?", + splitScanSQL); + + splitScanSQL = + OracleUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + true); + Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"", splitScanSQL); + + splitScanSQL = + OracleUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + false); + Assertions.assertEquals( + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)", + splitScanSQL); + + splitScanSQL = + OracleUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + true); + Assertions.assertEquals( + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index b83e2875406..576c7fb5363 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -444,7 +444,7 @@ private static void addPrimaryKeyColumnsToCondition( SeaTunnelRowType rowType, StringBuilder sql, String predicate) { for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + sql.append(quote(fieldNamesIt.next())).append(predicate); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java new file mode 100644 index 00000000000..e8e5bb22d2c --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.debezium.relational.TableId; + +public class PostgresUtilsTest { + @Test + public void testSplitScanQuery() { + String splitScanSQL = + PostgresUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + false); + Assertions.assertEquals( + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?", + splitScanSQL); + + splitScanSQL = + PostgresUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + true); + Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"", splitScanSQL); + + splitScanSQL = + PostgresUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + false); + Assertions.assertEquals( + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)", + splitScanSQL); + + splitScanSQL = + PostgresUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + true); + Assertions.assertEquals( + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java index 2c83e02e0e6..db1872fa648 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java @@ -467,7 +467,7 @@ private static void addPrimaryKeyColumnsToCondition( SeaTunnelRowType rowType, StringBuilder sql, String predicate) { for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + sql.append(quote(fieldNamesIt.next())).append(predicate); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java new file mode 100644 index 00000000000..470a05e54ab --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.debezium.relational.TableId; + +public class SqlServerUtilsTest { + @Test + public void testSplitScanQuery() { + String splitScanSQL = + SqlServerUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + false); + Assertions.assertEquals( + "SELECT * FROM [schema1].[table1] WHERE [id] >= ? AND NOT ([id] = ?) AND [id] <= ?", + splitScanSQL); + + splitScanSQL = + SqlServerUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + true); + Assertions.assertEquals("SELECT * FROM [schema1].[table1]", splitScanSQL); + + splitScanSQL = + SqlServerUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + true, + false); + Assertions.assertEquals( + "SELECT * FROM [schema1].[table1] WHERE [id] <= ? AND NOT ([id] = ?)", + splitScanSQL); + + splitScanSQL = + SqlServerUtils.buildSplitScanQuery( + TableId.parse("db1.schema1.table1"), + new SeaTunnelRowType( + new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + false, + true); + Assertions.assertEquals("SELECT * FROM [schema1].[table1] WHERE [id] >= ?", splitScanSQL); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index 51c5eb67d21..82d02119d1a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -137,6 +137,11 @@ public String quoteIdentifier(String identifier) { return "\"" + getFieldIde(identifier, fieldIde) + "\""; } + @Override + public String tableIdentifier(TablePath tablePath) { + return tablePath.getFullNameWithQuoted("\""); + } + @Override public String quoteDatabaseIdentifier(String identifier) { return "\"" + identifier + "\""; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java index ff75f5b53c7..2993f749c66 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; +import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; + import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -449,7 +451,8 @@ private int objectCompare(Object obj1, Object obj2) { return ObjectUtils.compare(obj1, obj2); } - private String createDynamicSplitQuerySQL(JdbcSourceSplit split) { + @VisibleForTesting + String createDynamicSplitQuerySQL(JdbcSourceSplit split) { SeaTunnelRowType rowType = new SeaTunnelRowType( new String[] {split.getSplitKeyName()}, @@ -499,11 +502,11 @@ private String createDynamicSplitQuerySQL(JdbcSourceSplit split) { return sql.toString(); } - private static void addKeyColumnsToCondition( + private void addKeyColumnsToCondition( SeaTunnelRowType rowType, StringBuilder sql, String predicate) { for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java index 8896e5f9604..c71ae7b43db 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java @@ -18,7 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -30,6 +34,47 @@ public class DynamicChunkSplitterTest { + @Test + public void testGenerateSplitQuerySQL() { + JdbcSourceConfig config = + JdbcSourceConfig.builder() + .jdbcConnectionConfig( + JdbcConnectionConfig.builder() + .url("jdbc:postgresql://localhost:5432/test") + .driverName("org.postgresql.Driver") + .build()) + .build(); + DynamicChunkSplitter splitter = new DynamicChunkSplitter(config); + + JdbcSourceSplit split = + new JdbcSourceSplit( + TablePath.of("db1", "schema1", "table1"), + "split1", + null, + "id", + BasicType.INT_TYPE, + 1, + 10); + String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split); + Assertions.assertEquals( + "SELECT * FROM \"db1\".\"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?", + splitQuerySQL); + + split = + new JdbcSourceSplit( + TablePath.of("db1", "schema1", "table1"), + "split1", + "select * from table1", + "id", + BasicType.INT_TYPE, + 1, + 10); + splitQuerySQL = splitter.createDynamicSplitQuerySQL(split); + Assertions.assertEquals( + "SELECT * FROM (select * from table1) tmp WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?", + splitQuerySQL); + } + @Test public void testEfficientShardingThroughSampling() throws NoSuchMethodException { TablePath tablePath = new TablePath("db", "xe", "table");