From 1f71caf96e7ac1b5d7a233ae3b0e4dad14f2514a Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Sun, 29 Sep 2024 16:23:30 +0800 Subject: [PATCH] [improvement](jdbc catalog) Optimize JdbcCatalog case mapping stability (#40891) This PR makes the following changes to the uppercase and lowercase mapping of JdbcCatalog 1. The identifierMapping is managed by JdbcExternalCatalog instead of JdbcClient to better control its lifecycle 2. The identifierMapping no longer loads remoteName alone, but Catalog controls the loading uniformly 3. The identifierMapping will be loaded when each FE performs makeSureInitialized() to ensure that each FE has a mapping 4. The initialization of mapping will only be performed once in makeSureInitialized(), which means that even if you use metaCache, if your source data is updated when identifierMapping is enabled, you must refresh the catalog to query normally. 5. The identifierMapping is only responsible for the properties of the Catalog and is no longer affected by the fe config, simplifying the processing logic 6. If lower_case_mete_names is false and meta_names_mapping is empty in the catalog properties, the identifierMapping will no longer take effect, further enhancing the stability of the default settings 7. The JdbcClient is no longer closed during onRefreshCache, reducing the repeated creation of resources, improving reuse, and reducing the leakage of some global shared threads --- .../doris/datasource/ExternalCatalog.java | 18 + .../doris/datasource/ExternalDatabase.java | 7 + .../datasource/jdbc/JdbcExternalCatalog.java | 56 +++- .../datasource/jdbc/JdbcExternalTable.java | 25 +- .../jdbc/JdbcIdentifierMapping.java | 45 --- .../datasource/jdbc/client/JdbcClient.java | 46 +-- .../jdbc/client/JdbcMySQLClient.java | 4 +- .../jdbc/client/JdbcOracleClient.java | 4 +- .../mapping/DefaultIdentifierMapping.java | 268 +++++++++++++++ .../datasource/mapping/IdentifierMapping.java | 307 +----------------- 10 files changed, 371 insertions(+), 409 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 25329a1a829a9d..d6a6cc3c4a52d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -48,6 +48,7 @@ import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase; import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase; +import org.apache.doris.datasource.mapping.IdentifierMapping; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; import org.apache.doris.datasource.metacache.MetaCache; import org.apache.doris.datasource.operations.ExternalMetadataOps; @@ -149,6 +150,9 @@ public abstract class ExternalCatalog protected Optional useMetaCache = Optional.empty(); protected MetaCache> metaCache; + protected IdentifierMapping identifierMapping; + private boolean mappingsInitialized = false; + public ExternalCatalog() { } @@ -181,6 +185,10 @@ protected List listDatabaseNames() { } } + // only for forward to master + protected void buildDatabaseMapping() { + } + // Will be called when creating catalog(so when as replaying) // to add some default properties if missing. public void setDefaultPropsIfMissing(boolean isReplay) { @@ -209,6 +217,10 @@ public void checkWhenCreating() throws DdlException { */ public abstract List listTableNames(SessionContext ctx, String dbName); + // only for forward to master + protected void buildTableMapping(SessionContext ctx, String dbName) { + } + /** * check if the specified table exist. * @@ -273,6 +285,10 @@ public final synchronized void makeSureInitialized() { } initialized = true; } + if (!mappingsInitialized) { + buildDatabaseMapping(); + mappingsInitialized = true; + } } protected final void initLocalObjects() { @@ -398,6 +414,7 @@ private List getFilteredDatabaseNames() { public void onRefresh(boolean invalidCache) { this.objectCreated = false; this.initialized = false; + this.mappingsInitialized = false; synchronized (this.propLock) { this.convertedProperties = null; } @@ -756,6 +773,7 @@ public void gsonPostProcess() throws IOException { } this.propLock = new byte[0]; this.initialized = false; + this.mappingsInitialized = false; setDefaultPropsIfMissing(true); if (tableAutoAnalyzePolicy == null) { tableAutoAnalyzePolicy = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index d653a5a178e484..cf65a5f0a48de6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -91,6 +91,8 @@ public abstract class ExternalDatabase private MetaCache metaCache; + private boolean mappingsInitialized = false; + /** * Create external database. * @@ -117,6 +119,7 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) { public void setUnInitialized(boolean invalidCache) { this.initialized = false; + this.mappingsInitialized = false; this.invalidCacheInInit = invalidCache; if (extCatalog.getUseMetaCache().isPresent()) { if (extCatalog.getUseMetaCache().get() && metaCache != null) { @@ -170,6 +173,10 @@ public final synchronized void makeSureInitialized() { } initialized = true; } + if (!mappingsInitialized) { + extCatalog.buildTableMapping(null, name); + mappingsInitialized = true; + } } public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index fb26265d19fe93..9d837fcac6328c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.datasource.mapping.DefaultIdentifierMapping; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; @@ -119,19 +120,16 @@ public void onRefresh(boolean invalidCache) { super.onRefresh(invalidCache); if (jdbcClient != null) { jdbcClient.closeClient(); + jdbcClient = null; } } - @Override - public void onRefreshCache(boolean invalidCache) { - onRefresh(invalidCache); - } - @Override public void onClose() { super.onClose(); if (jdbcClient != null) { jdbcClient.closeClient(); + jdbcClient = null; } } @@ -232,8 +230,6 @@ protected void initLocalObjectsImpl() { .setDriverUrl(getDriverUrl()) .setDriverClass(getDriverClass()) .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase()) - .setIsLowerCaseMetaNames(getLowerCaseMetaNames()) - .setMetaNamesMapping(getMetaNamesMapping()) .setIncludeDatabaseMap(getIncludeDatabaseMap()) .setExcludeDatabaseMap(getExcludeDatabaseMap()) .setConnectionPoolMinSize(getConnectionPoolMinSize()) @@ -243,22 +239,62 @@ protected void initLocalObjectsImpl() { .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); + identifierMapping = new DefaultIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()), + getMetaNamesMapping()); } + @Override protected List listDatabaseNames() { - return jdbcClient.getDatabaseNameList(); + return identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList()); + } + + @Override + protected void buildDatabaseMapping() { + identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList()); + } + + protected String getRemoteDatabaseName(String dbName) { + return identifierMapping.toRemoteDatabaseName(dbName); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - return jdbcClient.getTablesNameList(dbName); + String remoteDbName = getRemoteDatabaseName(dbName); + return identifierMapping.fromRemoteTableName(remoteDbName, jdbcClient.getTablesNameList(remoteDbName)); + } + + @Override + protected void buildTableMapping(SessionContext ctx, String dbName) { + String remoteDbName = getRemoteDatabaseName(dbName); + identifierMapping.fromRemoteTableName(getRemoteDatabaseName(dbName), + jdbcClient.getTablesNameList(remoteDbName)); + } + + protected String getRemoteTableName(String dbName, String tblName) { + return identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName); } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - return jdbcClient.isTableExist(dbName, tblName); + String remoteDbName = getRemoteDatabaseName(dbName); + String remoteTblName = getRemoteTableName(dbName, tblName); + return jdbcClient.isTableExist(remoteDbName, remoteTblName); + } + + public List listColumns(String dbName, String tblName) { + makeSureInitialized(); + String remoteDbName = getRemoteDatabaseName(dbName); + String remoteTblName = getRemoteTableName(dbName, tblName); + return identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName, + jdbcClient.getColumnsFromJdbc(remoteDbName, + remoteTblName)); + } + + protected Map getRemoteColumnNames(String dbName, String tblName) { + return identifierMapping.toRemoteColumnNames(getRemoteDatabaseName(dbName), + getRemoteTableName(dbName, tblName)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 495311bc087d5b..78805612e92d87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -32,6 +32,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; +import com.google.common.collect.Maps; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,21 +87,29 @@ public TTableDescriptor toThrift() { @Override public Optional initSchema() { - return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient() - .getColumnsFromJdbc(dbName, name))); + return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).listColumns(dbName, name))); } private JdbcTable toJdbcTable() { List schema = getFullSchema(); JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog; - String fullDbName = this.dbName + "." + this.name; - JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE); - jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName); + String fullTableName = this.dbName + "." + this.name; + JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE); + jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName); // Set remote properties - jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName)); - jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name)); - jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name)); + jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName)); + jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, this.name)); + Map remoteColumnNames = jdbcCatalog.getRemoteColumnNames(this.dbName, this.name); + if (!remoteColumnNames.isEmpty()) { + jdbcTable.setRemoteColumnNames(remoteColumnNames); + } else { + remoteColumnNames = Maps.newHashMap(); + for (Column column : schema) { + remoteColumnNames.put(column.getName(), column.getName()); + } + jdbcTable.setRemoteColumnNames(remoteColumnNames); + } return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java deleted file mode 100644 index 20a74724b3e496..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.jdbc; - -import org.apache.doris.datasource.jdbc.client.JdbcClient; -import org.apache.doris.datasource.mapping.IdentifierMapping; - -public class JdbcIdentifierMapping extends IdentifierMapping { - private final JdbcClient jdbcClient; - - public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping, JdbcClient jdbcClient) { - super(isLowerCaseMetaNames, metaNamesMapping); - this.jdbcClient = jdbcClient; - } - - @Override - protected void loadDatabaseNames() { - jdbcClient.getDatabaseNameList(); - } - - @Override - protected void loadTableNames(String localDbName) { - jdbcClient.getTablesNameList(localDbName); - } - - @Override - protected void loadColumnNames(String localDbName, String localTableName) { - jdbcClient.getColumnsFromJdbc(localDbName, localTableName); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 0832aa68f00372..b9adebc65d898f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -24,7 +24,6 @@ import org.apache.doris.cloud.security.SecurityChecker; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping; import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema; import com.google.common.collect.ImmutableSet; @@ -63,11 +62,8 @@ public abstract class JdbcClient { protected ClassLoader classLoader = null; protected HikariDataSource dataSource = null; protected boolean isOnlySpecifiedDatabase; - protected boolean isLowerCaseMetaNames; - protected String metaNamesMapping; protected Map includeDatabaseMap; protected Map excludeDatabaseMap; - protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching; public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) { String dbType = parseDbType(jdbcClientConfig.getJdbcUrl()); @@ -104,8 +100,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.catalogName = jdbcClientConfig.getCatalog(); this.jdbcUser = jdbcClientConfig.getUser(); this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase()); - this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames()); - this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping(); this.includeDatabaseMap = Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap()); this.excludeDatabaseMap = @@ -114,7 +108,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.dbType = parseDbType(jdbcUrl); initializeClassLoader(jdbcClientConfig); initializeDataSource(jdbcClientConfig); - this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this); } // Initialize DataSource @@ -297,10 +290,9 @@ public List getDatabaseNameList() { /** * get all tables of one database */ - public List getTablesNameList(String localDbName) { + public List getTablesNameList(String remoteDbName) { List remoteTablesNames = Lists.newArrayList(); String[] tableTypes = getTableTypes(); - String remoteDbName = getRemoteDatabaseName(localDbName); processTable(remoteDbName, null, tableTypes, (rs) -> { try { while (rs.next()) { @@ -310,14 +302,12 @@ public List getTablesNameList(String localDbName) { throw new JdbcClientException("failed to get all tables for remote database: `%s`", e, remoteDbName); } }); - return filterTableNames(remoteDbName, remoteTablesNames); + return remoteTablesNames; } - public boolean isTableExist(String localDbName, String localTableName) { + public boolean isTableExist(String remoteDbName, String remoteTableName) { final boolean[] isExist = {false}; String[] tableTypes = getTableTypes(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> { try { if (rs.next()) { @@ -334,12 +324,10 @@ public boolean isTableExist(String localDbName, String localTableName) { /** * get all columns of one table */ - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); @@ -365,21 +353,7 @@ public List getColumnsFromJdbc(String localDbName, String localTableName field.isAllowNull(), field.getRemarks(), true, -1)); } - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); - return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema); - } - - public String getRemoteDatabaseName(String localDbname) { - return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname); - } - - public String getRemoteTableName(String localDbName, String localTableName) { - return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName); - } - - public Map getRemoteColumnNames(String localDbName, String localTableName) { - return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName); + return dorisTableSchema; } // protected methods,for subclass to override @@ -437,7 +411,7 @@ protected List filterDatabaseNames(List remoteDbNames) { } filteredDatabaseNames.add(databaseName); } - return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames); + return filteredDatabaseNames; } protected Set getFilterInternalDatabases() { @@ -448,14 +422,6 @@ protected Set getFilterInternalDatabases() { .build(); } - protected List filterTableNames(String remoteDbName, List remoteTableNames) { - return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, remoteTableNames); - } - - protected List filterColumnName(String remoteDbName, String remoteTableName, List remoteColumns) { - return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns); - } - protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema); protected Type createDecimalOrStringType(int precision, int scale) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java index 5624392de14c39..3baa2ce9d911df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java @@ -129,12 +129,10 @@ protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String c * get all columns of one table */ @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index d37b36cbf3de15..dc367e8ea6e6b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -49,12 +49,10 @@ public String getTestQuery() { } @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java new file mode 100644 index 00000000000000..4847cd86e6d79c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mapping; + +import org.apache.doris.catalog.Column; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultIdentifierMapping implements IdentifierMapping { + private static final Logger LOG = LogManager.getLogger(DefaultIdentifierMapping.class); + + private final ObjectMapper mapper = new ObjectMapper(); + private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> localTableToRemoteTable + = new ConcurrentHashMap<>(); + private final ConcurrentHashMap>> + localColumnToRemoteColumn = new ConcurrentHashMap<>(); + + private final boolean isLowerCaseMetaNames; + private final String metaNamesMapping; + + public DefaultIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { + this.isLowerCaseMetaNames = isLowerCaseMetaNames; + this.metaNamesMapping = metaNamesMapping; + } + + private boolean isMappingInvalid() { + return metaNamesMapping == null || metaNamesMapping.isEmpty(); + } + + @Override + public List fromRemoteDatabaseName(List remoteDatabaseNames) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteDatabaseNames; + } + JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); + + Map databaseNameMapping = Maps.newTreeMap(); + if (databasesNode.isArray()) { + for (JsonNode node : databasesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String mapping = node.path("mapping").asText(); + databaseNameMapping.put(remoteDatabase, mapping); + } + } + + Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, + databaseNameMapping, isLowerCaseMetaNames); + List localDatabaseNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the names."); + } + return localDatabaseNames; + } + + @Override + public List fromRemoteTableName(String remoteDbName, List remoteTableNames) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteTableNames; + } + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); + + Map tableNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + if (remoteDbName.equals(remoteDatabase)) { + String remoteTable = node.path("remoteTable").asText(); + String mapping = node.path("mapping").asText(); + tableNameMapping.put(remoteTable, mapping); + } + } + } + + localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); + + Map> result = nameListToMapping(remoteTableNames, + localTableToRemoteTable.get(remoteDbName), + tableNameMapping, isLowerCaseMetaNames); + List localTableNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict table names found in remote database/schema: " + remoteDbName + + " when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the table names."); + } + return localTableNames; + } + + @Override + public List fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, + List remoteColumns) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteColumns; + } + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); + + Map columnNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String remoteTable = node.path("remoteTable").asText(); + if (remoteDatabaseName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { + String remoteColumn = node.path("remoteColumn").asText(); + String mapping = node.path("mapping").asText(); + columnNameMapping.put(remoteColumn, mapping); + } + } + } + localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>()); + localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); + + List remoteColumnNames = Lists.newArrayList(); + for (Column remoteColumn : remoteColumns) { + remoteColumnNames.add(remoteColumn.getName()); + } + + Map> result = nameListToMapping(remoteColumnNames, + localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName), + columnNameMapping, isLowerCaseMetaNames); + List localColumnNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict column names found in remote database/schema: " + remoteDatabaseName + + " in remote table: " + remoteTableName + + " when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the column names."); + } + for (int i = 0; i < remoteColumns.size(); i++) { + remoteColumns.get(i).setName(localColumnNames.get(i)); + } + return remoteColumns; + } + + @Override + public String toRemoteDatabaseName(String localDatabaseName) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localDatabaseName; + } + return getRequiredMapping(localDBToRemoteDB, localDatabaseName, "database", localDatabaseName); + } + + @Override + public String toRemoteTableName(String remoteDatabaseName, String localTableName) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localTableName; + } + Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDatabaseName, + k -> new ConcurrentHashMap<>()); + return getRequiredMapping(tableMap, localTableName, "table", localTableName); + } + + @Override + public Map toRemoteColumnNames(String remoteDatabaseName, String remoteTableName) { + // If mapping is not required, return an empty map (since there's no mapping) + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return Collections.emptyMap(); + } + ConcurrentHashMap> tableColumnMap + = localColumnToRemoteColumn.computeIfAbsent(remoteDatabaseName, k -> new ConcurrentHashMap<>()); + Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); + if (columnMap.isEmpty()) { + LOG.warn("No remote column found for: {}. Please refresh this catalog.", remoteTableName); + throw new RuntimeException( + "No remote column found for: " + remoteTableName + ". Please refresh this catalog."); + } + return columnMap; + } + + private V getRequiredMapping(Map map, K key, String typeName, String entityName) { + V value = map.get(key); + if (value == null) { + LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); + throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName + + ". Please refresh this catalog."); + } + return value; + } + + private JsonNode readAndParseJson(String jsonPath, String nodeName) { + JsonNode rootNode; + try { + rootNode = mapper.readTree(jsonPath); + return rootNode.path(nodeName); + } catch (JsonProcessingException e) { + throw new RuntimeException("parse meta_names_mapping property error", e); + } + } + + private Map> nameListToMapping(List remoteNames, + ConcurrentHashMap localNameToRemoteName, + Map nameMapping, boolean isLowerCaseMetaNames) { + List filteredDatabaseNames = Lists.newArrayList(); + Set lowerCaseNames = Sets.newHashSet(); + Map> nameMap = Maps.newHashMap(); + List conflictNames = Lists.newArrayList(); + + for (String name : remoteNames) { + String mappedName = nameMapping.getOrDefault(name, name); + String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; + + localNameToRemoteName.computeIfAbsent(localName, k -> name); + + if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { + if (nameMap.containsKey(localName)) { + nameMap.get(localName).add(mappedName); + } + } else { + nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); + } + + filteredDatabaseNames.add(localName); + } + + for (List conflictNameList : nameMap.values()) { + if (conflictNameList.size() > 1) { + conflictNames.addAll(conflictNameList); + } + } + + Map> result = Maps.newConcurrentMap(); + result.put("localNames", filteredDatabaseNames); + result.put("conflictNames", conflictNames); + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java index 363ef351152a39..7745a25d27da47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java @@ -18,313 +18,20 @@ package org.apache.doris.datasource.mapping; import org.apache.doris.catalog.Column; -import org.apache.doris.qe.GlobalVariable; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class IdentifierMapping { - private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class); - - private final ObjectMapper mapper = new ObjectMapper(); - private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> localTableToRemoteTable - = new ConcurrentHashMap<>(); - private final ConcurrentHashMap>> - localColumnToRemoteColumn = new ConcurrentHashMap<>(); - - private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false); - private final ConcurrentHashMap tableNamesLoadedMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> columnNamesLoadedMap - = new ConcurrentHashMap<>(); - - private final boolean isLowerCaseMetaNames; - private final String metaNamesMapping; - - public IdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { - this.isLowerCaseMetaNames = isLowerCaseMetaNames; - this.metaNamesMapping = metaNamesMapping; - } - - public List setDatabaseNameMapping(List remoteDatabaseNames) { - JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); - - Map databaseNameMapping = Maps.newTreeMap(); - if (databasesNode.isArray()) { - for (JsonNode node : databasesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String mapping = node.path("mapping").asText(); - databaseNameMapping.put(remoteDatabase, mapping); - } - } - - Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, - databaseNameMapping, isLowerCaseMetaNames); - List localDatabaseNames = result.get("localNames"); - List conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the names."); - } - return localDatabaseNames; - } - - public List setTableNameMapping(String remoteDbName, List remoteTableNames) { - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); - - Map tableNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - if (remoteDbName.equals(remoteDatabase)) { - String remoteTable = node.path("remoteTable").asText(); - String mapping = node.path("mapping").asText(); - tableNameMapping.put(remoteTable, mapping); - } - } - } - - localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - - List localTableNames; - List conflictNames; - - if (GlobalVariable.lowerCaseTableNames == 1) { - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, true); - localTableNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + " when lower_case_table_names is 1: " + conflictNames - + ". Please use meta_name_mapping to specify the names."); - } - } else { - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, isLowerCaseMetaNames); - localTableNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + "when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the table names."); - } - } - return localTableNames; - } - - public List setColumnNameMapping(String remoteDbName, String remoteTableName, List remoteColumns) { - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); - - Map columnNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String remoteTable = node.path("remoteTable").asText(); - if (remoteDbName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { - String remoteColumn = node.path("remoteColumn").asText(); - String mapping = node.path("mapping").asText(); - columnNameMapping.put(remoteColumn, mapping); - } - } - } - localColumnToRemoteColumn.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); - - List localColumnNames; - List conflictNames; - - // Get the name from localColumns and save it to List - List remoteColumnNames = Lists.newArrayList(); - for (Column remoteColumn : remoteColumns) { - remoteColumnNames.add(remoteColumn.getName()); - } - - Map> result = nameListToMapping(remoteColumnNames, - localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName), - columnNameMapping, isLowerCaseMetaNames); - localColumnNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict column names found in remote database/schema: " + remoteDbName - + " in remote table: " + remoteTableName - + " when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the column names."); - } - // Replace the name in remoteColumns with localColumnNames - for (int i = 0; i < remoteColumns.size(); i++) { - remoteColumns.get(i).setName(localColumnNames.get(i)); - } - return remoteColumns; - } - - public String getRemoteDatabaseName(String localDbName) { - return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded, - localDbName); - } - - public String getRemoteTableName(String localDbName, String localTableName) { - String remoteDbName = getRemoteDatabaseName(localDbName); - Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName, - k -> new ConcurrentHashMap<>()); - return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName), - localTableName); - } - - public Map getRemoteColumnNames(String localDbName, String localTableName) { - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); - ConcurrentHashMap> tableColumnMap - = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>()); - Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); - if (columnMap.isEmpty()) { - LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}", - localDbName, localTableName); - loadColumnNamesIfNeeded(localDbName, localTableName); - columnMap = tableColumnMap.get(remoteTableName); - } - if (columnMap.isEmpty()) { - LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName); - throw new RuntimeException( - "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog."); - } - return columnMap; - } - - - private void loadDatabaseNamesIfNeeded() { - if (dbNamesLoaded.compareAndSet(false, true)) { - try { - loadDatabaseNames(); - } catch (Exception e) { - dbNamesLoaded.set(false); // Reset on failure - LOG.warn("Error loading database names", e); - } - } - } - - private void loadTableNamesIfNeeded(String localDbName) { - AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false)); - if (isLoaded.compareAndSet(false, true)) { - try { - loadTableNames(localDbName); - } catch (Exception e) { - tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure - LOG.warn("Error loading table names for localDbName: {}", localDbName, e); - } - } - } - - private void loadColumnNamesIfNeeded(String localDbName, String localTableName) { - columnNamesLoadedMap.putIfAbsent(localDbName, new ConcurrentHashMap<>()); - AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName) - .computeIfAbsent(localTableName, k -> new AtomicBoolean(false)); - if (isLoaded.compareAndSet(false, true)) { - try { - loadColumnNames(localDbName, localTableName); - } catch (Exception e) { - columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure - LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName, - localTableName, e); - } - } - } - - private V getRequiredMapping(Map map, K key, String typeName, Runnable loadIfNeeded, - String entityName) { - if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) { - LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName); - loadIfNeeded.run(); - } - V value = map.get(key); - if (value == null) { - LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); - throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName - + ". Please refresh this catalog."); - } - return value; - } - - // Load the database name from the data source. - // In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping. - protected abstract void loadDatabaseNames(); - - // Load the table names for the specified database from the data source. - // In the corresponding getTableNameList(), setTableNameMapping() must be used to update the mapping. - protected abstract void loadTableNames(String localDbName); - - // Load the column names for a specified table in a database from the data source. - // In the corresponding getColumnNameList(), setColumnNameMapping() must be used to update the mapping. - protected abstract void loadColumnNames(String localDbName, String localTableName); - - private JsonNode readAndParseJson(String jsonPath, String nodeName) { - JsonNode rootNode; - try { - rootNode = mapper.readTree(jsonPath); - return rootNode.path(nodeName); - } catch (JsonProcessingException e) { - throw new RuntimeException("parse meta_names_mapping property error", e); - } - } - - private Map> nameListToMapping(List remoteNames, - ConcurrentHashMap localNameToRemoteName, - Map nameMapping, boolean isLowerCaseMetaNames) { - List filteredDatabaseNames = Lists.newArrayList(); - Set lowerCaseNames = Sets.newHashSet(); - Map> nameMap = Maps.newHashMap(); - List conflictNames = Lists.newArrayList(); - for (String name : remoteNames) { - String mappedName = nameMapping.getOrDefault(name, name); - String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; +public interface IdentifierMapping { + List fromRemoteDatabaseName(List remoteDatabaseNames); - // Use computeIfAbsent to ensure atomicity - localNameToRemoteName.computeIfAbsent(localName, k -> name); + List fromRemoteTableName(String remoteDatabaseName, List remoteTableNames); - if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { - if (nameMap.containsKey(localName)) { - nameMap.get(localName).add(mappedName); - } - } else { - nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); - } + List fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, List remoteColumns); - filteredDatabaseNames.add(localName); - } + String toRemoteDatabaseName(String localDatabaseName); - for (List conflictNameList : nameMap.values()) { - if (conflictNameList.size() > 1) { - conflictNames.addAll(conflictNameList); - } - } + String toRemoteTableName(String remoteDatabaseName, String localTableName); - Map> result = Maps.newConcurrentMap(); - result.put("localNames", filteredDatabaseNames); - result.put("conflictNames", conflictNames); - return result; - } + Map toRemoteColumnNames(String remoteDatabaseName, String remoteTableName); }