[fix](multi-catalog) fix persist issue about jdbc catalog and class loader issue (#14794)

1. Fix a persistence bug: the JDBC catalog/database/table classes need to be registered in GsonUtil so that JDBC catalogs can be persisted correctly.

2. Fix a class loader issue that sometimes caused ClassNotFoundException (a sketch of the pattern used by the fix follows this message).

3. Fix regression tests to use different catalog names.

4. Comment out 2 regression tests, to be fixed later:
- regression-test/suites/query_p0/system/test_query_sys.groovy
- regression-test/suites/statistics/alter_col_stats.groovy
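
For reference, the class loader fix applies the same pattern in BrokerUtil, PooledHiveMetaStoreClient and HiveMetaStoreCache: remember the thread's context class loader, switch to the system class loader while calling into Hadoop/Hive code (which resolves many classes through the context class loader), and restore the original loader in a finally block. The helper below is only an illustrative sketch of that pattern; the class and method names are hypothetical and not part of this commit, which inlines the same logic at each call site.

import java.util.concurrent.Callable;

// Illustrative sketch only: the names are hypothetical, the pattern matches the hunks below.
public final class ContextClassLoaderUtil {
    private ContextClassLoaderUtil() {}

    public static <T> T callWithSystemClassLoader(Callable<T> task) throws Exception {
        // Remember the caller's context class loader so it can be restored afterwards.
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
            // Hadoop/Hive client code often loads classes through the context class loader;
            // pointing it at the system class loader avoids ClassNotFoundException when the
            // caller's loader cannot see those classes.
            Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
            return task.call();
        } finally {
            // Always restore the original loader so other code on this thread is unaffected.
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}

The BrokerUtil hunk additionally pins fs.hdfs.impl and fs.file.impl on the Hadoop Configuration, presumably so that FileSystem implementations do not have to be discovered through the caller's class loader. The persistence fix is visible in the GsonUtils and JdbcExternalCatalog hunks: the new JdbcExternalCatalog, JdbcExternalDatabase and JdbcExternalTable subtypes are registered with the RuntimeTypeAdapterFactory instances, and the HikariCP-backed JdbcClient field is marked transient so Gson does not try to serialize the connection pool.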
morningman committed Dec 4, 2022
1 parent b2541da commit c2f5d67
Showing 17 changed files with 141 additions and 89 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/pom.xml
@@ -689,6 +689,11 @@ under the License.
<version>3.10.1</version>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>

<!-- for aliyun dlf -->
<dependency>
<groupId>com.aliyun.datalake</groupId>
@@ -179,43 +179,52 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFil
}
}
} else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME)
|| !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) {
throw new UserException(String.format(
"The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME, HADOOP_USER_NAME));
}
String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME);
String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
}
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME)
|| !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) {
throw new UserException(String.format(
"The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME,
HADOOP_USER_NAME));
}
FileSystem fs = FileSystem.get(new URI(fsName), conf, userName);
FileStatus[] statusList = fs.globStatus(new Path(path));
if (statusList == null) {
throw new UserException("failed to get files from path: " + path);
String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME);
String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
}
for (FileStatus status : statusList) {
if (status.isFile()) {
fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
status.isDirectory(), status.getLen(), status.isFile()));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
}
FileSystem fs = FileSystem.get(new URI(fsName), conf, userName);
FileStatus[] statusList = fs.globStatus(new Path(path));
if (statusList == null) {
throw new UserException("failed to get files from path: " + path);
}
for (FileStatus status : statusList) {
if (status.isFile()) {
fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
status.isDirectory(), status.getLen(), status.isFile()));
}
}
} catch (IOException | InterruptedException | URISyntaxException e) {
LOG.warn("hdfs check error: ", e);
throw new UserException(e.getMessage());
}
} catch (IOException | InterruptedException | URISyntaxException e) {
LOG.warn("hdfs check error: ", e);
throw new UserException(e.getMessage());
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}
@@ -686,3 +695,4 @@ public void close() {

}
}

@@ -43,7 +43,9 @@ public class JdbcExternalCatalog extends ExternalCatalog {
public static final String PROP_DRIVER_URL = "jdbc.driver_url";
public static final String PROP_DRIVER_CLASS = "jdbc.driver_class";

private JdbcClient jdbcClient;
// Must add "transient" for Gson to ignore this field,
// or Gson will throw exception with HikariCP
private transient JdbcClient jdbcClient;
private String databaseTypeName;
private String jdbcUser;
private String jdbcPasswd;
@@ -172,12 +172,19 @@ public void close() throws Exception {
}

private CachedClient getClient() throws MetaException {
synchronized (clientPool) {
CachedClient client = clientPool.poll();
if (client == null) {
return new CachedClient(hiveConf);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
synchronized (clientPool) {
CachedClient client = clientPool.poll();
if (client == null) {
return new CachedClient(hiveConf);
}
return client;
}
return client;
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}

@@ -215,28 +215,34 @@ private HivePartition loadPartitions(PartitionCacheKey key) {
}

private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
String finalLocation = convertToS3IfNecessary(key.location);
Configuration conf = getConfiguration();
JobConf jobConf = new JobConf(conf);
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
// /user/hive/warehouse/region_tmp_union_all2/1
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to set this config to support visit dir recursively.
// Otherwise, getSplits() may throw exception: "Not a file xxx"
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
FileInputFormat.setInputPaths(jobConf, finalLocation);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(conf, key.inputFormat, false);
InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
String finalLocation = convertToS3IfNecessary(key.location);
Configuration conf = getConfiguration();
JobConf jobConf = new JobConf(conf);
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
// /user/hive/warehouse/region_tmp_union_all2/1
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to set this config to support visit dir recursively.
// Otherwise, getSplits() may throw exception: "Not a file xxx"
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
FileInputFormat.setInputPaths(jobConf, finalLocation);
try {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(conf, key.inputFormat, false);
InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
}
return ImmutableList.copyOf(splits);
} catch (Exception e) {
throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
}
return ImmutableList.copyOf(splits);
} catch (Exception e) {
throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}

@@ -499,3 +505,4 @@ public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
}
}
}

@@ -40,10 +40,13 @@
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.JdbcExternalCatalog;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
import org.apache.doris.load.sync.SyncJob;
@@ -157,19 +160,22 @@ public class GsonUtils {
CatalogIf.class, "clazz")
.registerSubtype(InternalCatalog.class, InternalCatalog.class.getSimpleName())
.registerSubtype(HMSExternalCatalog.class, HMSExternalCatalog.class.getSimpleName())
.registerSubtype(EsExternalCatalog.class, EsExternalCatalog.class.getSimpleName());
.registerSubtype(EsExternalCatalog.class, EsExternalCatalog.class.getSimpleName())
.registerSubtype(JdbcExternalCatalog.class, JdbcExternalCatalog.class.getSimpleName());

private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
.registerSubtype(ExternalDatabase.class, ExternalDatabase.class.getSimpleName())
.registerSubtype(EsExternalDatabase.class, EsExternalDatabase.class.getSimpleName())
.registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName());
.registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
.registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName());

private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
TableIf.class, "clazz")
.registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName())
.registerSubtype(EsExternalTable.class, EsExternalTable.class.getSimpleName())
.registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName());
.registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
.registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName());

// the builder of GSON instance.
// Add any other adapters if necessary.
@@ -452,3 +458,4 @@ public T read(JsonReader reader) throws IOException {
}

}

@@ -41,6 +41,7 @@
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
@@ -150,6 +151,9 @@ private void handleInitDb() {
} catch (DdlException e) {
ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
return;
} catch (Throwable t) {
ctx.getState().setError(ErrorCode.ERR_INTERNAL_ERROR, Util.getRootCauseMessage(t));
return;
}

ctx.getState().setOk();
@@ -677,3 +681,4 @@ public void loop() {
}
}
}

@@ -85,6 +85,7 @@
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlEofPacket;
@@ -587,7 +588,7 @@ public void execute(TUniqueId queryId) throws Exception {
} catch (Exception e) {
LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getClass().getSimpleName() + ", msg: " + e.getMessage());
e.getClass().getSimpleName() + ", msg: " + Util.getRootCauseMessage(e));
if (parsedStmt instanceof KillStmt) {
// ignore kill stmt execute err(not monitor it)
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
@@ -1867,3 +1868,4 @@ private List<ResultRow> convertResultBatchToResultRows(TResultBatch batch) {

}
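
Both error-handling hunks above now report Util.getRootCauseMessage(...) instead of the raw exception message. That helper is not part of this commit, so the snippet below is only an assumed sketch of what such a root-cause extractor typically looks like, not the actual org.apache.doris.common.util.Util code: it walks the cause chain and returns the innermost exception's message, so users see the underlying failure rather than a wrapper's possibly null message.

// Assumed sketch; the real org.apache.doris.common.util.Util may differ.
public final class RootCauseSketch {
    private RootCauseSketch() {}

    public static String getRootCauseMessage(Throwable t) {
        Throwable root = t;
        // Walk to the innermost cause, guarding against self-referential causes.
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        String msg = root.getMessage();
        // Fall back to the class name when the root cause carries no message.
        return msg != null ? msg : root.getClass().getName();
    }
}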


5 changes: 5 additions & 0 deletions fe/pom.xml
@@ -928,6 +928,11 @@ under the License.
<artifactId>metastore-client-hive2</artifactId>
<version>${dlf-metastore-client-hive2.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<reporting>
@@ -93,17 +93,18 @@ suite("test_hive_orc", "all_types") {
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
String hms_port = context.config.otherConfigs.get("hms_port")
String catalog_name = "hive_test_orc"
sql """admin set frontend config ("enable_multi_catalog" = "true")"""
sql """admin set frontend config ("enable_new_load_scan_node" = "true");"""
set_be_config.call('true')
sql """drop catalog if exists hive"""
sql """drop catalog if exists ${catalog_name}"""
sql """
create catalog if not exists hive properties (
create catalog if not exists ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}'
);
"""
sql """use `hive`.`default`"""
sql """use `${catalog_name}`.`default`"""

select_top50()
count_all()
@@ -77,23 +77,24 @@ suite("test_hive_other", "p0") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String hms_port = context.config.otherConfigs.get("hms_port")
String catalog_name = "hive_test_other"
set_be_config.call()

sql """admin set frontend config ("enable_multi_catalog" = "true")"""
sql """drop catalog if exists hive"""
sql """drop catalog if exists ${catalog_name}"""
sql """
create catalog hive properties (
create catalog ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}'
);
"""
sql """switch hive"""
sql """switch ${catalog_name}"""
sql """use `default`"""
// order_qt_show_tables """show tables"""

q01()

sql """refresh catalog hive"""
sql """refresh catalog ${catalog_name}"""
q01()
sql """refresh database `default`"""
// order_qt_show_tables2 """show tables"""
@@ -167,17 +167,18 @@ suite("test_hive_parquet", "p0") {
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
String hms_port = context.config.otherConfigs.get("hms_port")
String catalog_name = "hive_test_parquet"
sql """admin set frontend config ("enable_multi_catalog" = "true")"""
sql """admin set frontend config ("enable_new_load_scan_node" = "true");"""
set_be_config.call('true')
sql """drop catalog if exists hive"""
sql """drop catalog if exists ${catalog_name}"""
sql """
create catalog if not exists hive properties (
create catalog if not exists ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}'
);
"""
sql """use `hive`.`default`"""
sql """use `${catalog_name}`.`default`"""

q01()
q02()
@@ -96,8 +96,5 @@ suite("test_mysql_jdbc_catalog", "p0") {
order_qt_ex_tb17 """ select * from ${ex_tb17} order by id; """
order_qt_ex_tb18 """ select * from ${ex_tb18} order by num_tinyint; """
order_qt_ex_tb19 """ select * from ${ex_tb19} order by date_value; """


sql """admin set frontend config ("enable_multi_catalog" = "false")"""
}
}
}
@@ -29,7 +29,7 @@ suite("test_query_sys", "query,p0") {
sql "select random(20);"
sql "SELECT CONNECTION_ID();"
sql "SELECT CURRENT_USER();"
sql "select now();"
// sql "select now();"
sql "select localtime();"
sql "select localtimestamp();"
sql "select pi();"