diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/ShardingDataSource.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/ShardingDataSource.java index 45ee1ba86ff74..b932780fb0971 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/ShardingDataSource.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/ShardingDataSource.java @@ -18,19 +18,20 @@ package com.dangdang.ddframe.rdb.sharding.api; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.SQLException; import java.util.Properties; -import javax.sql.DataSource; - import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfiguration; import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfigurationConstant; import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule; import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException; +import com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine; import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection; +import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingContext; import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDataSourceAdapter; import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext; +import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; +import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository; import com.google.common.base.Preconditions; /** @@ -40,51 +41,40 @@ */ public class ShardingDataSource extends AbstractDataSourceAdapter { - private final ShardingRule shardingRule; - - private final DatabaseMetaData databaseMetaData; + private final ThreadLocalObjectRepository threadLocalObjectRepository = new ThreadLocalObjectRepository(); - private final ShardingConfiguration configuration; - - private final MetricsContext metricsContext; + private final ShardingContext context; public ShardingDataSource(final ShardingRule shardingRule) { this(shardingRule, new Properties()); } public ShardingDataSource(final ShardingRule shardingRule, final Properties props) { - this.shardingRule = shardingRule; - databaseMetaData = getDatabaseMetaData(); - configuration = new ShardingConfiguration(props); - metricsContext = new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class), - configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), - configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class)); + Preconditions.checkNotNull(shardingRule); + Preconditions.checkNotNull(props); + ShardingConfiguration configuration = new ShardingConfiguration(props); + initThreadLocalObjectRepository(configuration); + DatabaseType type; + try { + type = DatabaseType.valueFrom(ShardingConnection.getDatabaseMetaDataFromDataSource(shardingRule.getDataSourceRule().getDataSources()).getDatabaseProductName()); + } catch (final SQLException e) { + throw new ShardingJdbcException("Can not get database product name", e); + } + context = new ShardingContext(shardingRule, new SQLRouteEngine(shardingRule, type), new ExecutorEngine(configuration)); } - private DatabaseMetaData getDatabaseMetaData() { - String databaseProductName = null; - DatabaseMetaData result = null; - for (DataSource each : shardingRule.getDataSourceRule().getDataSources()) { - String databaseProductNameInEach; - DatabaseMetaData metaDataInEach; - try { - metaDataInEach = each.getConnection().getMetaData(); - databaseProductNameInEach = metaDataInEach.getDatabaseProductName(); - } catch (final SQLException ex) { - throw new ShardingJdbcException("Can not get data source DatabaseProductName", ex); - } - Preconditions.checkState(null == databaseProductName || databaseProductName.equals(databaseProductNameInEach), - String.format("Database type inconsistent with '%s' and '%s'", databaseProductName, databaseProductNameInEach)); - databaseProductName = databaseProductNameInEach; - result = metaDataInEach; + private void initThreadLocalObjectRepository(final ShardingConfiguration configuration) { + boolean enableMetrics = configuration.getConfig(ShardingConfigurationConstant.METRICS_ENABLE, boolean.class); + if (enableMetrics) { + threadLocalObjectRepository.addItem(new MetricsContext(configuration.getConfig(ShardingConfigurationConstant.METRICS_SECOND_PERIOD, long.class), + configuration.getConfig(ShardingConfigurationConstant.METRICS_PACKAGE_NAME, String.class))); } - return result; } @Override public ShardingConnection getConnection() throws SQLException { - metricsContext.register(); - return new ShardingConnection(shardingRule, databaseMetaData); + threadLocalObjectRepository.open(); + return new ShardingConnection(context); } @Override diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/config/ShardingConfigurationConstant.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/config/ShardingConfigurationConstant.java index d037e8b7e4b3d..97b30952bdd42 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/config/ShardingConfigurationConstant.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/api/config/ShardingConfigurationConstant.java @@ -17,6 +17,8 @@ package com.dangdang.ddframe.rdb.sharding.api.config; +import java.util.concurrent.TimeUnit; + import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -45,9 +47,33 @@ public enum ShardingConfigurationConstant { /** * 度量输出在日志中的标识名称. */ - METRICS_PACKAGE_NAME("metrics.package.name", "com.dangdang.ddframe.rdb.sharding.metrics"); + METRICS_PACKAGE_NAME("metrics.package.name", "com.dangdang.ddframe.rdb.sharding.metrics"), + + /** + * 最小空闲工作现成数量. + */ + PARALLEL_EXECUTOR_WORKER_MIN_IDLE_SIZE("parallelExecutor.worker.minIdleSize", "0"), + + /** + * 最大工作现成数量. + */ + PARALLEL_EXECUTOR_WORKER_MAX_SIZE("parallelExecutor.worker.maxSize", defaultMaxThread()), + + /** + * 工作线程空闲时超时时间. + */ + PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT("parallelExecutor.worker.maxIdleTimeout", "60"), + + /** + * 工作线程空闲时超时时间单位. + */ + PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT_TIME_UNIT("parallelExecutor.worker.maxIdleTimeout.timeUnit", TimeUnit.SECONDS.toString()); private final String key; private final String defaultValue; + + private static String defaultMaxThread() { + return String.valueOf(Runtime.getRuntime().availableProcessors() * 2); + } } diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/exception/ShardingJdbcException.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/exception/ShardingJdbcException.java index e8c0b1c9b0284..a6bb8c1262ed8 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/exception/ShardingJdbcException.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/exception/ShardingJdbcException.java @@ -31,7 +31,7 @@ public ShardingJdbcException(final String errorMessage, final Object... args) { } public ShardingJdbcException(final String message, final Exception cause) { - super(message,cause); + super(message, cause); } public ShardingJdbcException(final Exception cause) { diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java index 1defea8912904..2e648cc35dfe6 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/ExecutorEngine.java @@ -23,17 +23,18 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfiguration; +import com.dangdang.ddframe.rdb.sharding.api.config.ShardingConfigurationConstant; import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; /** @@ -41,10 +42,20 @@ * * @author gaohongtao */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) @Slf4j public final class ExecutorEngine { + private final ListeningExecutorService executorService; + + public ExecutorEngine(final ShardingConfiguration configuration) { + executorService = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService( + new ThreadPoolExecutor(configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MIN_IDLE_SIZE, int.class), + configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_SIZE, int.class), + configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT, long.class), + TimeUnit.valueOf(configuration.getConfig(ShardingConfigurationConstant.PARALLEL_EXECUTOR_WORKER_MAX_IDLE_TIMEOUT_TIME_UNIT)), + new LinkedBlockingQueue()))); + } + /** * 多线程执行任务. * @@ -54,7 +65,7 @@ public final class ExecutorEngine { * @param 出参类型 * @return 执行结果 */ - public static List execute(final Collection inputs, final ExecuteUnit executeUnit) { + public List execute(final Collection inputs, final ExecuteUnit executeUnit) { ListenableFuture> futures = submitFutures(inputs, executeUnit); addCallback(futures); return getFutureResults(futures); @@ -71,15 +82,14 @@ public static List execute(final Collection inputs, final ExecuteUn * @param 最终结果类型 * @return 执行结果 */ - public static O execute(final Collection inputs, final ExecuteUnit executeUnit, final MergeUnit mergeUnit) { + public O execute(final Collection inputs, final ExecuteUnit executeUnit, final MergeUnit mergeUnit) { return mergeUnit.merge(execute(inputs, executeUnit)); } - private static ListenableFuture> submitFutures(final Collection inputs, final ExecuteUnit executeUnit) { + private ListenableFuture> submitFutures(final Collection inputs, final ExecuteUnit executeUnit) { Set> result = new HashSet<>(inputs.size()); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(inputs.size())); for (final I each : inputs) { - result.add(service.submit(new Callable() { + result.add(executorService.submit(new Callable() { @Override public O call() throws Exception { @@ -87,13 +97,11 @@ public O call() throws Exception { } })); } - service.shutdown(); return Futures.allAsList(result); } - private static void addCallback(final ListenableFuture allFutures) { + private void addCallback(final ListenableFuture allFutures) { Futures.addCallback(allFutures, new FutureCallback() { - @Override public void onSuccess(final T result) { log.trace("Concurrent execute result success {}", result); @@ -106,7 +114,7 @@ public void onFailure(final Throwable thrown) { }); } - private static O getFutureResults(final ListenableFuture futures) { + private O getFutureResults(final ListenableFuture futures) { try { return futures.get(); } catch (final InterruptedException | ExecutionException ex) { diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java index 52ed3dce08c8f..af4758a2918df 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java @@ -26,7 +26,6 @@ import com.codahale.metrics.Timer.Context; import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext; - import lombok.RequiredArgsConstructor; /** @@ -37,6 +36,8 @@ @RequiredArgsConstructor public final class PreparedStatementExecutor { + private final ExecutorEngine executorEngine; + private final Collection preparedStatements; /** @@ -53,7 +54,7 @@ public List executeQuery() throws SQLException { MetricsContext.stop(context); return result; } - result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit() { + result = executorEngine.execute(preparedStatements, new ExecuteUnit() { @Override public ResultSet execute(final PreparedStatement input) throws Exception { @@ -78,7 +79,7 @@ public int executeUpdate() throws SQLException { MetricsContext.stop(context); return result; } - result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit() { + result = executorEngine.execute(preparedStatements, new ExecuteUnit() { @Override public Integer execute(final PreparedStatement input) throws Exception { @@ -112,7 +113,7 @@ public boolean execute() throws SQLException { MetricsContext.stop(context); return result; } - List result = ExecutorEngine.execute(preparedStatements, new ExecuteUnit() { + List result = executorEngine.execute(preparedStatements, new ExecuteUnit() { @Override public Boolean execute(final PreparedStatement input) throws Exception { diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/StatementExecutor.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/StatementExecutor.java index 2a25040695353..43d1a9ed14a6b 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/StatementExecutor.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/StatementExecutor.java @@ -27,7 +27,6 @@ import com.codahale.metrics.Timer.Context; import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext; - import lombok.RequiredArgsConstructor; /** @@ -38,6 +37,8 @@ @RequiredArgsConstructor public final class StatementExecutor { + private final ExecutorEngine executorEngine; + private final Collection statements = new ArrayList<>(); /** @@ -65,7 +66,7 @@ public List executeQuery() throws SQLException { MetricsContext.stop(context); return result; } - result = ExecutorEngine.execute(statements, new ExecuteUnit() { + result = executorEngine.execute(statements, new ExecuteUnit() { @Override public ResultSet execute(final StatementEntity input) throws Exception { @@ -131,7 +132,7 @@ private int executeUpdate(final Updater updater) throws SQLException { MetricsContext.stop(context); return result; } - result = ExecutorEngine.execute(statements, new ExecuteUnit() { + result = executorEngine.execute(statements, new ExecuteUnit() { @Override public Integer execute(final StatementEntity input) throws Exception { @@ -206,7 +207,7 @@ private boolean execute(final Executor executor) throws SQLException { MetricsContext.stop(context); return result; } - List result = ExecutorEngine.execute(statements, new ExecuteUnit() { + List result = executorEngine.execute(statements, new ExecuteUnit() { @Override public Boolean execute(final StatementEntity input) throws Exception { diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingConnection.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingConnection.java index a7c0ffff21831..523684b446404 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingConnection.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingConnection.java @@ -25,35 +25,32 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import javax.sql.DataSource; import com.codahale.metrics.Timer.Context; -import com.dangdang.ddframe.rdb.sharding.api.DatabaseType; -import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule; +import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException; import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractConnectionAdapter; import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext; -import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Collections2; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; /** * 支持分片的数据库连接. * - * @author zhangliang + * @author zhangliang,gaohongtao */ +@RequiredArgsConstructor public final class ShardingConnection extends AbstractConnectionAdapter { - private final ShardingRule shardingRule; - - private final DatabaseMetaData metaData; + @Getter(AccessLevel.PACKAGE) + private final ShardingContext context; private Map connectionMap = new HashMap<>(); - private SQLRouteEngine sqlRouteEngine; - - public ShardingConnection(final ShardingRule shardingRule, final DatabaseMetaData metaData) throws SQLException { - this.shardingRule = shardingRule; - this.metaData = metaData; - sqlRouteEngine = new SQLRouteEngine(shardingRule, DatabaseType.valueFrom(metaData.getDatabaseProductName())); - } - /** * 根据数据源名称获取相应的数据库连接. * @@ -65,7 +62,7 @@ public Connection getConnection(final String dataSourceName) throws SQLException return connectionMap.get(dataSourceName); } Context context = MetricsContext.start("ShardingConnection-getConnection", dataSourceName); - Connection connection = shardingRule.getDataSourceRule().getDataSource(dataSourceName).getConnection(); + Connection connection = this.context.getShardingRule().getDataSourceRule().getDataSource(dataSourceName).getConnection(); MetricsContext.stop(context); replayMethodsInvovation(connection); connectionMap.put(dataSourceName, connection); @@ -74,52 +71,104 @@ public Connection getConnection(final String dataSourceName) throws SQLException @Override public DatabaseMetaData getMetaData() throws SQLException { - return metaData; + if (connectionMap.isEmpty()) { + return getDatabaseMetaDataFromDataSource(context.getShardingRule().getDataSourceRule().getDataSources()); + } else { + return getDatabaseMetaDataFromConnection(connectionMap.values()); + } + } + + public static DatabaseMetaData getDatabaseMetaDataFromDataSource(final Collection dataSources) { + Collection connectionCollection = null; + + try { + connectionCollection = Collections2.transform(dataSources, new Function() { + + @Override + public Connection apply(final DataSource input) { + try { + return input.getConnection(); + } catch (final SQLException e) { + throw new ShardingJdbcException(e); + } + } + }); + return getDatabaseMetaDataFromConnection(connectionCollection); + } finally { + if (null != connectionCollection) { + for (Connection each : connectionCollection) { + try { + each.close(); + } catch (final SQLException ignored) { + } + } + } + } + } + + private static DatabaseMetaData getDatabaseMetaDataFromConnection(final Collection connections) { + String databaseProductName = null; + DatabaseMetaData result = null; + for (Connection each : connections) { + String databaseProductNameInEach; + DatabaseMetaData metaDataInEach; + try { + metaDataInEach = each.getMetaData(); + databaseProductNameInEach = metaDataInEach.getDatabaseProductName(); + } catch (final SQLException ex) { + throw new ShardingJdbcException("Can not get data source DatabaseProductName", ex); + } + Preconditions.checkState(null == databaseProductName || databaseProductName.equals(databaseProductNameInEach), + String.format("Database type inconsistent with '%s' and '%s'", databaseProductName, databaseProductNameInEach)); + databaseProductName = databaseProductNameInEach; + result = metaDataInEach; + } + return result; } @Override public PreparedStatement prepareStatement(final String sql) throws SQLException { - return new ShardingPreparedStatement(sqlRouteEngine, this, sql); + return new ShardingPreparedStatement(this, sql); } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { - return new ShardingPreparedStatement(sqlRouteEngine, this, sql, resultSetType, resultSetConcurrency); + return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency); } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { - return new ShardingPreparedStatement(sqlRouteEngine, this, sql, resultSetType, resultSetConcurrency, resultSetHoldability); + return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException { - return new ShardingPreparedStatement(sqlRouteEngine, this, sql, autoGeneratedKeys); + return new ShardingPreparedStatement(this, sql, autoGeneratedKeys); } @Override public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException { - return new ShardingPreparedStatement(sqlRouteEngine, this, sql, columnIndexes); + return new ShardingPreparedStatement(this, sql, columnIndexes); } @Override public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException { - return new ShardingPreparedStatement(sqlRouteEngine, this, sql, columnNames); + return new ShardingPreparedStatement(this, sql, columnNames); } @Override public Statement createStatement() throws SQLException { - return new ShardingStatement(sqlRouteEngine, this); + return new ShardingStatement(this); } @Override public Statement createStatement(final int resultSetType, final int resultSetConcurrency) throws SQLException { - return new ShardingStatement(sqlRouteEngine, this, resultSetType, resultSetConcurrency); + return new ShardingStatement(this, resultSetType, resultSetConcurrency); } @Override public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { - return new ShardingStatement(sqlRouteEngine, this, resultSetType, resultSetConcurrency, resultSetHoldability); + return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingContext.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingContext.java new file mode 100644 index 0000000000000..37d03e8c659fe --- /dev/null +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingContext.java @@ -0,0 +1,40 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.rdb.sharding.jdbc; + +import com.dangdang.ddframe.rdb.sharding.api.rule.ShardingRule; +import com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine; +import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * ShardingDataSource运行期上下文. + * + * @author gaohongtao + */ +@RequiredArgsConstructor +@Getter +public final class ShardingContext { + + private final ShardingRule shardingRule; + + private final SQLRouteEngine sqlRouteEngine; + + private final ExecutorEngine executorEngine; +} diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java index 817790dd38413..1dd1fc4aa7a24 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java @@ -31,8 +31,8 @@ import com.dangdang.ddframe.rdb.sharding.executor.PreparedStatementExecutor; import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter; import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory; +import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext; import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit; -import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult; import com.google.common.collect.Lists; @@ -57,53 +57,54 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd private final List> batchParameters = new ArrayList<>(); - public ShardingPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, final String sql) throws SQLException { - this(sqlRouteEngine, shardingConnection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); + public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql) throws SQLException { + this(shardingConnection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } - public ShardingPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, + public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { - this(sqlRouteEngine, shardingConnection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); + this(shardingConnection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } - public ShardingPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, + public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { - super(sqlRouteEngine, shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); + super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); this.sql = sql; } - public ShardingPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) throws SQLException { - this(sqlRouteEngine, shardingConnection, sql); + public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) throws SQLException { + this(shardingConnection, sql); this.autoGeneratedKeys = autoGeneratedKeys; } - public ShardingPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, final String sql, final int[] columnIndexes) throws SQLException { - this(sqlRouteEngine, shardingConnection, sql); + public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int[] columnIndexes) throws SQLException { + this(shardingConnection, sql); this.columnIndexes = columnIndexes; } - public ShardingPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, final String sql, final String[] columnNames) throws SQLException { - this(sqlRouteEngine, shardingConnection, sql); + public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final String[] columnNames) throws SQLException { + this(shardingConnection, sql); this.columnNames = columnNames; } @Override public ResultSet executeQuery() throws SQLException { hasExecuted = true; - setCurrentResultSet(ResultSetFactory.getResultSet(new PreparedStatementExecutor(getRoutedPreparedStatements()).executeQuery(), getMergeContext())); + setCurrentResultSet(ResultSetFactory.getResultSet(new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), + getRoutedPreparedStatements()).executeQuery(), getMergeContext())); return getCurrentResultSet(); } @Override public int executeUpdate() throws SQLException { hasExecuted = true; - return new PreparedStatementExecutor(getRoutedPreparedStatements()).executeUpdate(); + return new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), getRoutedPreparedStatements()).executeUpdate(); } @Override public boolean execute() throws SQLException { hasExecuted = true; - return new PreparedStatementExecutor(getRoutedPreparedStatements()).execute(); + return new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), getRoutedPreparedStatements()).execute(); } @Override @@ -123,7 +124,7 @@ public int[] executeBatch() throws SQLException { int[] result = new int[batchParameters.size()]; int i = 0; for (List each : batchParameters) { - result[i++] = new PreparedStatementExecutor(routeSQL(each)).executeUpdate(); + result[i++] = new PreparedStatementExecutor(getShardingConnection().getContext().getExecutorEngine(), routeSQL(each)).executeUpdate(); } return result; } @@ -150,8 +151,10 @@ private void routeIfNeed() throws SQLException { private List routeSQL(final List parameters) throws SQLException { List result = new ArrayList<>(); - SQLRouteResult sqlRouteResult = getSqlRouteEngine().route(sql, parameters); - setMergeContext(sqlRouteResult.getMergeContext()); + SQLRouteResult sqlRouteResult = getShardingConnection().getContext().getSqlRouteEngine().route(sql, parameters); + MergeContext mergeContext = sqlRouteResult.getMergeContext(); + mergeContext.setExecutorEngine(getShardingConnection().getContext().getExecutorEngine()); + setMergeContext(mergeContext); for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { PreparedStatement preparedStatement = generatePrepareStatement(getShardingConnection().getConnection(each.getDataSource()), each.getSql()); replayMethodsInvovation(preparedStatement); diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java index cd62d2863e4ce..490915a6ead6d 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java @@ -33,12 +33,10 @@ import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory; import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext; import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit; -import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult; import com.google.common.base.Charsets; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; - import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -53,9 +51,6 @@ public class ShardingStatement extends AbstractStatementAdapter { @Getter(AccessLevel.PROTECTED) private final ShardingConnection shardingConnection; - @Getter(AccessLevel.PROTECTED) - private final SQLRouteEngine sqlRouteEngine; - @Getter private final int resultSetType; @@ -75,19 +70,18 @@ public class ShardingStatement extends AbstractStatementAdapter { @Setter(AccessLevel.PROTECTED) private ResultSet currentResultSet; - public ShardingStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection) throws SQLException { - this(sqlRouteEngine, shardingConnection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); + public ShardingStatement(final ShardingConnection shardingConnection) throws SQLException { + this(shardingConnection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } - public ShardingStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) throws SQLException { - this(sqlRouteEngine, shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); + public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) throws SQLException { + this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } - public ShardingStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, - final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { + public ShardingStatement(final ShardingConnection shardingConnection, + final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { super(Statement.class); this.shardingConnection = shardingConnection; - this.sqlRouteEngine = sqlRouteEngine; this.resultSetType = resultSetType; this.resultSetConcurrency = resultSetConcurrency; this.resultSetHoldability = resultSetHoldability; @@ -148,9 +142,10 @@ public boolean execute(final String sql, final String[] columnNames) throws SQLE } private StatementExecutor generateExecutor(final String sql) throws SQLException { - StatementExecutor result = new StatementExecutor(); - SQLRouteResult sqlRouteResult = sqlRouteEngine.route(sql, Collections.emptyList()); + StatementExecutor result = new StatementExecutor(shardingConnection.getContext().getExecutorEngine()); + SQLRouteResult sqlRouteResult = shardingConnection.getContext().getSqlRouteEngine().route(sql, Collections.emptyList()); mergeContext = sqlRouteResult.getMergeContext(); + mergeContext.setExecutorEngine(shardingConnection.getContext().getExecutorEngine()); for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) { result.addStatement(each.getSql(), generateStatement(each.getSql(), each.getDataSource())); } diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractPreparedStatementAdapter.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractPreparedStatementAdapter.java index f160fc963e5b6..a23a1714e1721 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractPreparedStatementAdapter.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractPreparedStatementAdapter.java @@ -35,8 +35,6 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection; import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement; -import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; - import lombok.Getter; /** @@ -53,9 +51,9 @@ public abstract class AbstractPreparedStatementAdapter extends AbstractUnsupport @Getter private final List parameters = new ArrayList<>(); - public AbstractPreparedStatementAdapter(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, + public AbstractPreparedStatementAdapter(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { - super(sqlRouteEngine, shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); + super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationPreparedStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationPreparedStatement.java index d6581cc295827..989b439c52f9f 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationPreparedStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationPreparedStatement.java @@ -29,7 +29,6 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection; import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingStatement; -import com.dangdang.ddframe.rdb.sharding.router.SQLRouteEngine; /** * 声明不支持操作的预编译语句对象. @@ -38,9 +37,9 @@ */ public abstract class AbstractUnsupportedOperationPreparedStatement extends ShardingStatement implements PreparedStatement { - public AbstractUnsupportedOperationPreparedStatement(final SQLRouteEngine sqlRouteEngine, final ShardingConnection shardingConnection, + public AbstractUnsupportedOperationPreparedStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { - super(sqlRouteEngine, shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); + super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/groupby/GroupByResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/groupby/GroupByResultSet.java index 9dc0bae3c87c5..edbd3360fc552 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/groupby/GroupByResultSet.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/groupby/GroupByResultSet.java @@ -73,6 +73,8 @@ public final class GroupByResultSet extends AbstractShardingResultSet { private final List columnLabels; + private final ExecutorEngine executorEngine; + private Iterator groupByResultIterator; @Getter(AccessLevel.PROTECTED) @@ -86,6 +88,7 @@ public GroupByResultSet(final List resultSets, final MergeContext mer resultSetMetaData = getResultSets().iterator().next().getMetaData(); columnLabels = new ArrayList<>(resultSetMetaData.getColumnCount()); fillRelatedColumnNames(); + executorEngine = mergeContext.getExecutorEngine(); } private void fillRelatedColumnNames() throws SQLException { @@ -143,7 +146,7 @@ public Multimap merge(final List result = ExecutorEngine.execute(getResultSets(), executeUnit, mergeUnit); + Multimap result = executorEngine.execute(getResultSets(), executeUnit, mergeUnit); log.trace("Mapped result: {}", result); return result; } diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContext.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContext.java index 8ec4a59bc95c7..810a65f63dd9e 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContext.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContext.java @@ -19,14 +19,13 @@ import java.util.concurrent.TimeUnit; -import org.slf4j.LoggerFactory; - import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Slf4jReporter; import com.codahale.metrics.Slf4jReporter.LoggingLevel; import com.codahale.metrics.Timer.Context; +import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository; import com.google.common.base.Joiner; -import com.google.common.base.Optional; +import org.slf4j.LoggerFactory; /** * 度量工具上下文. @@ -35,32 +34,18 @@ */ public final class MetricsContext { - private static final ThreadLocal CONTEXT = new ThreadLocal<>(); - - private final Optional metricRegistry; - - public MetricsContext(final boolean enable, final long period, final String packageName) { - if (enable) { - metricRegistry = Optional.of(new MetricRegistry()); - Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry.get()) - .outputTo(LoggerFactory.getLogger(packageName)) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .withLoggingLevel(LoggingLevel.DEBUG) - .build(); - reporter.start(period, TimeUnit.SECONDS); - } else { - metricRegistry = Optional.absent(); - } - } + private final MetricRegistry metricRegistry; - /** - * 注册度量上下文. - */ - public void register() { - if (metricRegistry.isPresent() && !this.equals(CONTEXT.get())) { - CONTEXT.set(this); - } + public MetricsContext(final long period, final String packageName) { + metricRegistry = new MetricRegistry(); + Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry) + .outputTo(LoggerFactory.getLogger(packageName)) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .withLoggingLevel(LoggingLevel.DEBUG) + .build(); + reporter.start(period, TimeUnit.SECONDS); + } /** @@ -71,7 +56,8 @@ public void register() { * @return 计时上下文 */ public static Context start(final String... name) { - return null == CONTEXT.get() ? null : CONTEXT.get().metricRegistry.get().timer(MetricRegistry.name(Joiner.on("-").join(name))).time(); + MetricsContext context = ThreadLocalObjectRepository.getItem(MetricsContext.class); + return null == context ? null : context.metricRegistry.timer(MetricRegistry.name(Joiner.on("-").join(name))).time(); } /** diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/parser/result/merger/MergeContext.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/parser/result/merger/MergeContext.java index 470473aa02889..e625f75ce58df 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/parser/result/merger/MergeContext.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/parser/result/merger/MergeContext.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -42,6 +43,9 @@ public final class MergeContext { @Setter private Limit limit; + @Setter + private ExecutorEngine executorEngine; + /** * 获取结果集类型. * diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/threadlocal/ThreadLocalObjectRepository.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/threadlocal/ThreadLocalObjectRepository.java new file mode 100644 index 0000000000000..32cb2f54c867f --- /dev/null +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/threadlocal/ThreadLocalObjectRepository.java @@ -0,0 +1,71 @@ +/** + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.rdb.sharding.threadlocal; + +import java.util.HashMap; +import java.util.Map; + +import lombok.NoArgsConstructor; + +/** + * ThreadLocal对象仓库. + * 多个ShardingDataSource使用static对象会造成数据污染,故使用该类来将这些对象绑定到ThreadLocal中. + * + * @author gaohongtao + */ +@NoArgsConstructor +public final class ThreadLocalObjectRepository { + + private static final ThreadLocal> THREAD_LOCAL_REPOSITORY = new ThreadLocal<>(); + + private Map repository = new HashMap<>(); + + /** + * 向仓库内添加对象. + * + * @param item 受仓库管理的对象 + */ + public void addItem(final Object item) { + repository.put(item.getClass().getName(), item); + } + + /** + * 开始使用仓库. + * 在本线程开始执行前要调用此方法设置线程对象状态. + * + */ + public void open() { + Map repositoryInThisThread = THREAD_LOCAL_REPOSITORY.get(); + if (null != repositoryInThisThread && repositoryInThisThread.equals(repository)) { + return; + } + THREAD_LOCAL_REPOSITORY.remove(); + THREAD_LOCAL_REPOSITORY.set(repository); + } + + /** + * 获取线程中对象. + * + * @param tClass 对象类型 + * @return 对象 + */ + public static T getItem(final Class tClass) { + return (T) THREAD_LOCAL_REPOSITORY.get().get(tClass.getName()); + } + +} diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContextTest.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContextTest.java index d4b8bd95ebe29..d979500aca192 100644 --- a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContextTest.java +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/metrics/MetricsContextTest.java @@ -17,6 +17,7 @@ package com.dangdang.ddframe.rdb.sharding.metrics; +import com.dangdang.ddframe.rdb.sharding.threadlocal.ThreadLocalObjectRepository; import org.junit.Test; import com.codahale.metrics.Timer.Context; @@ -34,8 +35,11 @@ public void assertMetricsContextDisable() { } private void run(final boolean enable) { - MetricsContext metricsContext = new MetricsContext(enable, 1000000L, "example"); - metricsContext.register(); + if(enable){ + ThreadLocalObjectRepository repository = new ThreadLocalObjectRepository(); + repository.addItem(new MetricsContext(1000000L, "example")); + repository.open(); + } Context context = MetricsContext.start("example"); MetricsContext.stop(context); } diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/parser/result/SQLParsedResultTest.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/parser/result/SQLParsedResultTest.java index b35a8e82c70f0..45062269d98c7 100644 --- a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/parser/result/SQLParsedResultTest.java +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/parser/result/SQLParsedResultTest.java @@ -42,7 +42,7 @@ public void assertToString() throws IOException { + "orderByColumns=[OrderByColumn(name=Optional.of(id), index=Optional.absent(), alias=Optional.of(a), orderByType=DESC)], " + "groupByColumns=[GroupByColumn(name=id, alias=d, orderByType=ASC)], " + "aggregationColumns=[AggregationColumn(expression=COUNT(id), aggregationType=COUNT, alias=Optional.of(c), option=Optional.absent(), derivedColumns=[], index=-1)], " - + "limit=Limit(offset=0, rowCount=10)))")); + + "limit=Limit(offset=0, rowCount=10), executorEngine=null))")); } private void generateRouteContext(final RouteContext routeContext) throws IOException { diff --git a/sharding-jdbc-doc/content/post/stress_test.md b/sharding-jdbc-doc/content/post/stress_test.md index dfed51f303f8d..3b2dd8f10422a 100644 --- a/sharding-jdbc-doc/content/post/stress_test.md +++ b/sharding-jdbc-doc/content/post/stress_test.md @@ -25,7 +25,7 @@ chart = true | 数据表行数 | 1000万 | 1000万 | ## 网络拓扑 -![网络拓扑图](img/stress_test_arch.png) +![网络拓扑图](../../img/stress_test_arch.png) ## 单库情况下Sharding-JDBC与JDBC性能对比 测试结论: diff --git a/sharding-jdbc-doc/content/post/user_guide.md b/sharding-jdbc-doc/content/post/user_guide.md index 6b769e62c00df..e52c29187da22 100644 --- a/sharding-jdbc-doc/content/post/user_guide.md +++ b/sharding-jdbc-doc/content/post/user_guide.md @@ -225,7 +225,7 @@ public Collection doBetweenSharding(final Collection availableTa 下面是一个余2的算法的例子,当分片键的值除以2余数就是实际表的结尾。注意注释中提供了一些算法生成SQL的结果,参数`tableNames`集合中有两个参数`t_order_0`和`t_order_1` ```java - public final class ModuloDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm { + public final class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm { /** * select * from t_order from t_order where order_id = 11 @@ -254,9 +254,9 @@ public Collection doBetweenSharding(final Collection availableTa public Collection doInSharding(final Collection tableNames, final ShardingValue shardingValue) { Collection result = new LinkedHashSet<>(tableNames.size()); for (Integer value : shardingValue.getValues()) { - for (String dataSourceName : tableNames) { - if (dataSourceName.endsWith(value % 2 + "")) { - result.add(dataSourceName); + for (String tableName : tableNames) { + if (tableName.endsWith(value % 2 + "")) { + result.add(tableName); } } }