Skip to content

Commit

Permalink
fixed #203 Merge batch events
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored and gaohongtao committed Dec 1, 2016
1 parent 78b31ef commit bc0abfd
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEventBus;
import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.AbstractExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.google.common.base.Optional;
import lombok.RequiredArgsConstructor;

Expand All @@ -43,22 +42,14 @@ class EventPostman {

void postExecutionEvents() {
for (AbstractExecutorWrapper each : statementExecutorWrappers) {
if (each instanceof BatchPreparedStatementExecutorWrapper) {
postBatchExecutionEvent((BatchPreparedStatementExecutorWrapper) each);
} else if (each.getDMLExecutionEvent().isPresent()) {
if (each.getDMLExecutionEvent().isPresent()) {
DMLExecutionEventBus.post(each.getDMLExecutionEvent().get());
} else if (each.getDQLExecutionEvent().isPresent()) {
DQLExecutionEventBus.post(each.getDQLExecutionEvent().get());
}
}
}

private void postBatchExecutionEvent(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper) {
for (DMLExecutionEvent each : batchPreparedStatementExecutorWrapper.getDmlExecutionEvents()) {
DMLExecutionEventBus.post(each);
}
}

void postExecutionEventsAfterExecution(final AbstractExecutorWrapper statementExecutorWrapper) {
postExecutionEventsAfterExecution(statementExecutorWrapper, EventExecutionType.EXECUTE_SUCCESS, Optional.<SQLException>absent());
}
Expand All @@ -76,17 +67,4 @@ void postExecutionEventsAfterExecution(final AbstractExecutorWrapper statementEx
DQLExecutionEventBus.post(event);
}
}

void postBatchExecutionEventsAfterExecution(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper) {
postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_SUCCESS, Optional.<SQLException>absent());
}

