From 57d5155308992b2f7cf62947d2ba7443f4808787 Mon Sep 17 00:00:00 2001 From: jianbin Date: Mon, 2 Sep 2024 18:29:45 +0800 Subject: [PATCH 01/16] splitting MergedWarpMessage enhances the server's parallel processing capability --- .../apache/seata/core/protocol/Version.java | 13 ++++++++ .../netty/AbstractNettyRemotingClient.java | 2 ++ .../netty/AbstractNettyRemotingServer.java | 32 +++++++++++++++++++ .../core/rpc/netty/RmNettyRemotingClient.java | 2 +- .../core/rpc/netty/TmNettyRemotingClient.java | 2 +- .../client/ClientOnResponseProcessor.java | 29 ++++++++++++----- 6 files changed, 70 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java b/core/src/main/java/org/apache/seata/core/protocol/Version.java index 12178d8fe05..26b275ec146 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/Version.java +++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java @@ -39,6 +39,7 @@ public class Version { private static final String CURRENT = VersionInfo.VERSION; private static final String VERSION_0_7_1 = "0.7.1"; private static final String VERSION_1_5_0 = "1.5.0"; + private static final String VERSION_2_3_0 = "2.3.0"; private static final int MAX_VERSION_DOT = 3; /** @@ -97,6 +98,18 @@ public static boolean isAboveOrEqualVersion150(String version) { return isAboveOrEqualVersion150; } + public static boolean isAboveOrEqualVersion230(String version) { + boolean isAboveOrEqualVersion230 = false; + try { + long clientVersion = convertVersion(version); + long divideVersion = convertVersion(VERSION_2_3_0); + isAboveOrEqualVersion230 = clientVersion >= divideVersion; + } catch (Exception e) { + LOGGER.error("convert version error, clientVersion:{}", version, e); + } + return isAboveOrEqualVersion230; + } + public static long convertVersion(String version) throws IncompatibleVersionException { if (StringUtils.isBlank(version)) { throw new IllegalArgumentException("The version must not be blank."); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 248e8f48f6d..96d798d427f 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -89,6 +89,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting */ protected final Map mergeMsgMap = new ConcurrentHashMap<>(); + protected final Map childToParentMap = new ConcurrentHashMap<>(); + /** * When batch sending is enabled, the message will be stored to basketMap * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 4b79f20d95c..9319f7bbe67 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -29,9 +29,13 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.HeartbeatMessage; +import org.apache.seata.core.protocol.MergedWarpMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.Version; import org.apache.seata.core.rpc.RemotingServer; import org.apache.seata.core.rpc.RpcContext; import org.apache.seata.core.rpc.processor.Pair; @@ -270,4 +274,32 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep } } + + @Override + protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + Object body = rpcMessage.getBody(); + RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); + // If the client is not version 2.3.0 or higher, splitting MergedWarpMessage will result in the client’s mergeMsgMap not being cleared + if (body instanceof MergedWarpMessage && (StringUtils.isNotBlank(rpcContext.getVersion()) + && Version.isAboveOrEqualVersion230(rpcContext.getVersion()))) { + MergedWarpMessage mergedWarpMessage = (MergedWarpMessage)body; + for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) { + RpcMessage rpcMsg = + buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i)); + processMessage(ctx, rpcMsg); + } + } else { + super.processMessage(ctx, rpcMessage); + } + } + + private RpcMessage buildRequestMessage(AbstractMessage msg, RpcMessage rpcMessage,int id) { + RpcMessage rpcMsg = new RpcMessage(); + rpcMsg.setId(id); + rpcMsg.setCodec(rpcMessage.getCodec()); + rpcMsg.setCompressor(rpcMessage.getCompressor()); + rpcMsg.setBody(msg); + return rpcMsg; + } + } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 92cbafd0a5d..0b9fd0e12bc 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -323,7 +323,7 @@ private void registerProcessor() { super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor); // 4.registry TC response processor ClientOnResponseProcessor onResponseProcessor = - new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler()); + new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler()); super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index 68ff739bbb0..b57068e36ea 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -247,7 +247,7 @@ protected Function getPoolKeyFunction() { private void registerProcessor() { // 1.registry TC response processor ClientOnResponseProcessor onResponseProcessor = - new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler()); + new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler()); super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null); diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java index f7b44c2e563..aee0a2cd335 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java @@ -71,6 +71,8 @@ public class ClientOnResponseProcessor implements RemotingProcessor { */ private Map mergeMsgMap; + private Map childToParentMap; + /** * The Futures from org.apache.seata.core.rpc.netty.AbstractNettyRemoting#futures */ @@ -82,9 +84,10 @@ public class ClientOnResponseProcessor implements RemotingProcessor { private final TransactionMessageHandler transactionMessageHandler; public ClientOnResponseProcessor(Map mergeMsgMap, - ConcurrentHashMap futures, + ConcurrentHashMap futures, Map childToParentMap, TransactionMessageHandler transactionMessageHandler) { this.mergeMsgMap = mergeMsgMap; + this.childToParentMap = childToParentMap; this.futures = futures; this.transactionMessageHandler = transactionMessageHandler; } @@ -122,15 +125,25 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc mergeMsgMap.clear(); } } else { - MessageFuture messageFuture = futures.remove(rpcMessage.getId()); - if (messageFuture != null) { - messageFuture.setResultMessage(rpcMessage.getBody()); - } else { - if (rpcMessage.getBody() instanceof AbstractResultMessage) { - if (transactionMessageHandler != null) { - transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null); + Integer id = rpcMessage.getId(); + try { + MessageFuture messageFuture = futures.remove(id); + if (messageFuture != null) { + messageFuture.setResultMessage(rpcMessage.getBody()); + } else { + if (rpcMessage.getBody() instanceof AbstractResultMessage) { + if (transactionMessageHandler != null) { + transactionMessageHandler.onResponse((AbstractResultMessage)rpcMessage.getBody(), null); + } } } + } finally { + // In version 2.3.0, the server does not return MergeResultMessage and BatchResultMessage + // so it is necessary to clear childToParentMap and mergeMsgMap here. + Integer parentId = childToParentMap.remove(id); + if (parentId != null) { + mergeMsgMap.remove(parentId); + } } } } From 618bbc95268f964b6aa246845086bdd6307091f8 Mon Sep 17 00:00:00 2001 From: jianbin Date: Tue, 3 Sep 2024 09:42:36 +0800 Subject: [PATCH 02/16] splitting MergedWarpMessage enhances the server's parallel processing capability --- .../apache/seata/core/protocol/Version.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java b/core/src/main/java/org/apache/seata/core/protocol/Version.java index 26b275ec146..f32bb163a69 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/Version.java +++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java @@ -87,27 +87,21 @@ public static String getChannelVersion(Channel c) { * @return true: client version is above or equal version 1.5.0, false: on the contrary */ public static boolean isAboveOrEqualVersion150(String version) { - boolean isAboveOrEqualVersion150 = false; - try { - long clientVersion = convertVersion(version); - long divideVersion = convertVersion(VERSION_1_5_0); - isAboveOrEqualVersion150 = clientVersion >= divideVersion; - } catch (Exception e) { - LOGGER.error("convert version error, clientVersion:{}", version, e); - } - return isAboveOrEqualVersion150; + return isAboveOrEqualVersion(version, VERSION_1_5_0); } public static boolean isAboveOrEqualVersion230(String version) { - boolean isAboveOrEqualVersion230 = false; + return isAboveOrEqualVersion(version, VERSION_2_3_0); + } + + public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) { + boolean isAboveOrEqualVersion = false; try { - long clientVersion = convertVersion(version); - long divideVersion = convertVersion(VERSION_2_3_0); - isAboveOrEqualVersion230 = clientVersion >= divideVersion; + isAboveOrEqualVersion = convertVersion(clientVersion) >= convertVersion(divideVersion); } catch (Exception e) { - LOGGER.error("convert version error, clientVersion:{}", version, e); + LOGGER.error("convert version error, clientVersion:{}", clientVersion, e); } - return isAboveOrEqualVersion230; + return isAboveOrEqualVersion; } public static long convertVersion(String version) throws IncompatibleVersionException { From 35d38ffcfab697be49f54d2a0fc46940e8b587b8 Mon Sep 17 00:00:00 2001 From: jianbin Date: Tue, 3 Sep 2024 09:57:56 +0800 Subject: [PATCH 03/16] fix ut --- .../seata/namingserver/NamingControllerTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java index 0a3ba29034b..8449f03c86c 100644 --- a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java +++ b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java @@ -213,16 +213,16 @@ void mockHeartbeat() throws InterruptedException { String namespace = "public5"; String unitName = String.valueOf(UUID.randomUUID()); NamingServerNode node = new NamingServerNode(); - node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty")); - node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http")); + node.setTransaction(new Node.Endpoint("127.0.0.1", 8094, "netty")); + node.setControl(new Node.Endpoint("127.0.0.1", 7094, "http")); Map meatadata = node.getMetadata(); Map vGroups = new HashMap<>(); vGroups.put("vgroup1",unitName); meatadata.put(CONSTANT_GROUP, vGroups); namingController.registerInstance(namespace, clusterName, unitName, node); NamingServerNode node2 = new NamingServerNode(); - node2.setTransaction(new Node.Endpoint("127.0.0.1", 8092, "netty")); - node2.setControl(new Node.Endpoint("127.0.0.1", 7092, "http")); + node2.setTransaction(new Node.Endpoint("127.0.0.1", 8093, "netty")); + node2.setControl(new Node.Endpoint("127.0.0.1", 7093, "http")); Map meatadata2 = node2.getMetadata(); Map vGroups2 = new HashMap<>(); String unitName2 = UUID.randomUUID().toString(); @@ -260,7 +260,7 @@ void mockHeartbeat() throws InterruptedException { unit = metaResponse.getClusterList().get(0).getUnitData().get(0); Node node1 = unit.getNamingInstanceList().get(0); assertEquals("127.0.0.1", node1.getTransaction().getHost()); - assertEquals(8091, node1.getTransaction().getPort()); + assertEquals(8094, node1.getTransaction().getPort()); } } \ No newline at end of file From b51c94e91cc200046f562325c7368918104924f6 Mon Sep 17 00:00:00 2001 From: jianbin Date: Tue, 24 Sep 2024 18:26:03 +0800 Subject: [PATCH 04/16] opt --- .../seata/core/rpc/netty/AbstractNettyRemotingServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 9319f7bbe67..67df2ea8494 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -286,7 +286,7 @@ protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) { RpcMessage rpcMsg = buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i)); - processMessage(ctx, rpcMsg); + super.processMessage(ctx, rpcMsg); } } else { super.processMessage(ctx, rpcMessage); From 6457f6c5cae7c2acd4cde3788f11d93e56eb439e Mon Sep 17 00:00:00 2001 From: funky-eyes Date: Tue, 24 Sep 2024 20:20:24 +0800 Subject: [PATCH 05/16] add test case --- .../core/rpc/netty/NettyRemotingServer.java | 2 +- .../core/rpc/netty/TmNettyRemotingClient.java | 4 +- .../core/rpc/netty/RmNettyClientTest.java | 116 ++++++++++++++++++ .../core/rpc/netty/TmNettyClientTest.java | 28 ++--- 4 files changed, 134 insertions(+), 16 deletions(-) create mode 100644 test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java index 3e6ec63c15e..3010efc4ae2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java @@ -45,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer { private final AtomicBoolean initialized = new AtomicBoolean(false); - private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(), + private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(), NettyServerConfig.getMaxBranchResultPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory("BranchResultHandlerThread", NettyServerConfig.getMaxBranchResultPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index b57068e36ea..28993b61f77 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -46,6 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.seata.common.util.StringUtils.isNotBlank; + /** * The rm netty client. * @@ -187,7 +189,7 @@ public void init() { registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); - if (org.apache.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) { + if (isNotBlank(transactionServiceGroup)) { initConnection(); } } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java new file mode 100644 index 00000000000..3d62a10214c --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.common.XID; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.UUIDGenerator; +import org.apache.seata.core.protocol.ResultCode; +import org.apache.seata.core.protocol.transaction.BranchRegisterRequest; +import org.apache.seata.core.protocol.transaction.BranchRegisterResponse; +import org.apache.seata.rm.tcc.TCCResourceManager; +import org.apache.seata.saga.engine.db.AbstractServerTest; +import org.apache.seata.server.coordinator.DefaultCoordinator; +import org.apache.seata.server.session.SessionHolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + +public class RmNettyClientTest extends AbstractServerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyClientTest.class); + + @BeforeAll + public static void init(){ + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091"); + } + @AfterAll + public static void after() { + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + } + + public static ThreadPoolExecutor initMessageExecutor() { + return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Test + public void testMergeMsg() throws Exception { + ThreadPoolExecutor workingThreads = initMessageExecutor(); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); + new Thread(() -> { + SessionHolder.init(null); + nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer)); + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8091); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + nettyRemotingServer.init(); + }).start(); + Thread.sleep(3000); + + String applicationId = "app 1"; + String transactionServiceGroup = "default_tx_group"; + RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); + rmNettyRemotingClient.setResourceManager(new TCCResourceManager()); + rmNettyRemotingClient.init(); + rmNettyRemotingClient.getClientChannelManager().initReconnect(transactionServiceGroup, true); + String serverAddress = "0.0.0.0:8091"; + Channel channel = RmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); + Assertions.assertNotNull(channel); + + CountDownLatch latch = new CountDownLatch(3); + for (int i = 0; i < 3; i++) { + CompletableFuture.runAsync(()->{ + BranchRegisterRequest request = new BranchRegisterRequest(); + request.setXid("127.0.0.1:8091:1249853"); + request.setLockKey("lock key testSendMsgWithResponse"); + request.setResourceId("resoutceId1"); + BranchRegisterResponse branchRegisterResponse = null; + try { + branchRegisterResponse = (BranchRegisterResponse) rmNettyRemotingClient.sendSyncRequest(request); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + Assertions.assertNotNull(branchRegisterResponse); + Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode()); + Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]", + branchRegisterResponse.getMsg()); + latch.countDown(); + }); + } + latch.await(10,TimeUnit.SECONDS); + nettyRemotingServer.destroy(); + rmNettyRemotingClient.destroy(); + } + +} diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java index ae723d23800..9a46210aee2 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java @@ -21,10 +21,9 @@ import org.apache.seata.common.ConfigurationTestHelper; import org.apache.seata.common.XID; import org.apache.seata.common.util.NetUtil; -import org.apache.seata.core.protocol.ResultCode; -import org.apache.seata.core.protocol.transaction.BranchRegisterRequest; -import org.apache.seata.core.protocol.transaction.BranchRegisterResponse; -import org.apache.seata.mockserver.MockServer; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.protocol.transaction.GlobalCommitRequest; +import org.apache.seata.core.protocol.transaction.GlobalCommitResponse; import org.apache.seata.saga.engine.db.AbstractServerTest; import org.apache.seata.common.util.UUIDGenerator; import org.apache.seata.server.coordinator.DefaultCoordinator; @@ -40,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -58,7 +58,7 @@ public static void after() { } public static ThreadPoolExecutor initMessageExecutor() { - return new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS, + return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, new LinkedBlockingQueue(20000), new ThreadPoolExecutor.CallerRunsPolicy()); } @@ -176,16 +176,16 @@ public void testSendMsgWithResponse() throws Exception { String serverAddress = "0.0.0.0:8091"; Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); Assertions.assertNotNull(channel); - - BranchRegisterRequest request = new BranchRegisterRequest(); + GlobalCommitRequest request = new GlobalCommitRequest(); request.setXid("127.0.0.1:8091:1249853"); - request.setLockKey("lock key testSendMsgWithResponse"); - request.setResourceId("resoutceId1"); - BranchRegisterResponse branchRegisterResponse = (BranchRegisterResponse) tmNettyRemotingClient.sendSyncRequest(request); - Assertions.assertNotNull(branchRegisterResponse); - Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode()); - Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]", - branchRegisterResponse.getMsg()); + GlobalCommitResponse globalCommitResponse = null; + try { + globalCommitResponse = (GlobalCommitResponse)tmNettyRemotingClient.sendSyncRequest(request); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + Assertions.assertNotNull(globalCommitResponse); + Assertions.assertEquals(GlobalStatus.Finished, globalCommitResponse.getGlobalStatus()); nettyRemotingServer.destroy(); tmNettyRemotingClient.destroy(); } From a06f5beff7bf5eaf223b29ffe7e567a3974b9320 Mon Sep 17 00:00:00 2001 From: jianbin Date: Tue, 8 Oct 2024 15:25:17 +0800 Subject: [PATCH 06/16] opt --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../core/rpc/netty/AbstractNettyRemotingClient.java | 11 +++++++++-- .../processor/client/ClientOnResponseProcessor.java | 8 ++++++-- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 96d8512a2dd..f053b5ac9b0 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -20,6 +20,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6889](https://github.com/apache/incubator-seata/pull/6889)] Correct word spelling errors - [[#6898](https://github.com/apache/incubator-seata/pull/6898)] upgrade npmjs version in saga module - [[#6902](https://github.com/apache/incubator-seata/pull/6900)] optimize readme docs +- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] splitting MergedWarpMessage enhances the server parallel processing capability ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index b100d18320c..971041eec29 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -20,6 +20,7 @@ - [[#6889](https://github.com/apache/incubator-seata/pull/6889)] 修正单词拼写错误 - [[#6898](https://github.com/apache/incubator-seata/pull/6898)] 升级 saga 模块 npmjs 版本 - [[#6902](https://github.com/apache/incubator-seata/pull/6900)] 优化 readme 文档 +- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] 分离merge消息使其能完全并行处理 ### refactor: diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 96d798d427f..c87d5f3aaac 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -205,8 +205,15 @@ public void sendAsyncRequest(Channel channel, Object msg) { RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); - if (rpcMessage.getBody() instanceof MergeMessage) { - mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody()); + Object body = rpcMessage.getBody(); + if (body instanceof MergeMessage) { + mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)rpcMessage.getBody()); + if (body instanceof MergedWarpMessage) { + Integer parentId = rpcMessage.getId(); + for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) { + childToParentMap.put(msgId, parentId); + } + } } super.sendAsync(channel, rpcMessage); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java index aee0a2cd335..565aaac1545 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java @@ -69,9 +69,9 @@ public class ClientOnResponseProcessor implements RemotingProcessor { /** * The Merge msg map from org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMap. */ - private Map mergeMsgMap; + private final Map mergeMsgMap; - private Map childToParentMap; + private final Map childToParentMap; /** * The Futures from org.apache.seata.core.rpc.netty.AbstractNettyRemoting#futures @@ -100,6 +100,8 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc for (int i = 0; i < mergeMessage.msgs.size(); i++) { int msgId = mergeMessage.msgIds.get(i); MessageFuture future = futures.remove(msgId); + // The old version of the server will return MergeResultMessage, so it is necessary to remove the msgId from the childToParentMap. + childToParentMap.remove(msgId); if (future == null) { LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]); } else { @@ -112,6 +114,8 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) { int msgId = batchResultMessage.getMsgIds().get(i); MessageFuture future = futures.remove(msgId); + // The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId from the childToParentMap. + childToParentMap.remove(msgId); if (future == null) { LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i)); } else { From 10ae72a7c71802ba1615a46a75567039c8f5743f Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 11:32:02 +0800 Subject: [PATCH 07/16] opt --- .../client/ClientOnResponseProcessor.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java index 565aaac1545..bb13c664488 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java @@ -109,24 +109,22 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc } } } else if (rpcMessage.getBody() instanceof BatchResultMessage) { - try { - BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody(); - for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) { - int msgId = batchResultMessage.getMsgIds().get(i); - MessageFuture future = futures.remove(msgId); - // The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId from the childToParentMap. - childToParentMap.remove(msgId); - if (future == null) { - LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i)); - } else { - future.setResultMessage(batchResultMessage.getResultMessages().get(i)); - } + BatchResultMessage batchResultMessage = (BatchResultMessage)rpcMessage.getBody(); + for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) { + int msgId = batchResultMessage.getMsgIds().get(i); + MessageFuture future = futures.remove(msgId); + // The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId + // from the childToParentMap. + Integer parentId = childToParentMap.remove(msgId); + if (parentId != null) { + mergeMsgMap.remove(parentId); + } + if (future == null) { + LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, + batchResultMessage.getResultMessages().get(i)); + } else { + future.setResultMessage(batchResultMessage.getResultMessages().get(i)); } - } finally { - // In order to be compatible with the old version, in the batch sending of version 1.5.0, - // batch messages will also be placed in the local cache of mergeMsgMap, - // but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap - mergeMsgMap.clear(); } } else { Integer id = rpcMessage.getId(); From 7e48f02b235aa26b2d71724ad7dd4806949b382e Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 14:30:37 +0800 Subject: [PATCH 08/16] test --- build/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/pom.xml b/build/pom.xml index e0c95be86fb..922cd23f6d5 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -112,7 +112,7 @@ 3.0.0-M3 3.0.0-M5 - 0.8.7 + 0.8.12 2.2.1 3.2.0 From b31c438f0a3f1435db9bec49706506956e986616 Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 15:00:51 +0800 Subject: [PATCH 09/16] test --- build/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/pom.xml b/build/pom.xml index 922cd23f6d5..1c9e656d217 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -109,9 +109,9 @@ 4.0 1.20 3.1.1 - 3.0.0-M3 + 3.5.0 - 3.0.0-M5 + 3.5.1 0.8.12 2.2.1 From 317a7c133f6b9f702d875595e8d25d547eef8bb8 Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 15:04:09 +0800 Subject: [PATCH 10/16] test --- build/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/pom.xml b/build/pom.xml index 1c9e656d217..546c3af561f 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -109,9 +109,9 @@ 4.0 1.20 3.1.1 - 3.5.0 + 3.3.0 - 3.5.1 + 3.3.0 0.8.12 2.2.1 From 4dd0a007553ee6ae8dc15d5a876cf4704d28bce5 Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 15:12:40 +0800 Subject: [PATCH 11/16] test --- build/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/pom.xml b/build/pom.xml index 546c3af561f..08a12856a46 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -109,9 +109,9 @@ 4.0 1.20 3.1.1 - 3.3.0 + 3.2.0 - 3.3.0 + 3.2.0 0.8.12 2.2.1 From 53fe347c6f0de9c359d29e303aee85ff07130e26 Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 15:25:25 +0800 Subject: [PATCH 12/16] test --- build/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/pom.xml b/build/pom.xml index 08a12856a46..9ee50887911 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -109,7 +109,7 @@ 4.0 1.20 3.1.1 - 3.2.0 + 3.2.1 3.2.0 0.8.12 From 45ce9662e9ae65247fe319a6d39300f55d13c52e Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 15:33:54 +0800 Subject: [PATCH 13/16] test --- build/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/pom.xml b/build/pom.xml index 9ee50887911..94065062077 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -111,7 +111,7 @@ 3.1.1 3.2.1 - 3.2.0 + 3.2.1 0.8.12 2.2.1 From a0c61da3facfd56b00499140cd4d9abcfa11231f Mon Sep 17 00:00:00 2001 From: jianbin Date: Wed, 9 Oct 2024 16:41:59 +0800 Subject: [PATCH 14/16] test --- pom.xml | 1 + .../storage/redis/store/RedisVGroupMappingStoreManagerTest.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pom.xml b/pom.xml index 8e78921ba96..d8f24521595 100644 --- a/pom.xml +++ b/pom.xml @@ -386,6 +386,7 @@ test report + report-aggregate diff --git a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java index fac04d66de1..755c10759c6 100644 --- a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java +++ b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java @@ -21,11 +21,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.springframework.boot.test.context.SpringBootTest; import java.util.Map; +@EnabledIfSystemProperty(named = "redisCaseEnabled", matches = "true") @SpringBootTest public class RedisVGroupMappingStoreManagerTest { private RedisVGroupMappingStoreManager redisVGroupMappingStoreManager; From 4f1a705be65989bcf30a91ed629d0ff2823aafc3 Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 10 Oct 2024 09:13:28 +0800 Subject: [PATCH 15/16] bugfix: fix the issue of Codecov not generating reports --- build/pom.xml | 4 ++-- pom.xml | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/build/pom.xml b/build/pom.xml index 94065062077..922cd23f6d5 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -109,9 +109,9 @@ 4.0 1.20 3.1.1 - 3.2.1 + 3.0.0-M3 - 3.2.1 + 3.0.0-M5 0.8.12 2.2.1 diff --git a/pom.xml b/pom.xml index d8f24521595..8e78921ba96 100644 --- a/pom.xml +++ b/pom.xml @@ -386,7 +386,6 @@ test report - report-aggregate From 73a4bf6e9f7d7de6862a6df22a1c2b2ca474a40f Mon Sep 17 00:00:00 2001 From: funkye Date: Thu, 10 Oct 2024 10:32:16 +0800 Subject: [PATCH 16/16] Update build/pom.xml --- build/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/pom.xml b/build/pom.xml index 04718522453..9e26a12f1a1 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -112,7 +112,7 @@ 3.0.0-M3 3.0.0-M5 - 0.8.12 + 0.8.7 2.2.1 3.2.0