From 11a7dc3446347eba16df56e3361fbd2de77f4b84 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 6 Sep 2023 11:13:10 +0800 Subject: [PATCH] [Improve][Connector-v2][Jdbc] Refactor AbstractJdbcCatalog (#5096) * refactor jdbc connector AbstractJdbcCatalog * add testCatalog for pg --- .../jdbc/catalog/AbstractJdbcCatalog.java | 342 +++++++++++++++--- .../jdbc/catalog/mysql/MySqlCatalog.java | 309 ++++------------ .../mysql/MysqlCreateTableSqlBuilder.java | 2 +- .../jdbc/catalog/oracle/OracleCatalog.java | 224 +++--------- .../jdbc/catalog/psql/PostgresCatalog.java | 311 +++------------- .../psql/PostgresCreateTableSqlBuilder.java | 2 +- .../catalog/sqlserver/SqlServerCatalog.java | 295 +++------------ .../catalog/oracle/OracleCatalogTest.java | 19 +- .../catalog/psql/PostgresCatalogTest.java | 4 +- .../sql/MysqlCreateTableSqlBuilderTest.java | 2 +- .../seatunnel/jdbc/AbstractJdbcIT.java | 65 +++- .../connectors/seatunnel/jdbc/JdbcCase.java | 5 + .../seatunnel/jdbc/JdbcMysqlIT.java | 17 + .../seatunnel/jdbc/JdbcOracleIT.java | 47 ++- .../src/test/resources/sql/oracle_init.sql | 22 ++ .../seatunnel/jdbc/JdbcPostgresIT.java | 43 +++ .../seatunnel/jdbc/JdbcSqlServerIT.java | 34 +- 17 files changed, 732 insertions(+), 1011 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index 22752ba5011a..ddc327fbc300 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -20,9 +20,12 @@ import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; @@ -40,14 +43,19 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; @@ -56,6 +64,8 @@ public abstract class AbstractJdbcCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); + protected static final Set SYS_DATABASES = new HashSet<>(); + protected final String catalogName; protected final String defaultDatabase; protected final String username; @@ -66,7 +76,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { protected final Optional defaultSchema; - protected Connection defaultConnection; + protected final Map connectionMap; public AbstractJdbcCatalog( String catalogName, @@ -88,6 +98,7 @@ public AbstractJdbcCatalog( this.defaultUrl = urlInfo.getOrigin(); this.suffix = urlInfo.getSuffix(); this.defaultSchema = Optional.ofNullable(defaultSchema); + this.connectionMap = new ConcurrentHashMap<>(); } @Override @@ -95,51 +106,101 @@ public String getDefaultDatabase() { return defaultDatabase; } - public String getCatalogName() { - return catalogName; + protected Connection getConnection(String url) { + if (connectionMap.containsKey(url)) { + return connectionMap.get(url); + } + try { + Connection connection = DriverManager.getConnection(url, username, pwd); + connectionMap.put(url, connection); + return connection; + } catch (SQLException e) { + throw new CatalogException(String.format("Failed connecting to %s via JDBC.", url), e); + } } - public String getUsername() { - return username; + @Override + public void open() throws CatalogException { + getConnection(defaultUrl); + LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl); } - public String getPassword() { - return pwd; + @Override + public void close() throws CatalogException { + for (Map.Entry entry : connectionMap.entrySet()) { + try { + entry.getValue().close(); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed to close %s via JDBC.", entry.getKey()), e); + } + } + connectionMap.clear(); + LOG.info("Catalog {} closing", catalogName); } - public String getBaseUrl() { - return baseUrl; + protected String getSelectColumnsSql(TablePath tablePath) { + throw new UnsupportedOperationException(); } - @Override - public void open() throws CatalogException { - try { - defaultConnection = DriverManager.getConnection(defaultUrl, username, pwd); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed connecting to %s via JDBC.", defaultUrl), e); - } + protected Column buildColumn(ResultSet resultSet) throws SQLException { + throw new UnsupportedOperationException(); + } - LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl); + protected TableIdentifier getTableIdentifier(TablePath tablePath) { + return TableIdentifier.of( + catalogName, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); } - @Override - public void close() throws CatalogException { - if (defaultConnection == null) { - return; + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); } + + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + Connection conn = getConnection(dbUrl); try { - defaultConnection.close(); - } catch (SQLException e) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional primaryKey = getPrimaryKey(metaData, tablePath); + List constraintKeys = getConstraintKeys(metaData, tablePath); + try (PreparedStatement ps = conn.prepareStatement(getSelectColumnsSql(tablePath)); + ResultSet resultSet = ps.executeQuery()) { + + TableSchema.Builder builder = TableSchema.builder(); + while (resultSet.next()) { + builder.column(buildColumn(resultSet)); + } + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = getTableIdentifier(tablePath); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + "", + catalogName); + } + + } catch (Exception e) { throw new CatalogException( - String.format("Failed to close %s via JDBC.", defaultUrl), e); + String.format("Failed getting table %s", tablePath.getFullName()), e); } - LOG.info("Catalog {} closing", catalogName); } - protected Optional getPrimaryKey( - DatabaseMetaData metaData, String database, String table) throws SQLException { - return getPrimaryKey(metaData, database, table, table); + protected Optional getPrimaryKey(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + return getPrimaryKey( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); } protected Optional getPrimaryKey( @@ -174,9 +235,13 @@ protected Optional getPrimaryKey( return Optional.of(PrimaryKey.of(pkName, pkFields)); } - protected List getConstraintKeys( - DatabaseMetaData metaData, String database, String table) throws SQLException { - return getConstraintKeys(metaData, database, table, table); + protected List getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + return getConstraintKeys( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); } protected List getConstraintKeys( @@ -217,16 +282,24 @@ protected List getConstraintKeys( return new ArrayList<>(constraintKeyMap.values()); } - protected Optional getColumnDefaultValue( - DatabaseMetaData metaData, String database, String schema, String table, String column) - throws SQLException { - try (ResultSet resultSet = metaData.getColumns(database, schema, table, column)) { - while (resultSet.next()) { - String defaultValue = resultSet.getString("COLUMN_DEF"); - return Optional.ofNullable(defaultValue); - } + protected String getListDatabaseSql() { + throw new UnsupportedOperationException(); + } + + @Override + public List listDatabases() throws CatalogException { + try { + return queryString( + defaultUrl, + getListDatabaseSql(), + rs -> { + String s = rs.getString(1); + return SYS_DATABASES.contains(s) ? null : s; + }); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); } - return Optional.empty(); } @Override @@ -236,11 +309,44 @@ public boolean databaseExists(String databaseName) throws CatalogException { return listDatabases().contains(databaseName); } + protected String getListTableSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + protected String getTableName(ResultSet rs) throws SQLException { + String schemaName = rs.getString(1); + String tableName = rs.getString(2); + if (StringUtils.isNotBlank(schemaName) && !SYS_DATABASES.contains(schemaName)) { + return schemaName + "." + tableName; + } + return null; + } + + protected String getTableName(TablePath tablePath) { + return tablePath.getSchemaAndTableName(); + } + + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + String dbUrl = getUrlFromDatabaseName(databaseName); + try { + return queryString(dbUrl, getListTableSql(databaseName), this::getTableName); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + @Override public boolean tableExists(TablePath tablePath) throws CatalogException { try { return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName()); + && listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); } catch (DatabaseNotExistException e) { return false; } @@ -261,24 +367,61 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI defaultSchema.get(), tablePath.getTableName()); } - if (!createTableInternal(tablePath, table) && !ignoreIfExists) { + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } throw new TableAlreadyExistException(catalogName, tablePath); } + + createTableInternal(tablePath, table); + } + + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + throw new UnsupportedOperationException(); } - protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException; + protected void createTableInternal(TablePath tablePath, CatalogTable table) + throws CatalogException { + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try { + executeInternal(dbUrl, getCreateTableSql(tablePath, table)); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed creating table %s", tablePath.getFullName()), e); + } + } @Override public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { checkNotNull(tablePath, "Table path cannot be null"); - if (!dropTableInternal(tablePath) && !ignoreIfNotExists) { + + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + return; + } throw new TableNotExistException(catalogName, tablePath); } + + dropTableInternal(tablePath); } - protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException; + protected String getDropTableSql(TablePath tablePath) { + throw new UnsupportedOperationException(); + } + + protected void dropTableInternal(TablePath tablePath) throws CatalogException { + String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + try { + // Will there exist concurrent drop for one table? + executeInternal(dbUrl, getDropTableSql(tablePath)); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed dropping table %s", tablePath.getFullName()), e); + } + } @Override public void createDatabase(TablePath tablePath, boolean ignoreIfExists) @@ -287,14 +430,42 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists) checkNotNull(tablePath.getDatabaseName(), "Database name cannot be null"); if (databaseExists(tablePath.getDatabaseName())) { + if (ignoreIfExists) { + return; + } throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName()); } - if (!createDatabaseInternal(tablePath.getDatabaseName()) && !ignoreIfExists) { - throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName()); + + createDatabaseInternal(tablePath.getDatabaseName()); + } + + protected String getCreateDatabaseSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + protected void createDatabaseInternal(String databaseName) { + try { + executeInternal(defaultUrl, getCreateDatabaseSql(databaseName)); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed creating database %s in catalog %s", + databaseName, this.catalogName), + e); } } - protected abstract boolean createDatabaseInternal(String databaseName); + protected void closeDatabaseConnection(String databaseName) { + String dbUrl = getUrlFromDatabaseName(databaseName); + try { + Connection connection = connectionMap.remove(dbUrl); + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + throw new CatalogException(String.format("Failed to close %s via JDBC.", dbUrl), e); + } + } @Override public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) @@ -302,10 +473,77 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) checkNotNull(tablePath, "Table path cannot be null"); checkNotNull(tablePath.getDatabaseName(), "Database name cannot be null"); - if (!dropDatabaseInternal(tablePath.getDatabaseName()) && !ignoreIfNotExists) { + if (!databaseExists(tablePath.getDatabaseName())) { + if (ignoreIfNotExists) { + return; + } throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); } + + dropDatabaseInternal(tablePath.getDatabaseName()); + } + + protected String getDropDatabaseSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + protected void dropDatabaseInternal(String databaseName) throws CatalogException { + try { + executeInternal(defaultUrl, getDropDatabaseSql(databaseName)); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed dropping database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + protected String getUrlFromDatabaseName(String databaseName) { + String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; + return url + databaseName + suffix; + } + + protected String getOptionTableName(TablePath tablePath) { + return tablePath.getFullName(); + } + + @SuppressWarnings("MagicNumber") + protected Map buildConnectorOptions(TablePath tablePath) { + Map options = new HashMap<>(8); + options.put("connector", "jdbc"); + options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName())); + options.put("table-name", getOptionTableName(tablePath)); + options.put("username", username); + options.put("password", pwd); + return options; + } + + @FunctionalInterface + public interface ResultSetConsumer { + T apply(ResultSet rs) throws SQLException; + } + + protected List queryString(String url, String sql, ResultSetConsumer consumer) + throws SQLException { + try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) { + List result = new ArrayList<>(); + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + String value = consumer.apply(rs); + if (value != null) { + result.add(value); + } + } + return result; + } } - protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException; + // If sql is DDL, the execute() method always returns false, so the return value + // should not be used to determine whether changes were made in database. + protected boolean executeInternal(String url, String sql) throws SQLException { + try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) { + return ps.execute(); + } + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index 267a68f0eefc..b558926e453a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -19,47 +19,34 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; -import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import com.mysql.cj.MysqlType; -import com.mysql.cj.jdbc.result.ResultSetImpl; -import com.mysql.cj.util.StringUtils; import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; @Slf4j public class MySqlCatalog extends AbstractJdbcCatalog { - protected static final Set SYS_DATABASES = new HashSet<>(4); - private final String SELECT_COLUMNS = + private static final MysqlDataTypeConvertor DATA_TYPE_CONVERTOR = new MysqlDataTypeConvertor(); + + private static final String SELECT_COLUMNS_SQL_TEMPLATE = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s'"; static { @@ -69,137 +56,65 @@ public class MySqlCatalog extends AbstractJdbcCatalog { SYS_DATABASES.add("sys"); } - protected final Map connectionMap; - public MySqlCatalog( String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { super(catalogName, username, pwd, urlInfo, null); - this.connectionMap = new ConcurrentHashMap<>(); } - public Connection getConnection(String url) { - if (connectionMap.containsKey(url)) { - return connectionMap.get(url); - } - try { - Connection connection = DriverManager.getConnection(url, username, pwd); - connectionMap.put(url, connection); - return connection; - } catch (SQLException e) { - throw new CatalogException(String.format("Failed connecting to %s via JDBC.", url), e); - } + @Override + protected String getListDatabaseSql() { + return "SHOW DATABASES;"; } @Override - public void close() throws CatalogException { - for (Map.Entry entry : connectionMap.entrySet()) { - try { - entry.getValue().close(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed to close %s via JDBC.", entry.getKey()), e); - } - } - super.close(); + protected String getListTableSql(String databaseName) { + return "SHOW TABLES;"; } @Override - public List listDatabases() throws CatalogException { - try (PreparedStatement ps = defaultConnection.prepareStatement("SHOW DATABASES;")) { - - List databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!SYS_DATABASES.contains(databaseName)) { - databases.add(rs.getString(1)); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } + protected String getTableName(ResultSet rs) throws SQLException { + return rs.getString(1); } @Override - public List listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - String dbUrl = getUrlFromDatabaseName(databaseName); - Connection connection = getConnection(dbUrl); - try (PreparedStatement ps = connection.prepareStatement("SHOW TABLES;")) { - - ResultSet rs = ps.executeQuery(); - - List tables = new ArrayList<>(); - - while (rs.next()) { - tables.add(rs.getString(1)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } + protected String getTableName(TablePath tablePath) { + return tablePath.getTableName(); } @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - Connection conn = getConnection(dbUrl); - try { - DatabaseMetaData metaData = conn.getMetaData(); + protected String getSelectColumnsSql(TablePath tablePath) { + return String.format( + SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName()); + } - Optional primaryKey = - getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName()); - List constraintKeys = - getConstraintKeys( - metaData, tablePath.getDatabaseName(), tablePath.getTableName()); - String sql = - String.format( - SELECT_COLUMNS, tablePath.getDatabaseName(), tablePath.getTableName()); - try (PreparedStatement ps = conn.prepareStatement(sql); - ResultSet resultSet = ps.executeQuery(); ) { + @Override + protected TableIdentifier getTableIdentifier(TablePath tablePath) { + return TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + } - TableSchema.Builder builder = TableSchema.builder(); - while (resultSet.next()) { - buildTable(resultSet, builder); - } - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - "", - "mysql"); - } + @Override + protected Optional getPrimaryKey(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + return getPrimaryKey( + metaData, + tablePath.getDatabaseName(), + tablePath.getTableName(), + tablePath.getTableName()); + } - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } + @Override + protected List getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + return getConstraintKeys( + metaData, + tablePath.getDatabaseName(), + tablePath.getTableName(), + tablePath.getTableName()); } - private void buildTable(ResultSet resultSet, TableSchema.Builder builder) throws SQLException { + @Override + protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); String sourceType = resultSet.getString("COLUMN_TYPE"); String typeName = resultSet.getString("DATA_TYPE").toUpperCase(); @@ -243,121 +158,39 @@ private void buildTable(ResultSet resultSet, TableSchema.Builder builder) throws break; } - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - 0, - isNullable, - defaultValue, - comment, - sourceType, - sourceType.contains("unsigned"), - sourceType.contains("zerofill"), - bitLen, - null, - columnLength); - builder.column(physicalColumn); + return PhysicalColumn.of( + columnName, + type, + 0, + isNullable, + defaultValue, + comment, + sourceType, + sourceType.contains("unsigned"), + sourceType.contains("zerofill"), + bitLen, + null, + columnLength); } - public static Map getColumnsDefaultValue(TablePath tablePath, Connection conn) { - StringBuilder queryBuf = new StringBuilder("SHOW FULL COLUMNS FROM "); - queryBuf.append(StringUtils.quoteIdentifier(tablePath.getTableName(), "`", false)); - queryBuf.append(" FROM "); - queryBuf.append(StringUtils.quoteIdentifier(tablePath.getDatabaseName(), "`", false)); - try (PreparedStatement ps2 = conn.prepareStatement(queryBuf.toString())) { - ResultSet rs = ps2.executeQuery(); - Map result = new HashMap<>(); - while (rs.next()) { - String field = rs.getString("Field"); - Object defaultValue = rs.getObject("Default"); - result.put(field, defaultValue); - } - return result; - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed getting table(%s) columns default value", - tablePath.getFullName()), - e); - } - } - - // todo: If the origin source is mysql, we can directly use create table like to create the @Override - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - - String createTableSql = - MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName()); - Connection connection = getConnection(dbUrl); - log.info("create table sql: {}", createTableSql); - try (PreparedStatement ps = connection.prepareStatement(createTableSql)) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed creating table %s", tablePath.getFullName()), e); - } + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + return MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName()); } @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - Connection connection = getConnection(dbUrl); - try (PreparedStatement ps = - connection.prepareStatement( - String.format("DROP TABLE IF EXISTS %s;", tablePath.getFullName()))) { - // Will there exist concurrent drop for one table? - return ps.execute(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed dropping table %s", tablePath.getFullName()), e); - } + protected String getDropTableSql(TablePath tablePath) { + return String.format("DROP TABLE %s;", tablePath.getFullName()); } @Override - protected boolean createDatabaseInternal(String databaseName) throws CatalogException { - try (PreparedStatement ps = - defaultConnection.prepareStatement( - String.format("CREATE DATABASE `%s`;", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed creating database %s in catalog %s", - databaseName, this.catalogName), - e); - } + protected String getCreateDatabaseSql(String databaseName) { + return String.format("CREATE DATABASE `%s`;", databaseName); } @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - try (PreparedStatement ps = - defaultConnection.prepareStatement( - String.format("DROP DATABASE `%s`;", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed dropping database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - /** - * @see com.mysql.cj.MysqlType - * @see ResultSetImpl#getObjectStoredProc(int, int) - */ - @SuppressWarnings("unchecked") - private SeaTunnelDataType fromJdbcType(ResultSetMetaData metadata, int colIndex) - throws SQLException { - MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex)); - Map dataTypeProperties = new HashMap<>(); - dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, metadata.getPrecision(colIndex)); - dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex)); - return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties); + protected String getDropDatabaseSql(String databaseName) { + return String.format("DROP DATABASE `%s`;", databaseName); } private SeaTunnelDataType fromJdbcType(String typeName, int precision, int scale) { @@ -365,22 +198,6 @@ private SeaTunnelDataType fromJdbcType(String typeName, int precision, int sc Map dataTypeProperties = new HashMap<>(); dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION, precision); dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, scale); - return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties); - } - - @SuppressWarnings("MagicNumber") - private Map buildConnectorOptions(TablePath tablePath) { - Map options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put("url", baseUrl + tablePath.getDatabaseName()); - options.put("table-name", tablePath.getFullName()); - options.put("username", username); - options.put("password", pwd); - return options; - } - - private String getUrlFromDatabaseName(String databaseName) { - String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; - return url + databaseName + suffix; + return DATA_TYPE_CONVERTOR.toSeaTunnelType(mysqlType, dataTypeProperties); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index 608062fc9998..490ecd30ff87 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -119,7 +119,7 @@ public String build(String catalogName) { List sqls = new ArrayList<>(); sqls.add( String.format( - "CREATE TABLE IF NOT EXISTS %s (\n%s\n)", + "CREATE TABLE %s (\n%s\n)", tableName, buildColumnsIdentifySql(catalogName))); if (engine != null) { sqls.add("ENGINE = " + engine); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index 261f4f7fb6ff..b90a86a7abb8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -18,33 +18,22 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; -import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import lombok.extern.slf4j.Slf4j; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BFILE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BLOB; @@ -61,8 +50,10 @@ @Slf4j public class OracleCatalog extends AbstractJdbcCatalog { + private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR = new OracleDataTypeConvertor(); + private static final List EXCLUDED_SCHEMAS = Collections.unmodifiableList( Arrays.asList( @@ -87,7 +78,7 @@ public class OracleCatalog extends AbstractJdbcCatalog { "EXFSYS", "SYSMAN")); - private static final String SELECT_COLUMNS_SQL = + private static final String SELECT_COLUMNS_SQL_TEMPLATE = "SELECT\n" + " cols.COLUMN_NAME,\n" + " CASE \n" @@ -127,158 +118,50 @@ public OracleCatalog( } @Override - public List listDatabases() throws CatalogException { - try (PreparedStatement ps = - defaultConnection.prepareStatement("SELECT name FROM v$database")) { - - List databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - databases.add(databaseName); - } - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } + protected String getListDatabaseSql() { + return "SELECT name FROM v$database"; } @Override - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - String createTableSql = new OracleCreateTableSqlBuilder(table).build(tablePath); - String[] createTableSqls = createTableSql.split(";"); - for (String sql : createTableSqls) { - log.info("create table sql: {}", sql); - try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) { - ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed creating table %s", tablePath.getFullName()), e); - } - } - return true; + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + return new OracleCreateTableSqlBuilder(table).build(tablePath); } @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - return false; + protected String getDropTableSql(TablePath tablePath) { + return String.format("DROP TABLE %s", getTableName(tablePath)); } @Override - protected boolean createDatabaseInternal(String databaseName) { - return false; + protected String getTableName(TablePath tablePath) { + return tablePath.getSchemaAndTableName().toUpperCase(); } @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - return false; + protected String getListTableSql(String databaseName) { + return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES" + + " WHERE TABLE_NAME NOT LIKE 'MDRT_%'" + + " AND TABLE_NAME NOT LIKE 'MDRS_%'" + + " AND TABLE_NAME NOT LIKE 'MDXT_%'" + + " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS NULL)"; } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName().toUpperCase()); - } catch (DatabaseNotExistException e) { - return false; + protected String getTableName(ResultSet rs) throws SQLException { + if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) { + return null; } + return rs.getString(1) + "." + rs.getString(2); } @Override - public List listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - try (PreparedStatement ps = - defaultConnection.prepareStatement( - "SELECT OWNER, TABLE_NAME FROM ALL_TABLES\n" - + "WHERE TABLE_NAME NOT LIKE 'MDRT_%'\n" - + " AND TABLE_NAME NOT LIKE 'MDRS_%'\n" - + " AND TABLE_NAME NOT LIKE 'MDXT_%'\n" - + " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS NULL)")) { - - ResultSet rs = ps.executeQuery(); - List tables = new ArrayList<>(); - while (rs.next()) { - if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) { - continue; - } - tables.add(rs.getString(1) + "." + rs.getString(2)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } + protected String getSelectColumnsSql(TablePath tablePath) { + return String.format( + SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), tablePath.getTableName()); } @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - try { - DatabaseMetaData metaData = defaultConnection.getMetaData(); - Optional primaryKey = - getPrimaryKey( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - List constraintKeys = - getConstraintKeys( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - - String sql = - String.format( - SELECT_COLUMNS_SQL, - tablePath.getSchemaName(), - tablePath.getTableName()); - try (PreparedStatement ps = defaultConnection.prepareStatement(sql); - ResultSet resultSet = ps.executeQuery()) { - TableSchema.Builder builder = TableSchema.builder(); - // add column - while (resultSet.next()) { - buildColumn(resultSet, builder); - } - - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - ""); - } - - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } - } - - private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throws SQLException { + protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); String typeName = resultSet.getString("TYPE_NAME"); String fullTypeName = resultSet.getString("FULL_TYPE_NAME"); @@ -314,31 +197,19 @@ private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throw break; } - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - 0, - isNullable, - defaultValue, - columnComment, - fullTypeName, - false, - false, - bitLen, - null, - columnLength); - builder.column(physicalColumn); - } - - @SuppressWarnings("unchecked") - private SeaTunnelDataType fromJdbcType(ResultSetMetaData metadata, int colIndex) - throws SQLException { - String columnType = metadata.getColumnTypeName(colIndex); - Map dataTypeProperties = new HashMap<>(); - dataTypeProperties.put(OracleDataTypeConvertor.PRECISION, metadata.getPrecision(colIndex)); - dataTypeProperties.put(OracleDataTypeConvertor.SCALE, metadata.getScale(colIndex)); - return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnType, dataTypeProperties); + return PhysicalColumn.of( + columnName, + type, + 0, + isNullable, + defaultValue, + columnComment, + fullTypeName, + false, + false, + bitLen, + null, + columnLength); } private SeaTunnelDataType fromJdbcType(String typeName, long precision, long scale) { @@ -348,14 +219,13 @@ private SeaTunnelDataType fromJdbcType(String typeName, long precision, long return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, dataTypeProperties); } - @SuppressWarnings("MagicNumber") - private Map buildConnectorOptions(TablePath tablePath) { - Map options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put("url", baseUrl); - options.put("table-name", tablePath.getSchemaAndTableName()); - options.put("username", username); - options.put("password", pwd); - return options; + @Override + protected String getUrlFromDatabaseName(String databaseName) { + return defaultUrl; + } + + @Override + protected String getOptionTableName(TablePath tablePath) { + return tablePath.getSchemaAndTableName(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index e3507666d08f..2769d09ebb70 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -18,39 +18,20 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; -import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; -import com.mysql.cj.MysqlType; -import com.mysql.cj.jdbc.result.ResultSetImpl; import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor.PG_BIT; import static org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresDataTypeConvertor.PG_BYTEA; @@ -65,7 +46,10 @@ @Slf4j public class PostgresCatalog extends AbstractJdbcCatalog { - private static final String SELECT_COLUMNS_SQL = + private static final PostgresDataTypeConvertor DATA_TYPE_CONVERTOR = + new PostgresDataTypeConvertor(); + + private static final String SELECT_COLUMNS_SQL_TEMPLATE = "SELECT \n" + " a.attname AS column_name, \n" + "\t\tt.typname as type_name,\n" @@ -102,8 +86,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog { + "ORDER BY \n" + " a.attnum;"; - protected static final Set SYS_DATABASES = new HashSet<>(9); - static { SYS_DATABASES.add("information_schema"); SYS_DATABASES.add("pg_catalog"); @@ -116,8 +98,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog { SYS_DATABASES.add("template1"); } - protected final Map connectionMap; - public PostgresCatalog( String catalogName, String username, @@ -125,154 +105,26 @@ public PostgresCatalog( JdbcUrlUtil.UrlInfo urlInfo, String defaultSchema) { super(catalogName, username, pwd, urlInfo, defaultSchema); - this.connectionMap = new ConcurrentHashMap<>(); - } - - public Connection getConnection(String url) { - if (connectionMap.containsKey(url)) { - return connectionMap.get(url); - } - try { - Connection connection = DriverManager.getConnection(url, username, pwd); - connectionMap.put(url, connection); - return connection; - } catch (SQLException e) { - throw new CatalogException(String.format("Failed connecting to %s via JDBC.", url), e); - } } @Override - public void close() throws CatalogException { - for (Map.Entry entry : connectionMap.entrySet()) { - try { - entry.getValue().close(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed to close %s via JDBC.", entry.getKey()), e); - } - } - super.close(); + protected String getListDatabaseSql() { + return "select datname from pg_database;"; } @Override - public List listDatabases() throws CatalogException { - try (PreparedStatement ps = - defaultConnection.prepareStatement("select datname from pg_database;")) { - - List databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!SYS_DATABASES.contains(databaseName)) { - databases.add(rs.getString(1)); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } + protected String getListTableSql(String databaseName) { + return "SELECT table_schema, table_name FROM information_schema.tables;"; } @Override - public List listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - String dbUrl = getUrlFromDatabaseName(databaseName); - Connection connection = getConnection(dbUrl); - try (PreparedStatement ps = - connection.prepareStatement( - "SELECT table_schema, table_name FROM information_schema.tables;")) { - - ResultSet rs = ps.executeQuery(); - - List tables = new ArrayList<>(); - - while (rs.next()) { - String schemaName = rs.getString("table_schema"); - String tableName = rs.getString("table_name"); - if (org.apache.commons.lang3.StringUtils.isNotBlank(schemaName) - && !SYS_DATABASES.contains(schemaName)) { - tables.add(schemaName + "." + tableName); - } - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } + protected String getSelectColumnsSql(TablePath tablePath) { + return String.format( + SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), tablePath.getTableName()); } @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - Connection conn = getConnection(dbUrl); - try { - DatabaseMetaData metaData = conn.getMetaData(); - Optional primaryKey = - getPrimaryKey( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - List constraintKeys = - getConstraintKeys( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - - String sql = - String.format( - SELECT_COLUMNS_SQL, - tablePath.getSchemaName(), - tablePath.getTableName()); - try (PreparedStatement ps = conn.prepareStatement(sql); - ResultSet resultSet = ps.executeQuery()) { - TableSchema.Builder builder = TableSchema.builder(); - - // add column - while (resultSet.next()) { - buildColumn(resultSet, builder); - } - - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - "", - "postgres"); - } - - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } - } - - private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throws SQLException { + protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("column_name"); String typeName = resultSet.getString("type_name"); String fullTypeName = resultSet.getString("full_type_name"); @@ -282,8 +134,9 @@ private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throw Object defaultValue = resultSet.getObject("default_value"); boolean isNullable = resultSet.getString("is_nullable").equals("YES"); - if (defaultValue != null && defaultValue.toString().contains("regclass")) + if (defaultValue != null && defaultValue.toString().contains("regclass")) { defaultValue = null; + } SeaTunnelDataType type = fromJdbcType(typeName, columnLength, columnScale); long bitLen = 0; @@ -311,131 +164,55 @@ private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throw break; } - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - 0, - isNullable, - defaultValue, - columnComment, - fullTypeName, - false, - false, - bitLen, - null, - columnLength); - builder.column(physicalColumn); + return PhysicalColumn.of( + columnName, + type, + 0, + isNullable, + defaultValue, + columnComment, + fullTypeName, + false, + false, + bitLen, + null, + columnLength); } @Override - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - String createTableSql = new PostgresCreateTableSqlBuilder(table).build(tablePath); - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - Connection conn = getConnection(dbUrl); - log.info("create table sql: {}", createTableSql); - try (PreparedStatement ps = conn.prepareStatement(createTableSql)) { - ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed creating table %s", tablePath.getFullName()), e); - } - return true; + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + return new PostgresCreateTableSqlBuilder(table).build(tablePath); } @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - - String schemaName = tablePath.getSchemaName(); - String tableName = tablePath.getTableName(); - - String sql = "DROP TABLE IF EXISTS \"" + schemaName + "\".\"" + tableName + "\""; - Connection connection = getConnection(dbUrl); - try (PreparedStatement ps = connection.prepareStatement(sql)) { - // Will there exist concurrent drop for one table? - return ps.execute(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed dropping table %s", tablePath.getFullName()), e); - } + protected String getDropTableSql(TablePath tablePath) { + return "DROP TABLE \"" + + tablePath.getSchemaName() + + "\".\"" + + tablePath.getTableName() + + "\""; } @Override - protected boolean createDatabaseInternal(String databaseName) throws CatalogException { - String sql = "CREATE DATABASE \"" + databaseName + "\""; - try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed creating database %s in catalog %s", - databaseName, this.catalogName), - e); - } + protected String getCreateDatabaseSql(String databaseName) { + return "CREATE DATABASE \"" + databaseName + "\""; } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } + protected String getDropDatabaseSql(String databaseName) { + return "DROP DATABASE \"" + databaseName + "\""; } @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - String sql = "DROP DATABASE IF EXISTS \"" + databaseName + "\""; - try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed dropping database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - /** - * @see MysqlType - * @see ResultSetImpl#getObjectStoredProc(int, int) - */ - @SuppressWarnings("unchecked") - private SeaTunnelDataType fromJdbcType(ResultSetMetaData metadata, int colIndex) - throws SQLException { - String columnTypeName = metadata.getColumnTypeName(colIndex); - Map dataTypeProperties = new HashMap<>(); - dataTypeProperties.put( - PostgresDataTypeConvertor.PRECISION, metadata.getPrecision(colIndex)); - dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, metadata.getScale(colIndex)); - return new PostgresDataTypeConvertor().toSeaTunnelType(columnTypeName, dataTypeProperties); + protected void dropDatabaseInternal(String databaseName) throws CatalogException { + closeDatabaseConnection(databaseName); + super.dropDatabaseInternal(databaseName); } private SeaTunnelDataType fromJdbcType(String typeName, long precision, long scale) { Map dataTypeProperties = new HashMap<>(); dataTypeProperties.put(PostgresDataTypeConvertor.PRECISION, precision); dataTypeProperties.put(PostgresDataTypeConvertor.SCALE, scale); - return new PostgresDataTypeConvertor().toSeaTunnelType(typeName, dataTypeProperties); - } - - @SuppressWarnings("MagicNumber") - private Map buildConnectorOptions(TablePath tablePath) { - Map options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put("url", baseUrl + tablePath.getDatabaseName()); - options.put("table-name", tablePath.getFullName()); - options.put("username", username); - options.put("password", pwd); - return options; - } - - private String getUrlFromDatabaseName(String databaseName) { - String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; - return url + databaseName + suffix; + return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, dataTypeProperties); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index 85f4468bef9e..d423f183010b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -48,7 +48,7 @@ public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) { public String build(TablePath tablePath) { StringBuilder createTableSql = new StringBuilder(); createTableSql - .append("CREATE TABLE IF NOT EXISTS ") + .append("CREATE TABLE ") .append(tablePath.getSchemaAndTableName()) .append(" (\n"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index ea04c60bff56..7d18ed2d9058 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -19,15 +19,10 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; -import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -37,33 +32,35 @@ import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; @Slf4j public class SqlServerCatalog extends AbstractJdbcCatalog { - private static final Set SYS_DATABASES = new HashSet<>(4); - - static { - SYS_DATABASES.add("master"); - SYS_DATABASES.add("tempdb"); - SYS_DATABASES.add("model"); - SYS_DATABASES.add("msdb"); - } + private static final SqlServerDataTypeConvertor DATA_TYPE_CONVERTOR = + new SqlServerDataTypeConvertor(); + + private static final String SELECT_COLUMNS_SQL_TEMPLATE = + "SELECT tbl.name AS table_name,\n" + + " col.name AS column_name,\n" + + " ext.value AS comment,\n" + + " col.column_id AS column_id,\n" + + " types.name AS type,\n" + + " col.max_length AS max_length,\n" + + " col.precision AS precision,\n" + + " col.scale AS scale,\n" + + " col.is_nullable AS is_nullable,\n" + + " def.definition AS default_value\n" + + "FROM sys.tables tbl\n" + + " INNER JOIN sys.columns col ON tbl.object_id = col.object_id\n" + + " LEFT JOIN sys.types types ON col.user_type_id = types.user_type_id\n" + + " LEFT JOIN sys.extended_properties ext ON ext.major_id = col.object_id AND ext.minor_id = col.column_id\n" + + " LEFT JOIN sys.default_constraints def ON col.default_object_id = def.object_id AND ext.minor_id = col.column_id AND ext.name = 'MS_Description'\n" + + "WHERE schema_name(tbl.schema_id) = '%s' %s\n" + + "ORDER BY tbl.name, col.column_id"; public SqlServerCatalog( String catalogName, @@ -75,133 +72,29 @@ public SqlServerCatalog( } @Override - public List listDatabases() throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM sys.databases")) { - - List databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!SYS_DATABASES.contains(databaseName)) { - databases.add(databaseName); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } - } - - @Override - public List listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - String dbUrl = getUrlFromDatabaseName(databaseName); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - "SELECT TABLE_SCHEMA, TABLE_NAME FROM " - + databaseName - + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) { - - ResultSet rs = ps.executeQuery(); - - List tables = new ArrayList<>(); - - while (rs.next()) { - tables.add(rs.getString(1) + "." + rs.getString(2)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } + protected String getListDatabaseSql() { + return "SELECT NAME FROM sys.databases"; } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } + protected String getListTableSql(String databaseName) { + return "SELECT TABLE_SCHEMA, TABLE_NAME FROM " + + databaseName + + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"; } @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } + protected String getSelectColumnsSql(TablePath tablePath) { String tableSql = StringUtils.isNotEmpty(tablePath.getTableName()) ? "AND tbl.name = '" + tablePath.getTableName() + "'" : ""; - String columnSql = - String.format( - " SELECT tbl.name AS table_name, \n col.name AS column_name, \n ext.value AS comment, \n col.column_id AS column_id, \n types.name AS type, \n col.max_length AS max_length, \n col.precision AS precision, \n col.scale AS scale, \n col.is_nullable AS is_nullable, \n def.definition AS default_value\n FROM sys.tables tbl \nINNER JOIN sys.columns col \n ON tbl.object_id = col.object_id \n LEFT JOIN sys.types types \n ON col.user_type_id = types.user_type_id \n LEFT JOIN sys.extended_properties ext \n ON ext.major_id = col.object_id and ext.minor_id = col.column_id \n LEFT JOIN sys.default_constraints def ON col.default_object_id = def.object_id \n AND ext.minor_id = col.column_id \n AND ext.name = 'MS_Description' \n WHERE schema_name(tbl.schema_id) = '%s' \n %s \n ORDER BY tbl.name, col.column_id", - tablePath.getSchemaName(), tableSql); - - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { - DatabaseMetaData metaData = conn.getMetaData(); - Optional primaryKey = - getPrimaryKey( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - List constraintKeys = - getConstraintKeys( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - - try (PreparedStatement ps = conn.prepareStatement(columnSql); - ResultSet resultSet = ps.executeQuery(); ) { - TableSchema.Builder builder = TableSchema.builder(); - while (resultSet.next()) { - buildTable(resultSet, builder); - } - - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - "", - "sqlserver"); - } - - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } + return String.format(SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), tableSql); } - private void buildTable(ResultSet resultSet, TableSchema.Builder builder) throws SQLException { + @Override + protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("column_name"); String sourceType = resultSet.getString("type"); // String typeName = resultSet.getString("DATA_TYPE").toUpperCase(); @@ -266,21 +159,19 @@ private void buildTable(ResultSet resultSet, TableSchema.Builder builder) throws default: break; } - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - 0, - isNullable, - defaultValue, - comment, - sourceType, - false, - false, - bitLen, - null, - columnLength); - builder.column(physicalColumn); + return PhysicalColumn.of( + columnName, + type, + 0, + isNullable, + defaultValue, + comment, + sourceType, + false, + false, + bitLen, + null, + columnLength); } private SeaTunnelDataType fromJdbcType(String typeName, int precision, int scale) { @@ -288,103 +179,37 @@ private SeaTunnelDataType fromJdbcType(String typeName, int precision, int sc Map dataTypeProperties = new HashMap<>(); dataTypeProperties.put(SqlServerDataTypeConvertor.PRECISION, precision); dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, scale); - return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties); + return DATA_TYPE_CONVERTOR.toSeaTunnelType(pair.getLeft(), dataTypeProperties); } @Override - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - - String createTableSql = - SqlServerCreateTableSqlBuilder.builder(tablePath, table).build(tablePath, table); - log.info("create table sql: {}", createTableSql); - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement(createTableSql)) { - System.out.println(createTableSql); - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed creating table %s", tablePath.getFullName()), e); - } + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + return SqlServerCreateTableSqlBuilder.builder(tablePath, table).build(tablePath, table); } @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "DROP TABLE IF EXISTS %s", tablePath.getFullName()))) { - // Will there exist concurrent drop for one table? - return ps.execute(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed dropping table %s", tablePath.getFullName()), e); - } + protected String getDropTableSql(TablePath tablePath) { + return String.format("DROP TABLE %s", tablePath.getFullName()); } @Override - protected boolean createDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format("CREATE DATABASE `%s`", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed creating database %s in catalog %s", - databaseName, this.catalogName), - e); - } + protected String getCreateDatabaseSql(String databaseName) { + return String.format("CREATE DATABASE %s", databaseName); } @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed dropping database %s in catalog %s", - databaseName, this.catalogName), - e); - } + protected String getDropDatabaseSql(String databaseName) { + return String.format("DROP DATABASE %s;", databaseName); } - @SuppressWarnings("unchecked") - private SeaTunnelDataType fromJdbcType(ResultSetMetaData metadata, int colIndex) - throws SQLException { - Pair> pair = - SqlServerType.parse(metadata.getColumnTypeName(colIndex)); - Map dataTypeProperties = new HashMap<>(); - dataTypeProperties.put( - SqlServerDataTypeConvertor.PRECISION, metadata.getPrecision(colIndex)); - dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex)); - return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties); - } - - @SuppressWarnings("MagicNumber") - private Map buildConnectorOptions(TablePath tablePath) { - Map options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName())); - options.put("table-name", tablePath.getFullName()); - options.put("username", username); - options.put("password", pwd); - return options; + @Override + protected void dropDatabaseInternal(String databaseName) throws CatalogException { + closeDatabaseConnection(databaseName); + super.dropDatabaseInternal(databaseName); } - private String getUrlFromDatabaseName(String databaseName) { + @Override + protected String getUrlFromDatabaseName(String databaseName) { return baseUrl + ";databaseName=" + databaseName + ";" + suffix; } - - private String getCreateTableSql(TablePath tablePath, CatalogTable table) { - - return ""; - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java index 6b8c49bc0abf..1c5fb5a2b22a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java @@ -19,8 +19,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.common.utils.JdbcUrlUtil; -import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -41,25 +39,10 @@ void testCatalog() { catalog.open(); - MySqlCatalog mySqlCatalog = - new MySqlCatalog( - "mysql", - "root", - "root@123", - JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33062/mingdongtest")); - - mySqlCatalog.open(); - - CatalogTable table1 = - mySqlCatalog.getTable(TablePath.of("mingdongtest", "all_types_table_02")); - List strings = catalog.listDatabases(); - System.out.println(strings); - - List strings1 = catalog.listTables("XE"); CatalogTable table = catalog.getTable(TablePath.of("XE", "TEST", "PG_TYPES_TABLE_CP1")); - catalog.createTableInternal(new TablePath("XE", "TEST", "TEST003"), table); + catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table, false); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java index badab864fc3f..6ef4d9e65484 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java @@ -53,7 +53,7 @@ void testCatalog() { catalog.getTable(TablePath.of("st_test", "public", "all_types_table_02")); System.out.println("find table: " + table); - catalog.createTableInternal( - new TablePath("liulitest", "public", "all_types_table_02"), table); + catalog.createTable( + new TablePath("liulitest", "public", "all_types_table_02"), table, false); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java index 3de5c65bf8d7..b75ac68223bc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java @@ -93,7 +93,7 @@ public void testBuild() { MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql"); // create table sql is change; The old unit tests are no longer applicable String expect = - "CREATE TABLE IF NOT EXISTS test_table (\n" + "CREATE TABLE test_table (\n" + "\tid null NOT NULL COMMENT 'id', \n" + "\tname null NOT NULL COMMENT 'name', \n" + "\tage null NULL COMMENT 'age', \n" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 6528be0e1fca..a38fb2217f29 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.common.utils.ExceptionUtils; @@ -31,6 +34,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; @@ -76,6 +80,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour protected GenericContainer dbServer; protected JdbcCase jdbcCase; protected Connection connection; + protected Catalog catalog; abstract JdbcCase getJdbcCase(); @@ -141,12 +146,16 @@ protected void createNeededTables() { String.format( createTemplate, buildTableInfoWithSchema( - jdbcCase.getDatabase(), jdbcCase.getSourceTable())); + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSourceTable())); String createSink = String.format( createTemplate, buildTableInfoWithSchema( - jdbcCase.getDatabase(), jdbcCase.getSinkTable())); + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSinkTable())); statement.execute(createSource); statement.execute(createSink); @@ -173,6 +182,14 @@ public String insertTable(String schema, String table, String... fields) { + ")"; } + protected void clearTable(String database, String schema, String table) { + clearTable(database, table); + } + + protected String buildTableInfoWithSchema(String database, String schema, String table) { + return buildTableInfoWithSchema(database, table); + } + public void clearTable(String schema, String table) { try (Statement statement = connection.createStatement()) { statement.execute("TRUNCATE TABLE " + buildTableInfoWithSchema(schema, table)); @@ -215,6 +232,7 @@ public void startUp() { createSchemaIfNeeded(); createNeededTables(); insertTestData(); + initCatalog(); } @Override @@ -226,6 +244,10 @@ public void tearDown() throws SQLException { if (connection != null) { connection.close(); } + + if (catalog != null) { + catalog.close(); + } } @TestTemplate @@ -238,6 +260,43 @@ public void testJdbcDb(TestContainer container) } compareResult(); - clearTable(jdbcCase.getDatabase(), jdbcCase.getSinkTable()); + clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable()); + } + + protected void initCatalog() {} + + @Test + public void testCatalog() { + if (catalog == null) { + return; + } + + TablePath sourceTablePath = + new TablePath( + jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSourceTable()); + TablePath targetTablePath = + new TablePath( + jdbcCase.getCatalogDatabase(), + jdbcCase.getCatalogSchema(), + jdbcCase.getCatalogTable()); + boolean createdDb = false; + + if (!catalog.databaseExists(targetTablePath.getDatabaseName())) { + catalog.createDatabase(targetTablePath, false); + Assertions.assertTrue(catalog.databaseExists(targetTablePath.getDatabaseName())); + createdDb = true; + } + + CatalogTable catalogTable = catalog.getTable(sourceTablePath); + catalog.createTable(targetTablePath, catalogTable, false); + Assertions.assertTrue(catalog.tableExists(targetTablePath)); + + catalog.dropTable(targetTablePath, false); + Assertions.assertFalse(catalog.tableExists(targetTablePath)); + + if (createdDb) { + catalog.dropDatabase(targetTablePath, false); + Assertions.assertFalse(catalog.databaseExists(targetTablePath.getDatabaseName())); + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java index 805fcbd16bb9..5f17eacc51ad 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java @@ -41,6 +41,7 @@ public class JdbcCase { private int port; private int localPort; private String database; + private String schema; private String sourceTable; private String sinkTable; private String jdbcTemplate; @@ -50,4 +51,8 @@ public class JdbcCase { private List configFile; private Pair> testData; private Map containerEnv; + + private String catalogDatabase; + private String catalogSchema; + private String catalogTable; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index f4b1338b15b5..b10aa0c2225f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; import org.apache.commons.lang3.tuple.Pair; @@ -48,6 +50,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT { private static final String MYSQL_DATABASE = "seatunnel"; private static final String MYSQL_SOURCE = "source"; private static final String MYSQL_SINK = "sink"; + private static final String CATALOG_DATABASE = "catalog_database"; private static final String MYSQL_USERNAME = "root"; private static final String MYSQL_PASSWORD = "Abc!@#135_seatunnel"; @@ -138,6 +141,8 @@ JdbcCase getJdbcCase() { .configFile(CONFIG_FILE) .insertSql(insertSql) .testData(testDataSet) + .catalogDatabase(CATALOG_DATABASE) + .catalogTable(MYSQL_SINK) .build(); } @@ -282,4 +287,16 @@ protected GenericContainer initContainer() { return container; } + + @Override + protected void initCatalog() { + catalog = + new MySqlCatalog( + "mysql", + jdbcCase.getUserName(), + jdbcCase.getPassword(), + JdbcUrlUtil.getUrlInfo( + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()))); + catalog.open(); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java index d0f8ce3b6879..75bdffbd6cad 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java @@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleURLParser; import org.apache.commons.lang3.tuple.Pair; @@ -27,6 +29,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; import com.google.common.collect.Lists; @@ -47,11 +50,13 @@ public class JdbcOracleIT extends AbstractJdbcIT { private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver"; private static final int ORACLE_PORT = 1521; private static final String ORACLE_URL = "jdbc:oracle:thin:@" + HOST + ":%s/%s"; - private static final String USERNAME = "testUser"; + private static final String USERNAME = "TESTUSER"; private static final String PASSWORD = "testPassword"; - private static final String DATABASE = "TESTUSER"; + private static final String DATABASE = "XE"; + private static final String SCHEMA = USERNAME; private static final String SOURCE_TABLE = "E2E_TABLE_SOURCE"; private static final String SINK_TABLE = "E2E_TABLE_SINK"; + private static final String CATALOG_TABLE = "E2E_TABLE_CATALOG"; private static final List CONFIG_FILE = Lists.newArrayList("/jdbc_oracle_source_to_sink.conf"); @@ -78,11 +83,11 @@ JdbcCase getJdbcCase() { containerEnv.put("ORACLE_PASSWORD", PASSWORD); containerEnv.put("APP_USER", USERNAME); containerEnv.put("APP_USER_PASSWORD", PASSWORD); - String jdbcUrl = String.format(ORACLE_URL, ORACLE_PORT, DATABASE); + String jdbcUrl = String.format(ORACLE_URL, ORACLE_PORT, SCHEMA); Pair> testDataSet = initTestData(); String[] fieldNames = testDataSet.getKey(); - String insertSql = insertTable(DATABASE, SOURCE_TABLE, fieldNames); + String insertSql = insertTable(SCHEMA, SOURCE_TABLE, fieldNames); return JdbcCase.builder() .dockerImage(ORACLE_IMAGE) @@ -97,8 +102,12 @@ JdbcCase getJdbcCase() { .userName(USERNAME) .password(PASSWORD) .database(DATABASE) + .schema(SCHEMA) .sourceTable(SOURCE_TABLE) .sinkTable(SINK_TABLE) + .catalogDatabase(DATABASE) + .catalogSchema(SCHEMA) + .catalogTable(CATALOG_TABLE) .createSql(CREATE_SQL) .configFile(CONFIG_FILE) .insertSql(insertSql) @@ -162,9 +171,10 @@ GenericContainer initContainer() { GenericContainer container = new OracleContainer(imageName) - .withDatabaseName(DATABASE) - .withUsername(USERNAME) - .withPassword(PASSWORD) + .withDatabaseName(SCHEMA) + .withCopyFileToContainer( + MountableFile.forClasspathResource("sql/oracle_init.sql"), + "/container-entrypoint-startdb.d/init.sql") .withNetwork(NETWORK) .withNetworkAliases(ORACLE_NETWORK_ALIASES) .withExposedPorts(ORACLE_PORT) @@ -181,4 +191,27 @@ GenericContainer initContainer() { public String quoteIdentifier(String field) { return "\"" + field + "\""; } + + @Override + protected void clearTable(String database, String schema, String table) { + clearTable(schema, table); + } + + @Override + protected String buildTableInfoWithSchema(String database, String schema, String table) { + return buildTableInfoWithSchema(schema, table); + } + + @Override + protected void initCatalog() { + String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()); + catalog = + new OracleCatalog( + "oracle", + jdbcCase.getUserName(), + jdbcCase.getPassword(), + OracleURLParser.parse(jdbcUrl), + SCHEMA); + catalog.open(); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql new file mode 100644 index 000000000000..ba77de271ea6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/sql/oracle_init.sql @@ -0,0 +1,22 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +ALTER SESSION SET CONTAINER = TESTUSER; + +CREATE USER TESTUSER IDENTIFIED BY testPassword; + +GRANT DBA TO TESTUSER; \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java index f66ef615d7b7..a5796c1aaac8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java @@ -17,6 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; @@ -26,6 +31,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; import org.testcontainers.containers.PostgreSQLContainer; @@ -252,6 +258,43 @@ public void testAutoGenerateSQL(TestContainer container) } } + @Test + public void testCatalog() { + String schema = "public"; + String databaseName = POSTGRESQL_CONTAINER.getDatabaseName(); + String tableName = "pg_e2e_sink_table"; + String catalogDatabaseName = "pg_e2e_catalog_database"; + String catalogTableName = "pg_e2e_catalog_table"; + + Catalog catalog = + new PostgresCatalog( + "postgres", + POSTGRESQL_CONTAINER.getUsername(), + POSTGRESQL_CONTAINER.getPassword(), + JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()), + schema); + catalog.open(); + + TablePath tablePath = new TablePath(databaseName, schema, tableName); + TablePath catalogTablePath = new TablePath(catalogDatabaseName, schema, catalogTableName); + + Assertions.assertFalse(catalog.databaseExists(catalogTablePath.getDatabaseName())); + catalog.createDatabase(catalogTablePath, false); + Assertions.assertTrue(catalog.databaseExists(catalogTablePath.getDatabaseName())); + + CatalogTable catalogTable = catalog.getTable(tablePath); + catalog.createTable(catalogTablePath, catalogTable, false); + Assertions.assertTrue(catalog.tableExists(catalogTablePath)); + + catalog.dropTable(catalogTablePath, false); + Assertions.assertFalse(catalog.tableExists(catalogTablePath)); + + catalog.dropDatabase(catalogTablePath, false); + Assertions.assertFalse(catalog.databaseExists(catalogTablePath.getDatabaseName())); + + catalog.close(); + } + private void initializeJdbcTable() { try (Connection connection = getJdbcConnection()) { Statement statement = connection.createStatement(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java index f615b6656ea4..0a170ff4bed2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java @@ -19,6 +19,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.commons.lang3.tuple.Pair; @@ -44,9 +46,16 @@ public class JdbcSqlServerIT extends AbstractJdbcIT { private static final String SQLSERVER_CONTAINER_HOST = "sqlserver"; private static final String SQLSERVER_SOURCE = "source"; private static final String SQLSERVER_SINK = "sink"; + private static final String SQLSERVER_DATABASE = "master"; + private static final String SQLSERVER_SCHEMA = "dbo"; + private static final String SQLSERVER_CATALOG_DATABASE = "catalog_test"; + private static final int SQLSERVER_CONTAINER_PORT = 1433; private static final String SQLSERVER_URL = - "jdbc:sqlserver://" + AbstractJdbcIT.HOST + ":%s;encrypt=false;"; + "jdbc:sqlserver://" + + AbstractJdbcIT.HOST + + ":%s;encrypt=false;databaseName=" + + SQLSERVER_DATABASE; private static final String DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; private static final List CONFIG_FILE = Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf"); @@ -81,8 +90,13 @@ JdbcCase getJdbcCase() { .jdbcUrl(jdbcUrl) .userName(username) .password(password) + .database(SQLSERVER_DATABASE) + .schema(SQLSERVER_SCHEMA) .sourceTable(SQLSERVER_SOURCE) .sinkTable(SQLSERVER_SINK) + .catalogDatabase(SQLSERVER_CATALOG_DATABASE) + .catalogSchema(SQLSERVER_SCHEMA) + .catalogTable(SQLSERVER_SINK) .createSql(CREATE_SQL) .configFile(CONFIG_FILE) .insertSql(insertSql) @@ -158,4 +172,22 @@ public String quoteIdentifier(String field) { public void clearTable(String schema, String table) { // do nothing. } + + @Override + protected String buildTableInfoWithSchema(String database, String schema, String table) { + return buildTableInfoWithSchema(schema, table); + } + + @Override + protected void initCatalog() { + catalog = + new SqlServerCatalog( + "sqlserver", + jdbcCase.getUserName(), + jdbcCase.getPassword(), + SqlServerURLParser.parse( + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), + SQLSERVER_SCHEMA); + catalog.open(); + } }