Skip to content

Commit

Permalink
Merge pull request #1 from dangdangdotcom/master
Browse files Browse the repository at this point in the history
update
  • Loading branch information
sam2099 committed Feb 26, 2016
2 parents 15a37c1 + 5d03d89 commit 631b939
Show file tree
Hide file tree
Showing 20 changed files with 340 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
package com.dangdang.ddframe.rdb.sharding.api;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Properties;

import javax.sql.DataSource;

import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfiguration;
import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfigurationConstant;
import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingContext;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDataSourceAdapter;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine;
import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository;
import com.google.common.base.Preconditions;

/**
Expand All @@ -40,51 +41,40 @@
*/
public class ShardingDataSource extends AbstractDataSourceAdapter {

private final ShardingRule shardingRule;

private final DatabaseMetaData databaseMetaData;
private final ThreadLocalObjectRepository threadLocalObjectRepository = new ThreadLocalObjectRepository();

private final ShardingConfiguration configuration;

private final MetricsContext metricsContext;
private final ShardingContext context;

public ShardingDataSource(final ShardingRule shardingRule) {
this(shardingRule, new Properties());
}

public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
this.shardingRule = shardingRule;
databaseMetaData = getDatabaseMetaData();
configuration = new ShardingConfiguration(props);
metricsContext = new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class),
configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class),
configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class));
Preconditions.checkNotNull(shardingRule);
Preconditions.checkNotNull(props);
ShardingConfiguration configuration = new ShardingConfiguration(props);
initThreadLocalObjectRepository(configuration);
DatabaseType type;
try {
type = DatabaseType.valueFrom(ShardingConnection.getDatabaseMetaDataFromDataSource(shardingRule.getDataSourceRule().getDataSources()).getDatabaseProductName());
} catch (final SQLException e) {
throw new ShardingJdbcException("Can not get database product name", e);
}
context = new ShardingContext(shardingRule, new SQLRouteEngine(shardingRule, type), new ExecutorEngine(configuration));
}

private DatabaseMetaData getDatabaseMetaData() {
String databaseProductName = null;
DatabaseMetaData result = null;
for (DataSource each : shardingRule.getDataSourceRule().getDataSources()) {
String databaseProductNameInEach;
DatabaseMetaData metaDataInEach;
try {
metaDataInEach = each.getConnection().getMetaData();
databaseProductNameInEach = metaDataInEach.getDatabaseProductName();
} catch (final SQLException ex) {
throw new ShardingJdbcException("Can not get data source DatabaseProductName", ex);
}
Preconditions.checkState(null == databaseProductName || databaseProductName.equals(databaseProductNameInEach),
String.format("Database type inconsistent with '%s' and '%s'", databaseProductName, databaseProductNameInEach));
databaseProductName = databaseProductNameInEach;
result = metaDataInEach;
private void initThreadLocalObjectRepository(final ShardingConfiguration configuration) {
boolean enableMetrics = configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class);
if (enableMetrics) {
threadLocalObjectRepository.addItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class),
configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class)));
}
return result;
}

@Override
public ShardingConnection getConnection() throws SQLException {
metricsContext.register();
return new ShardingConnection(shardingRule, databaseMetaData);
threadLocalObjectRepository.open();
return new ShardingConnection(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.dangdang.ddframe.rdb.sharding.api.config;

import java.util.concurrent.TimeUnit;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -45,9 +47,33 @@ public enum ShardingConfigurationConstant {
/**
* 度量输出在日志中的标识名称.
*/
METRICS_PACKAGE_NAME("metrics.package.name", "com.dangdang.ddframe.rdb.sharding.metrics");
METRICS_PACKAGE_NAME("metrics.package.name", "com.dangdang.ddframe.rdb.sharding.metrics"),

/**
* 最小空闲工作现成数量.
*/
PARALLEL_EXECUTOR_WORKER_MIN_IDLE_SIZE("parallelExecutor.worker.minIdleSize", "0"),

/**
* 最大工作现成数量.
*/
PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThread()),

/**
* 工作线程空闲时超时时间.
*/
PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT("parallelExecutor.worker.maxIdleTimeout", "60"),

/**
* 工作线程空闲时超时时间单位.
*/
PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT_TIME_UNIT("parallelExecutor.worker.maxIdleTimeout.timeUnit", TimeUnit.SECONDS.toString());

private final String key;

private final String defaultValue;

private static String defaultMaxThread() {
return String.valueOf(Runtime.getRuntime().availableProcessors() * 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public ShardingJdbcException(final String errorMessage, final Object... args) {
}

public ShardingJdbcException(final String message, final Exception cause) {
super(message,cause);
super(message, cause);
}

public ShardingJdbcException(final Exception cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,39 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfiguration;
import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfigurationConstant;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* 多线程执行框架.
*
* @author gaohongtao
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class ExecutorEngine {

private final ListeningExecutorService executorService;

public ExecutorEngine(final ShardingConfiguration configuration) {
executorService = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MIN_IDLE_SIZE, int.class),
configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_SIZE, int.class),
configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT, long.class),
TimeUnit.valueOf(configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT_TIME_UNIT)),
new LinkedBlockingQueue<Runnable>())));
}

