Skip to content

Commit

Permalink
[Improve][[Jdbc]sink sql support custom field.(#6515) (#6525)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyuy authored Jun 15, 2024
1 parent d266f4d commit ef3e61d
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -636,13 +637,29 @@ public static FieldNamedPreparedStatement prepareStatement(
HashMap<String, List<Integer>> parameterMap = new HashMap<>();
parsedSQL = parseNamedStatement(sql, parameterMap);
// currently, the statements must contain all the field parameters
checkArgument(parameterMap.size() == fieldNames.length);
parameterMap
.keySet()
.forEach(
namedParameter -> {
boolean namedParameterExist =
Arrays.asList(fieldNames).stream()
.anyMatch(field -> field.equals(namedParameter));
checkArgument(
namedParameterExist,
String.format(
"Named parameters [%s] not in source columns, check SQL: %s",
namedParameter, sql));
});

for (int i = 0; i < fieldNames.length; i++) {
String fieldName = fieldNames[i];
checkArgument(
parameterMap.containsKey(fieldName),
fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
boolean parameterExist =
parameterMap.keySet().stream()
.anyMatch(parameter -> parameter.equals(fieldName));
indexMapping[i] =
parameterExist
? parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray()
: new int[0];
}
}
log.info("PrepareStatement sql is:\n{}\n", parsedSQL);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.given;

@Slf4j
public class JdbcSinkNameParameterSQLIT extends TestSuiteBase implements TestResource {
private static final String PG_IMAGE = "postgres:14-alpine";
private static final String PG_DRIVER_JAR =
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
private PostgreSQLContainer<?> postgreSQLContainer;

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
Container.ExecResult extraCommands =
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ PG_DRIVER_JAR);
Assertions.assertEquals(0, extraCommands.getExitCode());
};

@BeforeAll
@Override
public void startUp() throws Exception {
postgreSQLContainer =
new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
.withNetwork(TestSuiteBase.NETWORK)
.withNetworkAliases("postgresql")
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
Startables.deepStart(Stream.of(postgreSQLContainer)).join();
log.info("PostgreSQL container started");
Class.forName(postgreSQLContainer.getDriverClassName());
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(this::initializeJdbcTable);
}

@TestTemplate
public void testSinkNamedParameterSQL(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/jdbc_sink_name_parameter_sql.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

private void initializeJdbcTable() {
try (Connection connection =
DriverManager.getConnection(
postgreSQLContainer.getJdbcUrl(),
postgreSQLContainer.getUsername(),
postgreSQLContainer.getPassword())) {
Statement statement = connection.createStatement();
String sink =
"create table sink(\n"
+ "user_id BIGINT NOT NULL PRIMARY KEY,\n"
+ "name varchar(255),\n"
+ "age INT\n"
+ ")";
statement.execute(sink);
} catch (SQLException e) {
throw new RuntimeException("Initializing PostgreSql table failed!", e);
}
}

@AfterAll
@Override
public void tearDown() {
if (postgreSQLContainer != null) {
postgreSQLContainer.stop();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 100
schema = {
fields {
user_id = bigint
name = string
age = int
}
}
result_table_name = "fake"
}
}

sink {
Assert {
source_table_name = "fake"
rules = {
row_rules = [
{
rule_type = MAX_ROW
rule_value = 100
},
{
rule_type = MIN_ROW
rule_value = 100
}
]
}
}
Jdbc {
source_table_name = "fake"
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
generate_sink_sql = true
database = test
query = "insert into public.sink (user_id, name) values(:user_id, :name)"

}
}

0 comments on commit ef3e61d

Please sign in to comment.