diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 25329a1a829a9da..d6a6cc3c4a52d17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -48,6 +48,7 @@ import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase; import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase; +import org.apache.doris.datasource.mapping.IdentifierMapping; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; import org.apache.doris.datasource.metacache.MetaCache; import org.apache.doris.datasource.operations.ExternalMetadataOps; @@ -149,6 +150,9 @@ public abstract class ExternalCatalog protected Optional useMetaCache = Optional.empty(); protected MetaCache> metaCache; + protected IdentifierMapping identifierMapping; + private boolean mappingsInitialized = false; + public ExternalCatalog() { } @@ -181,6 +185,10 @@ protected List listDatabaseNames() { } } + // only for forward to master + protected void buildDatabaseMapping() { + } + // Will be called when creating catalog(so when as replaying) // to add some default properties if missing. public void setDefaultPropsIfMissing(boolean isReplay) { @@ -209,6 +217,10 @@ public void checkWhenCreating() throws DdlException { */ public abstract List listTableNames(SessionContext ctx, String dbName); + // only for forward to master + protected void buildTableMapping(SessionContext ctx, String dbName) { + } + /** * check if the specified table exist. * @@ -273,6 +285,10 @@ public final synchronized void makeSureInitialized() { } initialized = true; } + if (!mappingsInitialized) { + buildDatabaseMapping(); + mappingsInitialized = true; + } } protected final void initLocalObjects() { @@ -398,6 +414,7 @@ private List getFilteredDatabaseNames() { public void onRefresh(boolean invalidCache) { this.objectCreated = false; this.initialized = false; + this.mappingsInitialized = false; synchronized (this.propLock) { this.convertedProperties = null; } @@ -756,6 +773,7 @@ public void gsonPostProcess() throws IOException { } this.propLock = new byte[0]; this.initialized = false; + this.mappingsInitialized = false; setDefaultPropsIfMissing(true); if (tableAutoAnalyzePolicy == null) { tableAutoAnalyzePolicy = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index d653a5a178e484a..cf65a5f0a48de67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -91,6 +91,8 @@ public abstract class ExternalDatabase private MetaCache metaCache; + private boolean mappingsInitialized = false; + /** * Create external database. * @@ -117,6 +119,7 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) { public void setUnInitialized(boolean invalidCache) { this.initialized = false; + this.mappingsInitialized = false; this.invalidCacheInInit = invalidCache; if (extCatalog.getUseMetaCache().isPresent()) { if (extCatalog.getUseMetaCache().get() && metaCache != null) { @@ -170,6 +173,10 @@ public final synchronized void makeSureInitialized() { } initialized = true; } + if (!mappingsInitialized) { + extCatalog.buildTableMapping(null, name); + mappingsInitialized = true; + } } public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index fb26265d19fe93c..9d837fcac6328c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.datasource.mapping.DefaultIdentifierMapping; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; @@ -119,19 +120,16 @@ public void onRefresh(boolean invalidCache) { super.onRefresh(invalidCache); if (jdbcClient != null) { jdbcClient.closeClient(); + jdbcClient = null; } } - @Override - public void onRefreshCache(boolean invalidCache) { - onRefresh(invalidCache); - } - @Override public void onClose() { super.onClose(); if (jdbcClient != null) { jdbcClient.closeClient(); + jdbcClient = null; } } @@ -232,8 +230,6 @@ protected void initLocalObjectsImpl() { .setDriverUrl(getDriverUrl()) .setDriverClass(getDriverClass()) .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase()) - .setIsLowerCaseMetaNames(getLowerCaseMetaNames()) - .setMetaNamesMapping(getMetaNamesMapping()) .setIncludeDatabaseMap(getIncludeDatabaseMap()) .setExcludeDatabaseMap(getExcludeDatabaseMap()) .setConnectionPoolMinSize(getConnectionPoolMinSize()) @@ -243,22 +239,62 @@ protected void initLocalObjectsImpl() { .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); + identifierMapping = new DefaultIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()), + getMetaNamesMapping()); } + @Override protected List listDatabaseNames() { - return jdbcClient.getDatabaseNameList(); + return identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList()); + } + + @Override + protected void buildDatabaseMapping() { + identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList()); + } + + protected String getRemoteDatabaseName(String dbName) { + return identifierMapping.toRemoteDatabaseName(dbName); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - return jdbcClient.getTablesNameList(dbName); + String remoteDbName = getRemoteDatabaseName(dbName); + return identifierMapping.fromRemoteTableName(remoteDbName, jdbcClient.getTablesNameList(remoteDbName)); + } + + @Override + protected void buildTableMapping(SessionContext ctx, String dbName) { + String remoteDbName = getRemoteDatabaseName(dbName); + identifierMapping.fromRemoteTableName(getRemoteDatabaseName(dbName), + jdbcClient.getTablesNameList(remoteDbName)); + } + + protected String getRemoteTableName(String dbName, String tblName) { + return identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName); } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - return jdbcClient.isTableExist(dbName, tblName); + String remoteDbName = getRemoteDatabaseName(dbName); + String remoteTblName = getRemoteTableName(dbName, tblName); + return jdbcClient.isTableExist(remoteDbName, remoteTblName); + } + + public List listColumns(String dbName, String tblName) { + makeSureInitialized(); + String remoteDbName = getRemoteDatabaseName(dbName); + String remoteTblName = getRemoteTableName(dbName, tblName); + return identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName, + jdbcClient.getColumnsFromJdbc(remoteDbName, + remoteTblName)); + } + + protected Map getRemoteColumnNames(String dbName, String tblName) { + return identifierMapping.toRemoteColumnNames(getRemoteDatabaseName(dbName), + getRemoteTableName(dbName, tblName)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 495311bc087d5bd..78805612e92d878 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -32,6 +32,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; +import com.google.common.collect.Maps; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,21 +87,29 @@ public TTableDescriptor toThrift() { @Override public Optional initSchema() { - return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient() - .getColumnsFromJdbc(dbName, name))); + return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).listColumns(dbName, name))); } private JdbcTable toJdbcTable() { List schema = getFullSchema(); JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog; - String fullDbName = this.dbName + "." + this.name; - JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE); - jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName); + String fullTableName = this.dbName + "." + this.name; + JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE); + jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName); // Set remote properties - jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName)); - jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name)); - jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name)); + jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName)); + jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, this.name)); + Map remoteColumnNames = jdbcCatalog.getRemoteColumnNames(this.dbName, this.name); + if (!remoteColumnNames.isEmpty()) { + jdbcTable.setRemoteColumnNames(remoteColumnNames); + } else { + remoteColumnNames = Maps.newHashMap(); + for (Column column : schema) { + remoteColumnNames.put(column.getName(), column.getName()); + } + jdbcTable.setRemoteColumnNames(remoteColumnNames); + } return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java deleted file mode 100644 index 20a74724b3e4965..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.jdbc; - -import org.apache.doris.datasource.jdbc.client.JdbcClient; -import org.apache.doris.datasource.mapping.IdentifierMapping; - -public class JdbcIdentifierMapping extends IdentifierMapping { - private final JdbcClient jdbcClient; - - public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping, JdbcClient jdbcClient) { - super(isLowerCaseMetaNames, metaNamesMapping); - this.jdbcClient = jdbcClient; - } - - @Override - protected void loadDatabaseNames() { - jdbcClient.getDatabaseNameList(); - } - - @Override - protected void loadTableNames(String localDbName) { - jdbcClient.getTablesNameList(localDbName); - } - - @Override - protected void loadColumnNames(String localDbName, String localTableName) { - jdbcClient.getColumnsFromJdbc(localDbName, localTableName); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 0832aa68f003720..b9adebc65d898fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -24,7 +24,6 @@ import org.apache.doris.cloud.security.SecurityChecker; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping; import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema; import com.google.common.collect.ImmutableSet; @@ -63,11 +62,8 @@ public abstract class JdbcClient { protected ClassLoader classLoader = null; protected HikariDataSource dataSource = null; protected boolean isOnlySpecifiedDatabase; - protected boolean isLowerCaseMetaNames; - protected String metaNamesMapping; protected Map includeDatabaseMap; protected Map excludeDatabaseMap; - protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching; public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) { String dbType = parseDbType(jdbcClientConfig.getJdbcUrl()); @@ -104,8 +100,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.catalogName = jdbcClientConfig.getCatalog(); this.jdbcUser = jdbcClientConfig.getUser(); this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase()); - this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames()); - this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping(); this.includeDatabaseMap = Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap()); this.excludeDatabaseMap = @@ -114,7 +108,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) { this.dbType = parseDbType(jdbcUrl); initializeClassLoader(jdbcClientConfig); initializeDataSource(jdbcClientConfig); - this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this); } // Initialize DataSource @@ -297,10 +290,9 @@ public List getDatabaseNameList() { /** * get all tables of one database */ - public List getTablesNameList(String localDbName) { + public List getTablesNameList(String remoteDbName) { List remoteTablesNames = Lists.newArrayList(); String[] tableTypes = getTableTypes(); - String remoteDbName = getRemoteDatabaseName(localDbName); processTable(remoteDbName, null, tableTypes, (rs) -> { try { while (rs.next()) { @@ -310,14 +302,12 @@ public List getTablesNameList(String localDbName) { throw new JdbcClientException("failed to get all tables for remote database: `%s`", e, remoteDbName); } }); - return filterTableNames(remoteDbName, remoteTablesNames); + return remoteTablesNames; } - public boolean isTableExist(String localDbName, String localTableName) { + public boolean isTableExist(String remoteDbName, String remoteTableName) { final boolean[] isExist = {false}; String[] tableTypes = getTableTypes(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> { try { if (rs.next()) { @@ -334,12 +324,10 @@ public boolean isTableExist(String localDbName, String localTableName) { /** * get all columns of one table */ - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); @@ -365,21 +353,7 @@ public List getColumnsFromJdbc(String localDbName, String localTableName field.isAllowNull(), field.getRemarks(), true, -1)); } - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); - return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema); - } - - public String getRemoteDatabaseName(String localDbname) { - return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname); - } - - public String getRemoteTableName(String localDbName, String localTableName) { - return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName); - } - - public Map getRemoteColumnNames(String localDbName, String localTableName) { - return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName); + return dorisTableSchema; } // protected methods,for subclass to override @@ -437,7 +411,7 @@ protected List filterDatabaseNames(List remoteDbNames) { } filteredDatabaseNames.add(databaseName); } - return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames); + return filteredDatabaseNames; } protected Set getFilterInternalDatabases() { @@ -448,14 +422,6 @@ protected Set getFilterInternalDatabases() { .build(); } - protected List filterTableNames(String remoteDbName, List remoteTableNames) { - return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, remoteTableNames); - } - - protected List filterColumnName(String remoteDbName, String remoteTableName, List remoteColumns) { - return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns); - } - protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema); protected Type createDecimalOrStringType(int precision, int scale) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java index 5624392de14c39e..3baa2ce9d911df0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java @@ -129,12 +129,10 @@ protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String c * get all columns of one table */ @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index d37b36cbf3de155..dc367e8ea6e6b6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -49,12 +49,10 @@ public String getTestQuery() { } @Override - public List getJdbcColumnsInfo(String localDbName, String localTableName) { + public List getJdbcColumnsInfo(String remoteDbName, String remoteTableName) { Connection conn = getConnection(); ResultSet rs = null; List tableSchema = Lists.newArrayList(); - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); try { DatabaseMetaData databaseMetaData = conn.getMetaData(); String catalogName = getCatalogName(conn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java new file mode 100644 index 000000000000000..4847cd86e6d79c1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mapping; + +import org.apache.doris.catalog.Column; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultIdentifierMapping implements IdentifierMapping { + private static final Logger LOG = LogManager.getLogger(DefaultIdentifierMapping.class); + + private final ObjectMapper mapper = new ObjectMapper(); + private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> localTableToRemoteTable + = new ConcurrentHashMap<>(); + private final ConcurrentHashMap>> + localColumnToRemoteColumn = new ConcurrentHashMap<>(); + + private final boolean isLowerCaseMetaNames; + private final String metaNamesMapping; + + public DefaultIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { + this.isLowerCaseMetaNames = isLowerCaseMetaNames; + this.metaNamesMapping = metaNamesMapping; + } + + private boolean isMappingInvalid() { + return metaNamesMapping == null || metaNamesMapping.isEmpty(); + } + + @Override + public List fromRemoteDatabaseName(List remoteDatabaseNames) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteDatabaseNames; + } + JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); + + Map databaseNameMapping = Maps.newTreeMap(); + if (databasesNode.isArray()) { + for (JsonNode node : databasesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String mapping = node.path("mapping").asText(); + databaseNameMapping.put(remoteDatabase, mapping); + } + } + + Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, + databaseNameMapping, isLowerCaseMetaNames); + List localDatabaseNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the names."); + } + return localDatabaseNames; + } + + @Override + public List fromRemoteTableName(String remoteDbName, List remoteTableNames) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteTableNames; + } + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); + + Map tableNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + if (remoteDbName.equals(remoteDatabase)) { + String remoteTable = node.path("remoteTable").asText(); + String mapping = node.path("mapping").asText(); + tableNameMapping.put(remoteTable, mapping); + } + } + } + + localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); + + Map> result = nameListToMapping(remoteTableNames, + localTableToRemoteTable.get(remoteDbName), + tableNameMapping, isLowerCaseMetaNames); + List localTableNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict table names found in remote database/schema: " + remoteDbName + + " when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the table names."); + } + return localTableNames; + } + + @Override + public List fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, + List remoteColumns) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return remoteColumns; + } + JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); + + Map columnNameMapping = Maps.newTreeMap(); + if (tablesNode.isArray()) { + for (JsonNode node : tablesNode) { + String remoteDatabase = node.path("remoteDatabase").asText(); + String remoteTable = node.path("remoteTable").asText(); + if (remoteDatabaseName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { + String remoteColumn = node.path("remoteColumn").asText(); + String mapping = node.path("mapping").asText(); + columnNameMapping.put(remoteColumn, mapping); + } + } + } + localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>()); + localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); + + List remoteColumnNames = Lists.newArrayList(); + for (Column remoteColumn : remoteColumns) { + remoteColumnNames.add(remoteColumn.getName()); + } + + Map> result = nameListToMapping(remoteColumnNames, + localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName), + columnNameMapping, isLowerCaseMetaNames); + List localColumnNames = result.get("localNames"); + List conflictNames = result.get("conflictNames"); + if (!conflictNames.isEmpty()) { + throw new RuntimeException( + "Conflict column names found in remote database/schema: " + remoteDatabaseName + + " in remote table: " + remoteTableName + + " when lower_case_meta_names is true: " + conflictNames + + ". Please set lower_case_meta_names to false or" + + " use meta_name_mapping to specify the column names."); + } + for (int i = 0; i < remoteColumns.size(); i++) { + remoteColumns.get(i).setName(localColumnNames.get(i)); + } + return remoteColumns; + } + + @Override + public String toRemoteDatabaseName(String localDatabaseName) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localDatabaseName; + } + return getRequiredMapping(localDBToRemoteDB, localDatabaseName, "database", localDatabaseName); + } + + @Override + public String toRemoteTableName(String remoteDatabaseName, String localTableName) { + // If mapping is not required, return the original input + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return localTableName; + } + Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDatabaseName, + k -> new ConcurrentHashMap<>()); + return getRequiredMapping(tableMap, localTableName, "table", localTableName); + } + + @Override + public Map toRemoteColumnNames(String remoteDatabaseName, String remoteTableName) { + // If mapping is not required, return an empty map (since there's no mapping) + if (!isLowerCaseMetaNames && isMappingInvalid()) { + return Collections.emptyMap(); + } + ConcurrentHashMap> tableColumnMap + = localColumnToRemoteColumn.computeIfAbsent(remoteDatabaseName, k -> new ConcurrentHashMap<>()); + Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); + if (columnMap.isEmpty()) { + LOG.warn("No remote column found for: {}. Please refresh this catalog.", remoteTableName); + throw new RuntimeException( + "No remote column found for: " + remoteTableName + ". Please refresh this catalog."); + } + return columnMap; + } + + private V getRequiredMapping(Map map, K key, String typeName, String entityName) { + V value = map.get(key); + if (value == null) { + LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); + throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName + + ". Please refresh this catalog."); + } + return value; + } + + private JsonNode readAndParseJson(String jsonPath, String nodeName) { + JsonNode rootNode; + try { + rootNode = mapper.readTree(jsonPath); + return rootNode.path(nodeName); + } catch (JsonProcessingException e) { + throw new RuntimeException("parse meta_names_mapping property error", e); + } + } + + private Map> nameListToMapping(List remoteNames, + ConcurrentHashMap localNameToRemoteName, + Map nameMapping, boolean isLowerCaseMetaNames) { + List filteredDatabaseNames = Lists.newArrayList(); + Set lowerCaseNames = Sets.newHashSet(); + Map> nameMap = Maps.newHashMap(); + List conflictNames = Lists.newArrayList(); + + for (String name : remoteNames) { + String mappedName = nameMapping.getOrDefault(name, name); + String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; + + localNameToRemoteName.computeIfAbsent(localName, k -> name); + + if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { + if (nameMap.containsKey(localName)) { + nameMap.get(localName).add(mappedName); + } + } else { + nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); + } + + filteredDatabaseNames.add(localName); + } + + for (List conflictNameList : nameMap.values()) { + if (conflictNameList.size() > 1) { + conflictNames.addAll(conflictNameList); + } + } + + Map> result = Maps.newConcurrentMap(); + result.put("localNames", filteredDatabaseNames); + result.put("conflictNames", conflictNames); + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java index 363ef351152a39d..7745a25d27da47a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java @@ -18,313 +18,20 @@ package org.apache.doris.datasource.mapping; import org.apache.doris.catalog.Column; -import org.apache.doris.qe.GlobalVariable; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class IdentifierMapping { - private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class); - - private final ObjectMapper mapper = new ObjectMapper(); - private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> localTableToRemoteTable - = new ConcurrentHashMap<>(); - private final ConcurrentHashMap>> - localColumnToRemoteColumn = new ConcurrentHashMap<>(); - - private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false); - private final ConcurrentHashMap tableNamesLoadedMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> columnNamesLoadedMap - = new ConcurrentHashMap<>(); - - private final boolean isLowerCaseMetaNames; - private final String metaNamesMapping; - - public IdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) { - this.isLowerCaseMetaNames = isLowerCaseMetaNames; - this.metaNamesMapping = metaNamesMapping; - } - - public List setDatabaseNameMapping(List remoteDatabaseNames) { - JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases"); - - Map databaseNameMapping = Maps.newTreeMap(); - if (databasesNode.isArray()) { - for (JsonNode node : databasesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String mapping = node.path("mapping").asText(); - databaseNameMapping.put(remoteDatabase, mapping); - } - } - - Map> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB, - databaseNameMapping, isLowerCaseMetaNames); - List localDatabaseNames = result.get("localNames"); - List conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the names."); - } - return localDatabaseNames; - } - - public List setTableNameMapping(String remoteDbName, List remoteTableNames) { - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables"); - - Map tableNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - if (remoteDbName.equals(remoteDatabase)) { - String remoteTable = node.path("remoteTable").asText(); - String mapping = node.path("mapping").asText(); - tableNameMapping.put(remoteTable, mapping); - } - } - } - - localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - - List localTableNames; - List conflictNames; - - if (GlobalVariable.lowerCaseTableNames == 1) { - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, true); - localTableNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + " when lower_case_table_names is 1: " + conflictNames - + ". Please use meta_name_mapping to specify the names."); - } - } else { - Map> result = nameListToMapping(remoteTableNames, - localTableToRemoteTable.get(remoteDbName), - tableNameMapping, isLowerCaseMetaNames); - localTableNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict table names found in remote database/schema: " + remoteDbName - + "when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the table names."); - } - } - return localTableNames; - } - - public List setColumnNameMapping(String remoteDbName, String remoteTableName, List remoteColumns) { - JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns"); - - Map columnNameMapping = Maps.newTreeMap(); - if (tablesNode.isArray()) { - for (JsonNode node : tablesNode) { - String remoteDatabase = node.path("remoteDatabase").asText(); - String remoteTable = node.path("remoteTable").asText(); - if (remoteDbName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) { - String remoteColumn = node.path("remoteColumn").asText(); - String mapping = node.path("mapping").asText(); - columnNameMapping.put(remoteColumn, mapping); - } - } - } - localColumnToRemoteColumn.putIfAbsent(remoteDbName, new ConcurrentHashMap<>()); - localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>()); - - List localColumnNames; - List conflictNames; - - // Get the name from localColumns and save it to List - List remoteColumnNames = Lists.newArrayList(); - for (Column remoteColumn : remoteColumns) { - remoteColumnNames.add(remoteColumn.getName()); - } - - Map> result = nameListToMapping(remoteColumnNames, - localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName), - columnNameMapping, isLowerCaseMetaNames); - localColumnNames = result.get("localNames"); - conflictNames = result.get("conflictNames"); - if (!conflictNames.isEmpty()) { - throw new RuntimeException( - "Conflict column names found in remote database/schema: " + remoteDbName - + " in remote table: " + remoteTableName - + " when lower_case_meta_names is true: " + conflictNames - + ". Please set lower_case_meta_names to false or" - + " use meta_name_mapping to specify the column names."); - } - // Replace the name in remoteColumns with localColumnNames - for (int i = 0; i < remoteColumns.size(); i++) { - remoteColumns.get(i).setName(localColumnNames.get(i)); - } - return remoteColumns; - } - - public String getRemoteDatabaseName(String localDbName) { - return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded, - localDbName); - } - - public String getRemoteTableName(String localDbName, String localTableName) { - String remoteDbName = getRemoteDatabaseName(localDbName); - Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName, - k -> new ConcurrentHashMap<>()); - return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName), - localTableName); - } - - public Map getRemoteColumnNames(String localDbName, String localTableName) { - String remoteDbName = getRemoteDatabaseName(localDbName); - String remoteTableName = getRemoteTableName(localDbName, localTableName); - ConcurrentHashMap> tableColumnMap - = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>()); - Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); - if (columnMap.isEmpty()) { - LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}", - localDbName, localTableName); - loadColumnNamesIfNeeded(localDbName, localTableName); - columnMap = tableColumnMap.get(remoteTableName); - } - if (columnMap.isEmpty()) { - LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName); - throw new RuntimeException( - "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog."); - } - return columnMap; - } - - - private void loadDatabaseNamesIfNeeded() { - if (dbNamesLoaded.compareAndSet(false, true)) { - try { - loadDatabaseNames(); - } catch (Exception e) { - dbNamesLoaded.set(false); // Reset on failure - LOG.warn("Error loading database names", e); - } - } - } - - private void loadTableNamesIfNeeded(String localDbName) { - AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false)); - if (isLoaded.compareAndSet(false, true)) { - try { - loadTableNames(localDbName); - } catch (Exception e) { - tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure - LOG.warn("Error loading table names for localDbName: {}", localDbName, e); - } - } - } - - private void loadColumnNamesIfNeeded(String localDbName, String localTableName) { - columnNamesLoadedMap.putIfAbsent(localDbName, new ConcurrentHashMap<>()); - AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName) - .computeIfAbsent(localTableName, k -> new AtomicBoolean(false)); - if (isLoaded.compareAndSet(false, true)) { - try { - loadColumnNames(localDbName, localTableName); - } catch (Exception e) { - columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure - LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName, - localTableName, e); - } - } - } - - private V getRequiredMapping(Map map, K key, String typeName, Runnable loadIfNeeded, - String entityName) { - if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) { - LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName); - loadIfNeeded.run(); - } - V value = map.get(key); - if (value == null) { - LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); - throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName - + ". Please refresh this catalog."); - } - return value; - } - - // Load the database name from the data source. - // In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping. - protected abstract void loadDatabaseNames(); - - // Load the table names for the specified database from the data source. - // In the corresponding getTableNameList(), setTableNameMapping() must be used to update the mapping. - protected abstract void loadTableNames(String localDbName); - - // Load the column names for a specified table in a database from the data source. - // In the corresponding getColumnNameList(), setColumnNameMapping() must be used to update the mapping. - protected abstract void loadColumnNames(String localDbName, String localTableName); - - private JsonNode readAndParseJson(String jsonPath, String nodeName) { - JsonNode rootNode; - try { - rootNode = mapper.readTree(jsonPath); - return rootNode.path(nodeName); - } catch (JsonProcessingException e) { - throw new RuntimeException("parse meta_names_mapping property error", e); - } - } - - private Map> nameListToMapping(List remoteNames, - ConcurrentHashMap localNameToRemoteName, - Map nameMapping, boolean isLowerCaseMetaNames) { - List filteredDatabaseNames = Lists.newArrayList(); - Set lowerCaseNames = Sets.newHashSet(); - Map> nameMap = Maps.newHashMap(); - List conflictNames = Lists.newArrayList(); - for (String name : remoteNames) { - String mappedName = nameMapping.getOrDefault(name, name); - String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName; +public interface IdentifierMapping { + List fromRemoteDatabaseName(List remoteDatabaseNames); - // Use computeIfAbsent to ensure atomicity - localNameToRemoteName.computeIfAbsent(localName, k -> name); + List fromRemoteTableName(String remoteDatabaseName, List remoteTableNames); - if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) { - if (nameMap.containsKey(localName)) { - nameMap.get(localName).add(mappedName); - } - } else { - nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName))); - } + List fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, List remoteColumns); - filteredDatabaseNames.add(localName); - } + String toRemoteDatabaseName(String localDatabaseName); - for (List conflictNameList : nameMap.values()) { - if (conflictNameList.size() > 1) { - conflictNames.addAll(conflictNameList); - } - } + String toRemoteTableName(String remoteDatabaseName, String localTableName); - Map> result = Maps.newConcurrentMap(); - result.put("localNames", filteredDatabaseNames); - result.put("conflictNames", conflictNames); - return result; - } + Map toRemoteColumnNames(String remoteDatabaseName, String remoteTableName); }