Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: splitting MergedWarpMessage enhances the server parallel processing capability #6807

Open
wants to merge 28 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
57d5155
splitting MergedWarpMessage enhances the server's parallel processing…
funky-eyes Sep 2, 2024
618bbc9
splitting MergedWarpMessage enhances the server's parallel processing…
funky-eyes Sep 3, 2024
35d38ff
fix ut
funky-eyes Sep 3, 2024
c568f6c
Merge branch '2.x' into 0902
funky-eyes Sep 3, 2024
955812f
Merge branch '2.x' into 0902
funky-eyes Sep 4, 2024
ac60802
Merge branch '2.x' into 0902
funky-eyes Sep 8, 2024
fab0c9d
Merge branch '2.x' into 0902
funky-eyes Sep 24, 2024
b51c94e
opt
funky-eyes Sep 24, 2024
6457f6c
add test case
funky-eyes Sep 24, 2024
6567810
Merge branch '2.x' into 0902
funky-eyes Oct 2, 2024
31f6d19
Merge branch '2.x' into 0902
funky-eyes Oct 8, 2024
a06f5be
opt
funky-eyes Oct 8, 2024
e65fcd7
Merge branch '2.x' into 0902
funky-eyes Oct 8, 2024
10ae72a
opt
funky-eyes Oct 9, 2024
2d5521b
Merge remote-tracking branch 'funkye/0902' into 0902
funky-eyes Oct 9, 2024
7e48f02
test
funky-eyes Oct 9, 2024
b31c438
test
funky-eyes Oct 9, 2024
317a7c1
test
funky-eyes Oct 9, 2024
4dd0a00
test
funky-eyes Oct 9, 2024
53fe347
test
funky-eyes Oct 9, 2024
45ce966
test
funky-eyes Oct 9, 2024
a0c61da
test
funky-eyes Oct 9, 2024
4f1a705
bugfix: fix the issue of Codecov not generating reports
funky-eyes Oct 10, 2024
6779277
Merge branch '2.x' into 0902
funky-eyes Oct 10, 2024
73a4bf6
Update build/pom.xml
funky-eyes Oct 10, 2024
fc1d932
Merge branch '2.x' into 0902
funky-eyes Oct 11, 2024
4312df4
Merge branch '2.x' into 0902
funky-eyes Oct 12, 2024
430c9a8
Merge branch '2.x' into 0902
funky-eyes Oct 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6898](https://github.com/apache/incubator-seata/pull/6898)] upgrade npmjs version in saga module
- [[#6879](https://github.com/apache/incubator-seata/pull/6879)] fix log argument mismatch issue
- [[#6902](https://github.com/apache/incubator-seata/pull/6900)] optimize readme docs
- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] splitting MergedWarpMessage enhances the server parallel processing capability
- [[#6905](https://github.com/apache/incubator-seata/pull/6905)] remove incompatible licenses at build time
- [[#6906](https://github.com/apache/incubator-seata/pull/6906)] h2 dependency adds test scope
- [[#6911](https://github.com/apache/incubator-seata/pull/6911)] fix some typos in project
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- [[#6879](https://github.com/apache/incubator-seata/pull/6879)] 修复日志参数不匹配问题
- [[#6898](https://github.com/apache/incubator-seata/pull/6898)] 升级 saga 模块 npmjs 版本
- [[#6902](https://github.com/apache/incubator-seata/pull/6900)] 优化 readme 文档
- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] 分离merge消息使其能完全并行处理
- [[#6905](https://github.com/apache/incubator-seata/pull/6905)] 移除构建期不兼容的 license
- [[#6906](https://github.com/apache/incubator-seata/pull/6906)] h2依赖添加test scope
- [[#6911](https://github.com/apache/incubator-seata/pull/6911)] 修正项目中的部分拼写错误
Expand Down
19 changes: 13 additions & 6 deletions core/src/main/java/org/apache/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class Version {
private static final String CURRENT = VersionInfo.VERSION;
private static final String VERSION_0_7_1 = "0.7.1";
private static final String VERSION_1_5_0 = "1.5.0";
private static final String VERSION_2_3_0 = "2.3.0";
private static final int MAX_VERSION_DOT = 3;

/**
Expand Down Expand Up @@ -86,15 +87,21 @@ public static String getChannelVersion(Channel c) {
* @return true: client version is above or equal version 1.5.0, false: on the contrary
*/
public static boolean isAboveOrEqualVersion150(String version) {
boolean isAboveOrEqualVersion150 = false;
return isAboveOrEqualVersion(version, VERSION_1_5_0);
}

public static boolean isAboveOrEqualVersion230(String version) {
return isAboveOrEqualVersion(version, VERSION_2_3_0);
}

public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) {
boolean isAboveOrEqualVersion = false;
try {
long clientVersion = convertVersion(version);
long divideVersion = convertVersion(VERSION_1_5_0);
isAboveOrEqualVersion150 = clientVersion >= divideVersion;
isAboveOrEqualVersion = convertVersion(clientVersion) >= convertVersion(divideVersion);
} catch (Exception e) {
LOGGER.error("convert version error, clientVersion:{}", version, e);
LOGGER.error("convert version error, clientVersion:{}", clientVersion, e);
}
return isAboveOrEqualVersion150;
return isAboveOrEqualVersion;
}

public static long convertVersion(String version) throws IncompatibleVersionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
*/
protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();

protected final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();

/**
* When batch sending is enabled, the message will be stored to basketMap
* Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable}
Expand Down Expand Up @@ -203,8 +205,15 @@ public void sendAsyncRequest(Channel channel, Object msg) {
RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
if (rpcMessage.getBody() instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
Object body = rpcMessage.getBody();
if (body instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)rpcMessage.getBody());
if (body instanceof MergedWarpMessage) {
Integer parentId = rpcMessage.getId();
for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
childToParentMap.put(msgId, parentId);
}
}
}
super.sendAsync(channel, rpcMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.MergedWarpMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.processor.Pair;
Expand Down Expand Up @@ -270,4 +274,32 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep
}

}

@Override
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
Object body = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// If the client is not version 2.3.0 or higher, splitting MergedWarpMessage will result in the client’s mergeMsgMap not being cleared
if (body instanceof MergedWarpMessage && (StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion230(rpcContext.getVersion()))) {
MergedWarpMessage mergedWarpMessage = (MergedWarpMessage)body;
for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) {
RpcMessage rpcMsg =
buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i));
super.processMessage(ctx, rpcMsg);
}
} else {
super.processMessage(ctx, rpcMessage);
}
}

private RpcMessage buildRequestMessage(AbstractMessage msg, RpcMessage rpcMessage,int id) {
RpcMessage rpcMsg = new RpcMessage();
rpcMsg.setId(id);
rpcMsg.setCodec(rpcMessage.getCodec());
rpcMsg.setCompressor(rpcMessage.getCompressor());
rpcMsg.setBody(msg);
return rpcMsg;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer {

private final AtomicBoolean initialized = new AtomicBoolean(false);

private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
NettyServerConfig.getMaxBranchResultPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("BranchResultHandlerThread", NettyServerConfig.getMaxBranchResultPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private void registerProcessor() {
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.common.util.StringUtils.isNotBlank;

/**
* The rm netty client.
*
Expand Down Expand Up @@ -187,7 +189,7 @@ public void init() {
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (org.apache.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
if (isNotBlank(transactionServiceGroup)) {
initConnection();
}
}
Expand Down Expand Up @@ -247,7 +249,7 @@ protected Function<String, NettyPoolKey> getPoolKeyFunction() {
private void registerProcessor() {
// 1.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public class ClientOnResponseProcessor implements RemotingProcessor {
/**
* The Merge msg map from org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMap.
*/
private Map<Integer, MergeMessage> mergeMsgMap;
private final Map<Integer, MergeMessage> mergeMsgMap;

private final Map<Integer, Integer> childToParentMap;

/**
* The Futures from org.apache.seata.core.rpc.netty.AbstractNettyRemoting#futures
Expand All @@ -82,9 +84,10 @@ public class ClientOnResponseProcessor implements RemotingProcessor {
private final TransactionMessageHandler transactionMessageHandler;

public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,
ConcurrentHashMap<Integer, MessageFuture> futures,
ConcurrentHashMap<Integer, MessageFuture> futures, Map<Integer,Integer> childToParentMap,
TransactionMessageHandler transactionMessageHandler) {
this.mergeMsgMap = mergeMsgMap;
this.childToParentMap = childToParentMap;
this.futures = futures;
this.transactionMessageHandler = transactionMessageHandler;
}
Expand All @@ -97,40 +100,52 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
for (int i = 0; i < mergeMessage.msgs.size(); i++) {
int msgId = mergeMessage.msgIds.get(i);
MessageFuture future = futures.remove(msgId);
// The old version of the server will return MergeResultMessage, so it is necessary to remove the msgId from the childToParentMap.
childToParentMap.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);
} else {
future.setResultMessage(results.getMsgs()[i]);
}
}
} else if (rpcMessage.getBody() instanceof BatchResultMessage) {
try {
BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();
for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
int msgId = batchResultMessage.getMsgIds().get(i);
MessageFuture future = futures.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));
} else {
future.setResultMessage(batchResultMessage.getResultMessages().get(i));
}
BatchResultMessage batchResultMessage = (BatchResultMessage)rpcMessage.getBody();
for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
int msgId = batchResultMessage.getMsgIds().get(i);
MessageFuture future = futures.remove(msgId);
// The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId
// from the childToParentMap.
Integer parentId = childToParentMap.remove(msgId);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,
batchResultMessage.getResultMessages().get(i));
} else {
future.setResultMessage(batchResultMessage.getResultMessages().get(i));
}
} finally {
// In order to be compatible with the old version, in the batch sending of version 1.5.0,
// batch messages will also be placed in the local cache of mergeMsgMap,
// but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap
mergeMsgMap.clear();
}
} else {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
Integer id = rpcMessage.getId();
try {
MessageFuture messageFuture = futures.remove(id);
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage)rpcMessage.getBody(), null);
}
}
}
} finally {
// In version 2.3.0, the server does not return MergeResultMessage and BatchResultMessage
// so it is necessary to clear childToParentMap and mergeMsgMap here.
Integer parentId = childToParentMap.remove(id);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Map;


@EnabledIfSystemProperty(named = "redisCaseEnabled", matches = "true")
@SpringBootTest
public class RedisVGroupMappingStoreManagerTest {
private RedisVGroupMappingStoreManager redisVGroupMappingStoreManager;
Expand Down
Loading
Loading