Skip to content

Commit

Permalink
fix exist check and IT container host
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Aug 11, 2023
1 parent 896c529 commit a586988
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,13 @@ public void close() throws CatalogException {
LOG.info("Catalog {} closing", catalogName);
}

protected abstract String getSelectColumnsSql(TablePath tablePath);
protected String getSelectColumnsSql(TablePath tablePath) {
throw new UnsupportedOperationException();
}

protected abstract Column buildColumn(ResultSet resultSet) throws SQLException;
protected Column buildColumn(ResultSet resultSet) throws SQLException {
throw new UnsupportedOperationException();
}

protected TableIdentifier getTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(
Expand Down Expand Up @@ -278,7 +282,9 @@ protected List<ConstraintKey> getConstraintKeys(
return new ArrayList<>(constraintKeyMap.values());
}

protected abstract String getListDatabaseSql();
protected String getListDatabaseSql() {
throw new UnsupportedOperationException();
}

@Override
public List<String> listDatabases() throws CatalogException {
Expand All @@ -303,7 +309,9 @@ public boolean databaseExists(String databaseName) throws CatalogException {
return listDatabases().contains(databaseName);
}

protected abstract String getListTableSql(String databaseName);
protected String getListTableSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
Expand Down Expand Up @@ -359,18 +367,26 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
defaultSchema.get(),
tablePath.getTableName());
}
if (!createTableInternal(tablePath, table) && !ignoreIfExists) {

if (tableExists(tablePath)) {
if (ignoreIfExists) {
return;
}
throw new TableAlreadyExistException(catalogName, tablePath);
}

createTableInternal(tablePath, table);
}

protected abstract String getCreateTableSql(TablePath tablePath, CatalogTable table);
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
throw new UnsupportedOperationException();
}

protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
protected void createTableInternal(TablePath tablePath, CatalogTable table)
throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
try {
return executeInternal(dbUrl, getCreateTableSql(tablePath, table));
executeInternal(dbUrl, getCreateTableSql(tablePath, table));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed creating table %s", tablePath.getFullName()), e);
Expand All @@ -381,18 +397,26 @@ protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
if (!dropTableInternal(tablePath) && !ignoreIfNotExists) {

if (!tableExists(tablePath)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(catalogName, tablePath);
}

dropTableInternal(tablePath);
}

protected abstract String getDropTableSql(TablePath tablePath);
protected String getDropTableSql(TablePath tablePath) {
throw new UnsupportedOperationException();
}

protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
protected void dropTableInternal(TablePath tablePath) throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
try {
// Will there exist concurrent drop for one table?
return executeInternal(dbUrl, getDropTableSql(tablePath));
executeInternal(dbUrl, getDropTableSql(tablePath));
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed dropping table %s", tablePath.getFullName()), e);
Expand All @@ -405,16 +429,23 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
checkNotNull(tablePath, "Table path cannot be null");
checkNotNull(tablePath.getDatabaseName(), "Database name cannot be null");

if (!createDatabaseInternal(tablePath.getDatabaseName()) && !ignoreIfExists) {
if (databaseExists(tablePath.getDatabaseName())) {
if (ignoreIfExists) {
return;
}
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
}

createDatabaseInternal(tablePath.getDatabaseName());
}

protected abstract String getCreateDatabaseSql(String databaseName);
protected String getCreateDatabaseSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected boolean createDatabaseInternal(String databaseName) {
protected void createDatabaseInternal(String databaseName) {
try {
return executeInternal(defaultUrl, getCreateDatabaseSql(databaseName));
executeInternal(defaultUrl, getCreateDatabaseSql(databaseName));
} catch (Exception e) {
throw new CatalogException(
String.format(
Expand All @@ -430,16 +461,23 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
checkNotNull(tablePath, "Table path cannot be null");
checkNotNull(tablePath.getDatabaseName(), "Database name cannot be null");

if (!dropDatabaseInternal(tablePath.getDatabaseName()) && !ignoreIfNotExists) {
if (!databaseExists(tablePath.getDatabaseName())) {
if (ignoreIfNotExists) {
return;
}
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}

dropDatabaseInternal(tablePath.getDatabaseName());
}

protected abstract String getDropDatabaseSql(String databaseName);
protected String getDropDatabaseSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
protected void dropDatabaseInternal(String databaseName) throws CatalogException {
try {
return executeInternal(defaultUrl, getDropDatabaseSql(databaseName));
executeInternal(defaultUrl, getDropDatabaseSql(databaseName));
} catch (Exception e) {
throw new CatalogException(
String.format(
Expand Down Expand Up @@ -489,6 +527,8 @@ protected List<String> queryString(String url, String sql, ResultSetConsumer<Str
}
}

// If sql is DDL, the execute() method always returns false, so the return value
// should not be used to determine whether changes were made in database.
protected boolean executeInternal(String url, String sql) throws SQLException {
try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
return ps.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,17 @@ protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE IF EXISTS %s;", tablePath.getFullName());
return String.format("DROP TABLE %s;", tablePath.getFullName());
}

@Override
protected String getCreateDatabaseSql(String databaseName) {
return String.format("CREATE DATABASE IF NOT EXISTS `%s`;", databaseName);
return String.format("CREATE DATABASE `%s`;", databaseName);
}

@Override
protected String getDropDatabaseSql(String databaseName) {
return String.format("DROP DATABASE IF EXISTS `%s`;", databaseName);
return String.format("DROP DATABASE `%s`;", databaseName);
}

private SeaTunnelDataType<?> fromJdbcType(String typeName, int precision, int scale) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public String build(String catalogName) {
List<String> sqls = new ArrayList<>();
sqls.add(
String.format(
"CREATE TABLE IF NOT EXISTS %s (\n%s\n)",
"CREATE TABLE %s (\n%s\n)",
tableName, buildColumnsIdentifySql(catalogName)));
if (engine != null) {
sqls.add("ENGINE = " + engine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,7 @@ protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE IF EXISTS %s;", getTableName(tablePath));
}

@Override
protected String getCreateDatabaseSql(String databaseName) {
throw new UnsupportedOperationException();
}

@Override
protected String getDropDatabaseSql(String databaseName) {
throw new UnsupportedOperationException();
return String.format("DROP TABLE %s;", getTableName(tablePath));
}

@Override
Expand All @@ -150,7 +140,7 @@ protected String getTableName(TablePath tablePath) {
@Override
protected String getListTableSql(String databaseName) {
return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES"
+ "WHERE TABLE_NAME NOT LIKE 'MDRT_%'"
+ " WHERE TABLE_NAME NOT LIKE 'MDRT_%'"
+ " AND TABLE_NAME NOT LIKE 'MDRS_%'"
+ " AND TABLE_NAME NOT LIKE 'MDXT_%'"
+ " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS NULL)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {

@Override
protected String getDropTableSql(TablePath tablePath) {
return "DROP TABLE IF EXISTS \""
return "DROP TABLE \""
+ tablePath.getSchemaName()
+ "\".\""
+ tablePath.getTableName()
Expand All @@ -199,7 +199,7 @@ protected String getCreateDatabaseSql(String databaseName) {

@Override
protected String getDropDatabaseSql(String databaseName) {
return "DROP DATABASE IF EXISTS \"" + databaseName + "\"";
return "DROP DATABASE \"" + databaseName + "\"";
}

private SeaTunnelDataType<?> fromJdbcType(String typeName, long precision, long scale) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) {
public String build(TablePath tablePath) {
StringBuilder createTableSql = new StringBuilder();
createTableSql
.append("CREATE TABLE IF NOT EXISTS ")
.append("CREATE TABLE ")
.append(tablePath.getSchemaAndTableName())
.append(" (\n");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {

@Override
protected String getDropTableSql(TablePath tablePath) {
return String.format("DROP TABLE IF EXISTS %s", tablePath.getFullName());
return String.format("DROP TABLE %s", tablePath.getFullName());
}

@Override
Expand All @@ -205,7 +205,7 @@ protected String getCreateDatabaseSql(String databaseName) {

@Override
protected String getDropDatabaseSql(String databaseName) {
return String.format("DROP DATABASE IF EXISTS %s;", databaseName);
return String.format("DROP DATABASE %s;", databaseName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testBuild() {
MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql");
// create table sql is change; The old unit tests are no longer applicable
String expect =
"CREATE TABLE IF NOT EXISTS test_table (\n"
"CREATE TABLE test_table (\n"
+ "\tid null NOT NULL COMMENT 'id', \n"
+ "\tname null NOT NULL COMMENT 'name', \n"
+ "\tage null NULL COMMENT 'age', \n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ JdbcCase getJdbcCase() {

@Override
protected void initCatalog() {
String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost());
catalog =
new OracleCatalog(
"oracle",
jdbcCase.getUserName(),
jdbcCase.getPassword(),
JdbcUrlUtil.getUrlInfo(
jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())),
new JdbcUrlUtil.UrlInfo(
jdbcUrl, jdbcUrl, dbServer.getHost(), ORACLE_PORT, DATABASE, null),
DATABASE);
catalog.open();
}
Expand Down

0 comments on commit a586988

Please sign in to comment.