Skip to content

Commit

Permalink
[Fix-9352][datasource-api]data source client uses a cache with an exp…
Browse files Browse the repository at this point in the history
…iration time (#9353)

* datasource client cache with expiration time

* remove unused import

* fixed unit test

* fix

* fix

* fix

* fix

* fix

* fix

* fix
  • Loading branch information
zhuxt2015 authored Apr 27, 2022
1 parent 70dae69 commit 206b7c1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,33 @@

package org.apache.dolphinscheduler.plugin.datasource.api.plugin;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSourceClientProvider {
private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);

private static final Map<String, DataSourceClient> uniqueId2dataSourceClientMap = new ConcurrentHashMap<>();

private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
private static final Cache<String, DataSourceClient> uniqueId2dataSourceClientCache = CacheBuilder.newBuilder()
.expireAfterWrite(duration, TimeUnit.HOURS)
.maximumSize(100)
.build();
private DataSourcePluginManager dataSourcePluginManager;

private DataSourceClientProvider() {
Expand All @@ -50,12 +58,12 @@ public static DataSourceClientProvider getInstance() {
return DataSourceClientProviderHolder.INSTANCE;
}

public Connection getConnection(DbType dbType, ConnectionParam connectionParam) {
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);

DataSourceClient dataSourceClient = uniqueId2dataSourceClientMap.computeIfAbsent(datasourceUniqueId, $ -> {
DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
if (null == dataSourceChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,31 @@
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, MySQLDataSourceProcessor.class, DataSourceClientProvider.class, PasswordUtils.class, CommonUtils.class})
@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
@PrepareForTest({Class.class, DriverManager.class, MySQLDataSourceProcessor.class, DataSourceClientProvider.class, PasswordUtils.class, CommonUtils.class, PropertyUtils.class})
public class DataSourceUtilsTest {

@Test
Expand Down Expand Up @@ -85,8 +91,9 @@ public void testBuildConnectionParams2() {
}

@Test
public void testGetConnection() {

public void testGetConnection() throws ExecutionException {
PowerMockito.mockStatic(PropertyUtils.class);
PowerMockito.when(PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24L)).thenReturn(24L);
PowerMockito.mockStatic(DataSourceClientProvider.class);
DataSourceClientProvider clientProvider = PowerMockito.mock(DataSourceClientProvider.class);
PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.dolphinscheduler.plugin.datasource.hive;

import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;

import com.zaxxer.hikari.HikariDataSource;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.plugin.datasource.utils.CommonUtil;
Expand All @@ -29,9 +26,11 @@
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.security.krb5.Config;

import java.io.IOException;
import java.lang.reflect.Field;
Expand All @@ -41,12 +40,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zaxxer.hikari.HikariDataSource;

import sun.security.krb5.Config;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;

public class HiveDataSourceClient extends CommonDataSourceClient {

Expand All @@ -57,6 +51,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
private Configuration hadoopConf;
protected HikariDataSource oneSessionDataSource;
private UserGroupInformation ugi;
private boolean retryGetConnection = true;

public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
super(baseConnectionParam, dbType);
Expand Down Expand Up @@ -149,6 +144,14 @@ public Connection getConnection() {
try {
return oneSessionDataSource.getConnection();
} catch (SQLException e) {
boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
if (retryGetConnection && kerberosStartupState) {
retryGetConnection = false;
createUserGroupInformation(baseConnectionParam.getUser());
Connection connection = getConnection();
retryGetConnection = true;
return connection;
}
logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -557,8 +558,8 @@ public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectio
for (int i = 1; i <= num; i++) {
columnNames[i - 1] = md.getColumnName(i);
}
} catch (SQLException e) {
logger.warn(e.getMessage(), e);
} catch (SQLException | ExecutionException e) {
logger.error(e.getMessage(), e);
return null;
}

Expand Down

0 comments on commit 206b7c1

Please sign in to comment.