/**
* 多线程执行任务.
*
Expand All @@ -54,7 +65,7 @@ public final class ExecutorEngine {
* @param <O> 出参类型
* @return 执行结果
*/
public static <I, O> List<O> execute(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
public <I, O> List<O> execute(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
ListenableFuture<List<O>> futures = submitFutures(inputs, executeUnit);
addCallback(futures);
return getFutureResults(futures);
Expand All @@ -71,29 +82,26 @@ public static <I, O> List<O> execute(final Collection<I> inputs, final ExecuteUn
* @param <O> 最终结果类型
* @return 执行结果
*/
public static <I, M, O> O execute(final Collection<I> inputs, final ExecuteUnit<I, M> executeUnit, final MergeUnit<M, O> mergeUnit) {
public <I, M, O> O execute(final Collection<I> inputs, final ExecuteUnit<I, M> executeUnit, final MergeUnit<M, O> mergeUnit) {
return mergeUnit.merge(execute(inputs, executeUnit));
}

private static <I, O> ListenableFuture<List<O>> submitFutures(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
private <I, O> ListenableFuture<List<O>> submitFutures(final Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
Set<ListenableFuture<O>> result = new HashSet<>(inputs.size());
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(inputs.size()));
for (final I each : inputs) {
result.add(service.submit(new Callable<O>() {
result.add(executorService.submit(new Callable<O>() {

@Override
public O call() throws Exception {
return executeUnit.execute(each);
}
}));
}
service.shutdown();
return Futures.allAsList(result);
}

private static <T> void addCallback(final ListenableFuture<T> allFutures) {
private <T> void addCallback(final ListenableFuture<T> allFutures) {
Futures.addCallback(allFutures, new FutureCallback<T>() {

@Override
public void onSuccess(final T result) {
log.trace("Concurrent execute result success {}", result);
Expand All @@ -106,7 +114,7 @@ public void onFailure(final Throwable thrown) {
});
}

private static <O> O getFutureResults(final ListenableFuture<O> futures) {
private <O> O getFutureResults(final ListenableFuture<O> futures) {
try {
return futures.get();
} catch (final InterruptedException | ExecutionException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;

import lombok.RequiredArgsConstructor;

/**
Expand All @@ -37,6 +36,8 @@
@RequiredArgsConstructor
public final class PreparedStatementExecutor {

private final ExecutorEngine executorEngine;

private final Collection<PreparedStatement> preparedStatements;

/**
Expand All @@ -53,7 +54,7 @@ public List<ResultSet> executeQuery() throws SQLException {
MetricsContext.stop(context);
return result;
}
result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, ResultSet>() {
result = executorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, ResultSet>() {

@Override
public ResultSet execute(final PreparedStatement input) throws Exception {
Expand All @@ -78,7 +79,7 @@ public int executeUpdate() throws SQLException {
MetricsContext.stop(context);
return result;
}
result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Integer>() {
result = executorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Integer>() {

@Override
public Integer execute(final PreparedStatement input) throws Exception {
Expand Down Expand Up @@ -112,7 +113,7 @@ public boolean execute() throws SQLException {
MetricsContext.stop(context);
return result;
}
List<Boolean> result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Boolean>() {
List<Boolean> result = executorEngine.execute(preparedStatements, new ExecuteUnit<PreparedStatement, Boolean>() {

@Override
public Boolean execute(final PreparedStatement input) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;

import lombok.RequiredArgsConstructor;

/**
Expand All @@ -38,6 +37,8 @@
@RequiredArgsConstructor
public final class StatementExecutor {

private final ExecutorEngine executorEngine;

private final Collection<StatementEntity> statements = new ArrayList<>();

/**
Expand Down Expand Up @@ -65,7 +66,7 @@ public List<ResultSet> executeQuery() throws SQLException {
MetricsContext.stop(context);
return result;
}
result = ExecutorEngine.execute(statements, new ExecuteUnit<StatementEntity, ResultSet>() {
result = executorEngine.execute(statements, new ExecuteUnit<StatementEntity, ResultSet>() {

@Override
public ResultSet execute(final StatementEntity input) throws Exception {
Expand Down Expand Up @@ -131,7 +132,7 @@ private int executeUpdate(final Updater updater) throws SQLException {
MetricsContext.stop(context);
return result;
}
result = ExecutorEngine.execute(statements, new ExecuteUnit<StatementEntity, Integer>() {
result = executorEngine.execute(statements, new ExecuteUnit<StatementEntity, Integer>() {

@Override
public Integer execute(final StatementEntity input) throws Exception {
Expand Down Expand Up @@ -206,7 +207,7 @@ private boolean execute(final Executor executor) throws SQLException {
MetricsContext.stop(context);
return result;
}
List<Boolean> result = ExecutorEngine.execute(statements, new ExecuteUnit<StatementEntity, Boolean>() {
List<Boolean> result = executorEngine.execute(statements, new ExecuteUnit<StatementEntity, Boolean>() {

@Override
public Boolean execute(final StatementEntity input) throws Exception {
Expand Down
Loading

0 comments on commit 631b939

Please sign in to comment.