Skip to content

Commit

Permalink
[Improve][Jdbc] Add quote identifier for sql (#6669)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Apr 10, 2024
1 parent 19888e7 commit 849d748
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ private static void addPrimaryKeyColumnsToCondition(
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
sql.append(fieldNamesIt.next()).append(predicate);
sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.debezium.relational.TableId;

public class MySqlUtilsTest {

@Test
public void testSplitScanQuery() {
String splitScanSQL =
MySqlUtils.buildSplitScanQuery(
TableId.parse("db1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
false);
Assertions.assertEquals(
"SELECT * FROM `db1`.`table1` WHERE `id` >= ? AND NOT (`id` = ?) AND `id` <= ?",
splitScanSQL);

splitScanSQL =
MySqlUtils.buildSplitScanQuery(
TableId.parse("db1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
true);
Assertions.assertEquals("SELECT * FROM `db1`.`table1`", splitScanSQL);

splitScanSQL =
MySqlUtils.buildSplitScanQuery(
TableId.parse("db1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
false);
Assertions.assertEquals(
"SELECT * FROM `db1`.`table1` WHERE `id` <= ? AND NOT (`id` = ?)", splitScanSQL);

splitScanSQL =
MySqlUtils.buildSplitScanQuery(
TableId.parse("db1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
true);
Assertions.assertEquals("SELECT * FROM `db1`.`table1` WHERE `id` >= ?", splitScanSQL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ private static void addPrimaryKeyColumnsToCondition(
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
sql.append(fieldNamesIt.next()).append(predicate);
sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.debezium.relational.TableId;

public class OracleUtilsTest {
@Test
public void testSplitScanQuery() {
String splitScanSQL =
OracleUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
false);
Assertions.assertEquals(
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
splitScanSQL);

splitScanSQL =
OracleUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
true);
Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"", splitScanSQL);

splitScanSQL =
OracleUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
false);
Assertions.assertEquals(
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)",
splitScanSQL);

splitScanSQL =
OracleUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
true);
Assertions.assertEquals(
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ private static void addPrimaryKeyColumnsToCondition(
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
sql.append(fieldNamesIt.next()).append(predicate);
sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.debezium.relational.TableId;

public class PostgresUtilsTest {
@Test
public void testSplitScanQuery() {
String splitScanSQL =
PostgresUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
false);
Assertions.assertEquals(
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
splitScanSQL);

splitScanSQL =
PostgresUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
true);
Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"", splitScanSQL);

splitScanSQL =
PostgresUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
false);
Assertions.assertEquals(
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)",
splitScanSQL);

splitScanSQL =
PostgresUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
true);
Assertions.assertEquals(
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ private static void addPrimaryKeyColumnsToCondition(
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
sql.append(fieldNamesIt.next()).append(predicate);
sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.debezium.relational.TableId;

public class SqlServerUtilsTest {
@Test
public void testSplitScanQuery() {
String splitScanSQL =
SqlServerUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
false);
Assertions.assertEquals(
"SELECT * FROM [schema1].[table1] WHERE [id] >= ? AND NOT ([id] = ?) AND [id] <= ?",
splitScanSQL);

splitScanSQL =
SqlServerUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
true);
Assertions.assertEquals("SELECT * FROM [schema1].[table1]", splitScanSQL);

splitScanSQL =
SqlServerUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
true,
false);
Assertions.assertEquals(
"SELECT * FROM [schema1].[table1] WHERE [id] <= ? AND NOT ([id] = ?)",
splitScanSQL);

splitScanSQL =
SqlServerUtils.buildSplitScanQuery(
TableId.parse("db1.schema1.table1"),
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
false,
true);
Assertions.assertEquals("SELECT * FROM [schema1].[table1] WHERE [id] >= ?", splitScanSQL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public String quoteIdentifier(String identifier) {
return "\"" + getFieldIde(identifier, fieldIde) + "\"";
}

@Override
public String tableIdentifier(TablePath tablePath) {
return tablePath.getFullNameWithQuoted("\"");
}

@Override
public String quoteDatabaseIdentifier(String identifier) {
return "\"" + identifier + "\"";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand Down Expand Up @@ -449,7 +451,8 @@ private int objectCompare(Object obj1, Object obj2) {
return ObjectUtils.compare(obj1, obj2);
}

private String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
@VisibleForTesting
String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {split.getSplitKeyName()},
Expand Down Expand Up @@ -499,11 +502,11 @@ private String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
return sql.toString();
}

private static void addKeyColumnsToCondition(
private void addKeyColumnsToCondition(
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
sql.append(fieldNamesIt.next()).append(predicate);
sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
Expand Down
Loading

0 comments on commit 849d748

Please sign in to comment.