Skip to content

Commit

Permalink
refactor jdbc connector AbstractJdbcCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Aug 28, 2023
1 parent 94c6e3e commit be653d4
Show file tree
Hide file tree
Showing 15 changed files with 638 additions and 1,011 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public String build(String catalogName) {
List<String> sqls = new ArrayList<>();
sqls.add(
String.format(
"CREATE TABLE IF NOT EXISTS %s (\n%s\n)",
"CREATE TABLE %s (\n%s\n)",
tableName, buildColumnsIdentifySql(catalogName)));
if (engine != null) {
sqls.add("ENGINE = " + engine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,22 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;

import lombok.extern.slf4j.Slf4j;

import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BFILE;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleDataTypeConvertor.ORACLE_BLOB;
Expand All @@ -61,8 +50,10 @@

@Slf4j
public class OracleCatalog extends AbstractJdbcCatalog {

private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR =
new OracleDataTypeConvertor();

private static final List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList(
Expand All @@ -87,7 +78,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
"EXFSYS",
"SYSMAN"));

private static final String SELECT_COLUMNS_SQL =
private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT\n"
+ " cols.COLUMN_NAME,\n"
+ " CASE \n"
Expand Down Expand Up @@ -127,158 +118,50 @@ public OracleCatalog(
}

@Override
public List<String> listDatabases() throws CatalogException {
try (PreparedStatement ps =
defaultConnection.prepareStatement("SELECT name FROM v$database")) {

List<String> databases = new ArrayList<>();
ResultSet rs = ps.executeQuery();

while (rs.next()) {
String databaseName = rs.getString(1);
databases.add(databaseName);
}
return databases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", this.catalogName), e);
}
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
}

@Override
protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
throws CatalogException {
String createTableSql = new OracleCreateTableSqlBuilder(table).build(tablePath);
String[] createTableSqls = createTableSql.split(";");
for (String sql : createTableSqls) {
log.info("create table sql: {}", sql);
try (PreparedStatement ps = defaultConnection.prepareStatement(sql)) {
ps.execute();
} catch (Exception e) {
throw new CatalogException(
String.format("Failed creating table %s", tablePath.getFullName()), e);
}
}
return true;
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return new OracleCreateTableSqlBuilder(table).build(tablePath);
}

@Override
protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
return false;
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE %s", getTableName(tablePath));
}

@Override
protected boolean createDatabaseInternal(String databaseName) {
return false;
protected String getTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName().toUpperCase();
}

@Override
protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
return false;
protected String getListTableSql(String databaseName) {
return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES"
+ " WHERE TABLE_NAME NOT LIKE 'MDRT_%'"
+ " AND TABLE_NAME NOT LIKE 'MDRS_%'"
+ " AND TABLE_NAME NOT LIKE 'MDXT_%'"
+ " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS NULL)";
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName().toUpperCase());
} catch (DatabaseNotExistException e) {
return false;
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
return null;
}
return rs.getString(1) + "." + rs.getString(2);
}

@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(this.catalogName, databaseName);
}

try (PreparedStatement ps =
defaultConnection.prepareStatement(
"SELECT OWNER, TABLE_NAME FROM ALL_TABLES\n"
+ "WHERE TABLE_NAME NOT LIKE 'MDRT_%'\n"
+ " AND TABLE_NAME NOT LIKE 'MDRS_%'\n"
+ " AND TABLE_NAME NOT LIKE 'MDXT_%'\n"
+ " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS NULL)")) {

ResultSet rs = ps.executeQuery();
List<String> tables = new ArrayList<>();
while (rs.next()) {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
continue;
}
tables.add(rs.getString(1) + "." + rs.getString(2));
}

return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
}
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}

try {
DatabaseMetaData metaData = defaultConnection.getMetaData();
Optional<PrimaryKey> primaryKey =
getPrimaryKey(
metaData,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
List<ConstraintKey> constraintKeys =
getConstraintKeys(
metaData,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());

String sql =
String.format(
SELECT_COLUMNS_SQL,
tablePath.getSchemaName(),
tablePath.getTableName());
try (PreparedStatement ps = defaultConnection.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery()) {
TableSchema.Builder builder = TableSchema.builder();
// add column
while (resultSet.next()) {
buildColumn(resultSet, builder);
}

// add primary key
primaryKey.ifPresent(builder::primaryKey);
// add constraint key
constraintKeys.forEach(builder::constraintKey);
TableIdentifier tableIdentifier =
TableIdentifier.of(
catalogName,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
return CatalogTable.of(
tableIdentifier,
builder.build(),
buildConnectorOptions(tablePath),
Collections.emptyList(),
"");
}

} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}

private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throws SQLException {
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String typeName = resultSet.getString("TYPE_NAME");
String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
Expand Down Expand Up @@ -314,31 +197,19 @@ private void buildColumn(ResultSet resultSet, TableSchema.Builder builder) throw
break;
}

PhysicalColumn physicalColumn =
PhysicalColumn.of(
columnName,
type,
0,
isNullable,
defaultValue,
columnComment,
fullTypeName,
false,
false,
bitLen,
null,
columnLength);
builder.column(physicalColumn);
}

@SuppressWarnings("unchecked")
private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
throws SQLException {
String columnType = metadata.getColumnTypeName(colIndex);
Map<String, Object> dataTypeProperties = new HashMap<>();
dataTypeProperties.put(OracleDataTypeConvertor.PRECISION, metadata.getPrecision(colIndex));
dataTypeProperties.put(OracleDataTypeConvertor.SCALE, metadata.getScale(colIndex));
return DATA_TYPE_CONVERTOR.toSeaTunnelType(columnType, dataTypeProperties);
return PhysicalColumn.of(
columnName,
type,
0,
isNullable,
defaultValue,
columnComment,
fullTypeName,
false,
false,
bitLen,
null,
columnLength);
}

private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, long scale) {
Expand All @@ -348,14 +219,13 @@ private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, long
return DATA_TYPE_CONVERTOR.toSeaTunnelType(typeName, dataTypeProperties);
}

@SuppressWarnings("MagicNumber")
private Map<String, String> buildConnectorOptions(TablePath tablePath) {
Map<String, String> options = new HashMap<>(8);
options.put("connector", "jdbc");
options.put("url", baseUrl);
options.put("table-name", tablePath.getSchemaAndTableName());
options.put("username", username);
options.put("password", pwd);
return options;
@Override
protected String getUrlFromDatabaseName(String databaseName) {
return defaultUrl;
}

@Override
protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}
}
Loading

0 comments on commit be653d4

Please sign in to comment.