void postBatchExecutionEventsAfterExecution(
final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final EventExecutionType eventExecutionType, final Optional<SQLException> exp) {
for (DMLExecutionEvent each : batchPreparedStatementExecutorWrapper.getDmlExecutionEvents()) {
each.setEventExecutionType(eventExecutionType);
each.setExp(exp);
DMLExecutionEventBus.post(each);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.codahale.metrics.Timer.Context;
import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.google.common.base.Optional;
Expand Down Expand Up @@ -213,14 +212,14 @@ public int[] executeBatch() {
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
try {
if (1 == preparedStatementExecutorWrappers.size()) {
return executeBatchInternal((BatchPreparedStatementExecutorWrapper) preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
return executeBatchInternal(preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
}
return executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, int[]>() {

@Override
public int[] execute(final PreparedStatementExecutorWrapper input) throws Exception {
synchronized (input.getPreparedStatement().getConnection()) {
return executeBatchInternal((BatchPreparedStatementExecutorWrapper) input, isExceptionThrown, dataMap);
return executeBatchInternal(input, isExceptionThrown, dataMap);
}
}
}, new MergeUnit<int[], int[]>() {
Expand Down Expand Up @@ -248,18 +247,18 @@ public int[] merge(final List<int[]> results) {
}
}

private int[] executeBatchInternal(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
private int[] executeBatchInternal(final PreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
int[] result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = batchPreparedStatementExecutorWrapper.getPreparedStatement().executeBatch();
} catch (final SQLException ex) {
eventPostman.postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_FAILURE, Optional.of(ex));
eventPostman.postExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_FAILURE, Optional.of(ex));
ExecutorExceptionHandler.handleException(ex);
return null;
}
eventPostman.postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper);
eventPostman.postExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package com.dangdang.ddframe.rdb.sharding.executor.event;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
Expand All @@ -40,7 +42,7 @@ public class ExecutionEvent {

private final String sql;

private final List<Object> parameters;
private final List<List<Object>> parameters = new ArrayList<>();

@Setter
private EventExecutionType eventExecutionType = EventExecutionType.BEFORE_EXECUTE;
Expand All @@ -57,6 +59,42 @@ public class ExecutionEvent {
id = UUID.randomUUID().toString();
this.dataSource = dataSource;
this.sql = sql;
this.parameters = parameters;
this.parameters.add(parameters);
}

/**
* 获取参数.
* 调用该方法前需要调用{@linkplain #isBatch()},
* 如果返回值为{@code false}那么可以调用该方法获取参数.
*
* @return 参数列表
*/
public List<Object> getParameters() {
return parameters.get(0);
}

/**
* 判断事件是否为批量操作事件.
* 如果返回值为{@code false}那么可以调用{@link #getParameters()}获取参数,
* 如果返回值为{@code true}那么可以调用{@link #getBatchParameters()}获取参数.
*
* @return {@code true}是批量操作事件,{@code false}不是批量操作事件
*/
public boolean isBatch() {
return parameters.size() > 1;
}

/**
* 获取批量参数.
* 不论{@linkplain #isBatch()}返回值是什么,该方法都可以获得所有的参数.
*
* @return 参数列表
*/
public List<List<Object>> getBatchParameters() {
return parameters;
}

public void addBatchParameters(final List<Object> parameters) {
this.parameters.add(Lists.newArrayList(parameters));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import lombok.Getter;

Expand Down Expand Up @@ -66,4 +67,14 @@ public Optional<DMLExecutionEvent> getDMLExecutionEvent() {
public Optional<DQLExecutionEvent> getDQLExecutionEvent() {
return dqlExecutionEvent;
}

/**
* 增加批量参数.
*
* @param parameters 参数列表
*/
public void addBatchParameters(final List<Object> parameters) {
Preconditions.checkArgument(isDML() && dmlExecutionEvent.isPresent());
dmlExecutionEvent.get().addBatchParameters(Lists.newArrayList(parameters));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
package com.dangdang.ddframe.rdb.sharding.jdbc;

import com.dangdang.ddframe.rdb.sharding.executor.PreparedStatementExecutor;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
import com.dangdang.ddframe.rdb.sharding.router.PreparedSQLRouter;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* 支持分片的预编译语句对象.
Expand All @@ -46,7 +47,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd

private final PreparedSQLRouter preparedSQLRouter;

private final Map<PreparedStatement, PreparedStatementExecutorWrapper> cachedRoutePreparedStatementMap = new HashMap<>();
private final List<PreparedStatementExecutorWrapper> cachedPreparedStatementWrappers = new ArrayList<>();

private Integer autoGeneratedKeys;

Expand Down Expand Up @@ -116,49 +117,42 @@ public boolean execute() throws SQLException {
}

protected void clearRouteContext() throws SQLException {
super.clearRouteContext();
clearParameters();
resetBatch();
cachedPreparedStatementWrappers.clear();
}

@Override
public void clearBatch() throws SQLException {
clearRouteContext();
}

@Override
public void addBatch() throws SQLException {
try {
for (PreparedStatementExecutorWrapper each : routeSQL()) {
each.getPreparedStatement().addBatch();
BatchPreparedStatementExecutorWrapper wrapper;
if (cachedRoutePreparedStatementMap.containsKey(each.getPreparedStatement())) {
wrapper = (BatchPreparedStatementExecutorWrapper) cachedRoutePreparedStatementMap.get(each.getPreparedStatement());
} else {
wrapper = new BatchPreparedStatementExecutorWrapper(each.getPreparedStatement(), getParameters(), each.getSqlExecutionUnit());
cachedRoutePreparedStatementMap.put(each.getPreparedStatement(), wrapper);
}
if (each.getDMLExecutionEvent().isPresent()) {
wrapper.getDmlExecutionEvents().add(each.getDMLExecutionEvent().get());
}
}
getGeneratedKeyContext().addRow();
} finally {
clearRouteContext();
resetBatch();
}
}

@Override
public void clearBatch() throws SQLException {
cachedRoutePreparedStatementMap.clear();
clearRouteContext();
private void resetBatch() throws SQLException {
super.clearRouteContext();
clearParameters();
}

@Override
public int[] executeBatch() throws SQLException {
try {
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedRoutePreparedStatementMap.values()).executeBatch();
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedPreparedStatementWrappers).executeBatch();
} finally {
clearBatch();
clearRouteContext();
}
}

private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
List<Object> parameters = getParameters();
List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
SQLRouteResult sqlRouteResult = preparedSQLRouter.route(getParameters());
MergeContext mergeContext = sqlRouteResult.getMergeContext();
Expand All @@ -168,8 +162,24 @@ private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());
replayMethodsInvocation(preparedStatement);
getParameters().replayMethodsInvocation(preparedStatement);
result.add(new PreparedStatementExecutorWrapper(preparedStatement, parameters, each));
result.add(wrap(preparedStatement, each));
}
return result;
}

private PreparedStatementExecutorWrapper wrap(final PreparedStatement preparedStatement, final SQLExecutionUnit sqlExecutionUnit) {
Optional<PreparedStatementExecutorWrapper> wrapperOptional = Iterators.tryFind(cachedPreparedStatementWrappers.iterator(), new Predicate<PreparedStatementExecutorWrapper>() {
@Override
public boolean apply(final PreparedStatementExecutorWrapper input) {
return Objects.equals(input.getPreparedStatement(), preparedStatement);
}
});
if (wrapperOptional.isPresent()) {
wrapperOptional.get().addBatchParameters(getParameters());
return wrapperOptional.get();
}
PreparedStatementExecutorWrapper result = new PreparedStatementExecutorWrapper(preparedStatement, getParameters(), sqlExecutionUnit);
cachedPreparedStatementWrappers.add(result);
return result;
}

Expand Down
Loading

0 comments on commit bc0abfd

Please sign in to comment.