Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: duplicate image row for update join #5004

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#4479](https://github.com/seata/seata/pull/4479)] TCC mode supports tcc annotation marked on both interface and implementation class
- [[#4877](https://github.com/seata/seata/pull/4877)] seata client support jdk17
- [[#4468](https://github.com/seata/seata/pull/4968)] support kryo 5.3.0
- [[#4914](https://github.com/seata/seata/pull/4914)] support mysql update join sql


### bugfix:
Expand All @@ -24,6 +25,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#4928](https://github.com/seata/seata/pull/4928)] fix rpcContext.getClientRMHolderMap NPE
- [[#4953](https://github.com/seata/seata/pull/4953)] fix InsertOnDuplicateUpdate bypass modify pk
- [[#4978](https://github.com/seata/seata/pull/4978)] fix kryo support circular reference
- [[#5004](https://github.com/seata/seata/pull/5004)] fix duplicate image row for update join


### optimize:
Expand Down Expand Up @@ -77,5 +79,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [tuwenlin](https://github.com/tuwenlin)
- [CrazyLionLi](https://github.com/JavaLionLi)
- [whxxxxx](https://github.com/whxxxxx)
- [renliangyu857](https://github.com/renliangyu857)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 3 additions & 0 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [[#4479](https://github.com/seata/seata/pull/4479)] TCC注解支持添加在实现类及其方法上也生效
- [[#4877](https://github.com/seata/seata/pull/4877)] seata client支持jdk17
- [[#4468](https://github.com/seata/seata/pull/4968)] 支持kryo 5.3.0
- [[#4914](https://github.com/seata/seata/pull/4914)] 支持mysql的update join联表更新语法



Expand All @@ -26,6 +27,7 @@
- [[#4953](https://github.com/seata/seata/pull/4953)] 修复InsertOnDuplicateUpdate可绕过修改主键的问题
- [[#4978](https://github.com/seata/seata/pull/4978)] 修复 kryo 支持循环依赖
- [[#4985](https://github.com/seata/seata/pull/4985)] 修复 undo_log id重复的问题
- [[#5004](https://github.com/seata/seata/pull/5004)] 修复mysql update join行数据重复的问题


### optimize:
Expand Down Expand Up @@ -77,5 +79,6 @@
- [tuwenlin](https://github.com/tuwenlin)
- [CrazyLionLi](https://github.com/JavaLionLi)
- [whxxxxx](https://github.com/whxxxxx)
- [renliangyu857](https://github.com/renliangyu857)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
9 changes: 9 additions & 0 deletions core/src/main/java/io/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ public static long convertVersion(String version) throws IncompatibleVersionExce
return result;
}

public static long convertVersionNotThrowException(String version) {
try {
return convertVersion(version);
} catch (Exception e) {
LOGGER.error("convert version error,version:{}",version,e);
}
return -1;
}

private static long calculatePartValue(String partNumeric, int size, int index) {
return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size - index)).longValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.seata.rm.datasource;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -26,6 +28,7 @@
import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.common.ConfigurationKeys;
import io.seata.core.constants.DBType;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.core.model.Resource;
Expand Down Expand Up @@ -60,6 +63,8 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource

private String userName;

private String version;

/**
* Enable the table meta checker
*/
Expand Down Expand Up @@ -109,6 +114,7 @@ private void init(DataSource dataSource, String resourceGroupId) {
} else if (JdbcConstants.MARIADB.equals(dbType)) {
dbType = JdbcConstants.MYSQL;
}
version = selectDbVersion(connection);
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
Expand Down Expand Up @@ -268,4 +274,22 @@ private void initPGResourceId() {
public BranchType getBranchType() {
return BranchType.AT;
}

public String getVersion() {
return version;
}

private String selectDbVersion(Connection connection) {
if (DBType.MYSQL.name().equalsIgnoreCase(dbType)) {
try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT VERSION()");
ResultSet versionResult = preparedStatement.executeQuery()) {
if (versionResult.next()) {
return versionResult.getString("VERSION()");
}
} catch (Exception e) {
LOGGER.error("get mysql version fail error: {}", e.getMessage());
}
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends Ba

protected static final String WHERE = " WHERE ";

protected static final String GROUP_BY = " GROUP BY ";


/**
* Instantiates a new Abstract dml base executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.StringJoiner;

import io.seata.common.util.CollectionUtils;
import io.seata.core.protocol.Version;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import io.seata.rm.datasource.undo.SQLUndoLog;
Expand All @@ -42,16 +43,21 @@
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

import io.seata.sqlparser.util.ColumnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* @author renliangyu857
*/
public class MySQLUpdateJoinExecutor<T, S extends Statement> extends UpdateExecutor<T, S> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLUpdateJoinExecutor.class);
private static final String DOT = ".";
private final Map<String, TableRecords> beforeImagesMap = new LinkedHashMap<>(4);
private final Map<String, TableRecords> afterImagesMap = new LinkedHashMap<>(4);
private final boolean isLowerSupportGroupByPksVersion = Version.convertVersionNotThrowException(getDbVersion()) < Version.convertVersionNotThrowException("5.7.5");
private String sqlMode = "";

/**
* Instantiates a new Update executor.
Expand Down Expand Up @@ -89,6 +95,7 @@ protected TableRecords beforeImage() throws SQLException {
private String buildBeforeImageSQL(String joinTable, String itemTable, List<String> itemTableUpdateColumns,
ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
TableMeta itemTableMeta = getTableMeta(itemTable);
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(joinTable);
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
Expand All @@ -103,9 +110,13 @@ private String buildBeforeImageSQL(String joinTable, String itemTable, List<Stri
if (StringUtils.isNotBlank(limitCondition)) {
suffix.append(" ").append(limitCondition);
}
//maybe duplicate row for select join sql.remove duplicate row by 'group by' condition
suffix.append(GROUP_BY);
List<String> pkColumnNames = getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
suffix.append(buildGroupBy(pkColumnNames,needUpdateColumns));
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoin.add(needUpdateColumn);
}
Expand Down Expand Up @@ -143,11 +154,15 @@ private String buildAfterImageSQL(String joinTable, String itemTable,
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
TableMeta itemTableMeta = getTableMeta(itemTable);
StringBuilder prefix = new StringBuilder("SELECT ");
String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName()), beforeImage.pkRows().size(), getDbType());
List<String> pkColumns = getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName());
String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(pkColumns, beforeImage.pkRows().size(), getDbType());
String suffix = " FROM " + joinTable + " WHERE " + whereSql;
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
//maybe duplicate row for select join sql.remove duplicate row by 'group by' condition
suffix += GROUP_BY;
List<String> itemTableUpdateColumns = getItemUpdateColumns(itemTableMeta, recognizer.getUpdateColumns());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
suffix += buildGroupBy(pkColumns,needUpdateColumns);
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoiner.add(needUpdateColumn);
}
Expand Down Expand Up @@ -214,4 +229,47 @@ protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterI
sqlUndoLog.setAfterImage(afterImage);
return sqlUndoLog;
}

/**
* build group by condition which used for removing duplicate row in select join sql"
*
* @param pkColumns pkColumnsList
* @param allSelectColumns allSelectColumns
* @return return group by condition string.
*/
private String buildGroupBy(List<String> pkColumns,List<String> allSelectColumns) {
boolean groupByPks = true;
//only pks group by is valid when db version >= 5.7.5
try {
if (isLowerSupportGroupByPksVersion) {
if (StringUtils.isEmpty(sqlMode)) {
try (PreparedStatement preparedStatement = statementProxy.getConnection().prepareStatement("SELECT @@SQL_MODE");
ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
sqlMode = resultSet.getString("@@SQL_MODE");
}
}
}
if (sqlMode.contains("ONLY_FULL_GROUP_BY")) {
groupByPks = false;
}
}
} catch (Exception e) {
groupByPks = false;
LOGGER.warn("determine group by pks or all columns error:{}",e.getMessage());
}
List<String> groupByColumns = groupByPks ? pkColumns : allSelectColumns;
StringBuilder groupByStr = new StringBuilder();
for (int i = 0; i < groupByColumns.size(); i++) {
if (i > 0) {
groupByStr.append(",");
}
groupByStr.append(ColumnUtils.addEscape(groupByColumns.get(i),getDbType()));
}
return groupByStr.toString();
}

private String getDbVersion() {
return statementProxy.getConnectionProxy().getDataSourceProxy().getVersion();
}
}