Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2] Remove system table limit #7391

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -71,9 +69,6 @@
public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);

protected static final Set<String> SYS_DATABASES = new HashSet<>();
protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>();

protected final String catalogName;
protected final String defaultDatabase;
protected final String username;
Expand Down Expand Up @@ -271,13 +266,7 @@ protected String getDatabaseWithConditionSql(String databaseName) {
@Override
public List<String> listDatabases() throws CatalogException {
try {
return queryString(
defaultUrl,
getListDatabaseSql(),
rs -> {
String s = rs.getString(1);
return SYS_DATABASES.contains(s) ? null : s;
});
return queryString(defaultUrl, getListDatabaseSql(), rs -> rs.getString(1));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", this.catalogName), e);
Expand All @@ -289,9 +278,6 @@ public boolean databaseExists(String databaseName) throws CatalogException {
if (StringUtils.isBlank(databaseName)) {
return false;
}
if (SYS_DATABASES.contains(databaseName)) {
return false;
}
try {
return querySQLResultExists(
getUrlFromDatabaseName(databaseName),
Expand All @@ -318,7 +304,7 @@ protected String getTableWithConditionSql(TablePath tablePath) {
protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
String tableName = rs.getString(2);
if (StringUtils.isNotBlank(schemaName) && !SYS_DATABASES.contains(schemaName)) {
if (StringUtils.isNotBlank(schemaName)) {
return schemaName + "." + tableName;
}
return null;
Expand Down Expand Up @@ -347,9 +333,6 @@ public List<String> listTables(String databaseName)
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
String databaseName = tablePath.getDatabaseName();
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
try {
return querySQLResultExists(
this.getUrlFromDatabaseName(databaseName), getTableWithConditionSql(tablePath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Slf4j
public class DamengCatalog extends AbstractJdbcCatalog {
private static final List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList("SYS", "SYSDBA", "SYSSSO", "SYSAUDITOR", "CTISYS"));

private static final String SELECT_COLUMNS_SQL =
"SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE, COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
Expand Down Expand Up @@ -110,9 +105,6 @@ protected String getListTableSql(String databaseName) {

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
return null;
}
return rs.getString(1) + "." + rs.getString(2);
}

Expand Down Expand Up @@ -177,9 +169,6 @@ public List<String> listTables(String databaseName)

List<String> tables = new ArrayList<>();
while (rs.next()) {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
continue;
}
tables.add(rs.getString(1) + "." + rs.getString(2));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public class IrisCatalog extends AbstractJdbcCatalog {
public IrisCatalog(
String catalogName, String username, String password, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, password, urlInfo, null);
SYS_DATABASES.add("%SYS");
}

@Override
Expand Down Expand Up @@ -138,9 +137,6 @@ public boolean databaseExists(String databaseName) throws CatalogException {

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
private static final String SELECT_TABLE_EXISTS =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
SYS_DATABASES.add("performance_schema");
SYS_DATABASES.add("sys");
}

private MySqlVersion version;
private MySqlTypeConverter typeConverter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,6 @@ public class OceanBaseMySqlCatalog extends AbstractJdbcCatalog {
private static final String SELECT_TABLE_EXISTS =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

static {
SYS_DATABASES.clear();
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
SYS_DATABASES.add("oceanbase");
SYS_DATABASES.add("LBACSYS");
SYS_DATABASES.add("ORAAUDITOR");
SYS_DATABASES.add("SYS");
}

private OceanBaseMySqlTypeConverter typeConverter;

public OceanBaseMySqlCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@

public class OceanBaseOracleCatalog extends OracleCatalog {

static {
EXCLUDED_SCHEMAS.add("oceanbase");
EXCLUDED_SCHEMAS.add("LBACSYS");
EXCLUDED_SCHEMAS.add("ORAAUDITOR");
EXCLUDED_SCHEMAS.add("SYS");
}

public OceanBaseOracleCatalog(
String catalogName,
String username,
Expand All @@ -59,9 +52,6 @@ protected String getDatabaseWithConditionSql(String databaseName) {

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Slf4j
public class OracleCatalog extends AbstractJdbcCatalog {

protected static List<String> EXCLUDED_SCHEMAS_ALL =
Collections.unmodifiableList(
Arrays.asList(
"APPQOSSYS",
"AUDSYS",
"CTXSYS",
"DVSYS",
"DBSFWUSER",
"DBSNMP",
"GSMADMIN_INTERNAL",
"LBACSYS",
"MDSYS",
"OJVMSYS",
"OLAPSYS",
"ORDDATA",
"ORDSYS",
"OUTLN",
"SYS",
"SYSTEM",
"WMSYS",
"XDB",
"EXFSYS",
"SYSMAN"));

private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT\n"
+ " cols.COLUMN_NAME,\n"
Expand Down Expand Up @@ -97,10 +71,6 @@ public class OracleCatalog extends AbstractJdbcCatalog {
+ "ORDER BY \n"
+ " cols.column_id \n";

static {
EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL);
}

public OracleCatalog(
String catalogName,
String username,
Expand Down Expand Up @@ -157,9 +127,6 @@ protected String getListTableSql(String databaseName) {

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
return null;
}
return rs.getString(1) + "." + rs.getString(2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,6 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
+ "ORDER BY \n"
+ " a.attnum;";

static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("pg_catalog");
SYS_DATABASES.add("root");
SYS_DATABASES.add("pg_toast");
SYS_DATABASES.add("pg_temp_1");
SYS_DATABASES.add("pg_toast_temp_1");
SYS_DATABASES.add("postgres");
SYS_DATABASES.add("template0");
SYS_DATABASES.add("template1");
}

public PostgresCatalog(
String catalogName,
String username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,6 @@ public class RedshiftCatalog extends AbstractJdbcCatalog {
private final String SELECT_COLUMNS =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC";

static {
EXCLUDED_SCHEMAS.add("information_schema");
EXCLUDED_SCHEMAS.add("catalog_history");
EXCLUDED_SCHEMAS.add("pg_auto_copy");
EXCLUDED_SCHEMAS.add("pg_automv");
EXCLUDED_SCHEMAS.add("pg_catalog");
EXCLUDED_SCHEMAS.add("pg_internal");
EXCLUDED_SCHEMAS.add("pg_s3");
}

static {
SYS_DATABASES.add("template0");
SYS_DATABASES.add("template1");
SYS_DATABASES.add("awsdatacatalog");
SYS_DATABASES.add("padb_harvest");
}

protected final Map<String, Connection> connectionMap;

public RedshiftCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,6 @@
@Slf4j
public class SapHanaCatalog extends AbstractJdbcCatalog {

static {
SYS_DATABASES.add("SYS");
SYS_DATABASES.add("SYSTEM");
SYS_DATABASES.add("SYS_DATABASES");
SYS_DATABASES.add("_SYS_ADVISOR");
SYS_DATABASES.add("_SYS_AFL");
SYS_DATABASES.add("_SYS_BI");
SYS_DATABASES.add("_SYS_BIC");
SYS_DATABASES.add("_SYS_DATA_ANONYMIZATION");
SYS_DATABASES.add("_SYS_DI");
SYS_DATABASES.add("_SYS_EPM");
SYS_DATABASES.add("_SYS_LDB");
SYS_DATABASES.add("_SYS_PLAN_STABILITY");
SYS_DATABASES.add("_SYS_REPO");
SYS_DATABASES.add("_SYS_RT");
SYS_DATABASES.add("_SYS_SECURITY");
SYS_DATABASES.add("_SYS_SQL_ANALYZER");
SYS_DATABASES.add("_SYS_STATISTICS");
SYS_DATABASES.add("_SYS_TABLE_REPLICAS");
SYS_DATABASES.add("_SYS_TASK");
SYS_DATABASES.add("_SYS_TELEMETRY");
SYS_DATABASES.add("_SYS_XS");
SYS_DATABASES.add("_SYS_DI_CATALOG");
SYS_DATABASES.add("_SYS_EPM_DATA");
SYS_DATABASES.add("_SYS_DI_SU");
SYS_DATABASES.add("_SYS_WORKLOAD_REPLAY");
SYS_DATABASES.add("_SYS_AUDIT");
SYS_DATABASES.add("_SYS_DI_BI_CATALOG");
SYS_DATABASES.add("_SYS_DI_CDS_CATALOG");
SYS_DATABASES.add("_SYS_DI_SEARCH_CATALOG");
SYS_DATABASES.add("_SYS_DI_TO");
}

private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT\n"
+ " C.COLUMN_NAME,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@

public class TiDBCatalog extends MySqlCatalog {

static {
SYS_DATABASES.clear();
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
SYS_DATABASES.add("performance_schema");
SYS_DATABASES.add("metrics_schema");
}

public TiDBCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@Slf4j
public class XuguCatalog extends AbstractJdbcCatalog {

protected static List<String> EXCLUDED_SCHEMAS =
Collections.unmodifiableList(Arrays.asList("GUEST", "SYSAUDITOR", "SYSSSO"));

private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT\n"
+ " dc.COLUMN_NAME,\n"
Expand Down Expand Up @@ -172,9 +167,6 @@ protected String getListTableSql(String databaseName) {

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
return null;
}
return rs.getString(1) + "." + rs.getString(2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
Expand All @@ -79,15 +77,8 @@ public class StarRocksCatalog implements Catalog {
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
private final String template;

private static final Set<String> SYS_DATABASES = new HashSet<>();
private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);

static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("_statistics_");
}

public StarRocksCatalog(
String catalogName, String username, String pwd, String defaultUrl, String template) {

Expand Down Expand Up @@ -115,10 +106,7 @@ public List<String> listDatabases() throws CatalogException {
ResultSet rs = ps.executeQuery();

while (rs.next()) {
String databaseName = rs.getString(1);
if (!SYS_DATABASES.contains(databaseName)) {
databases.add(rs.getString(1));
}
databases.add(rs.getString(1));
}

return databases;
Expand Down
Loading