From bac28084c297393f1453a5410a44df3cc927b5bd Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 14 Dec 2022 15:59:53 +0800 Subject: [PATCH 1/6] fix loadbalance --- .../main/java/io/seata/core/rpc/RemotingServer.java | 3 ++- .../core/rpc/netty/AbstractNettyRemotingServer.java | 5 +++-- .../java/io/seata/core/rpc/netty/ChannelManager.java | 4 ++-- .../java/io/seata/server/coordinator/AbstractCore.java | 10 ++++++++-- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/RemotingServer.java b/core/src/main/java/io/seata/core/rpc/RemotingServer.java index 4839e2fc0c5..fa388ed9e48 100644 --- a/core/src/main/java/io/seata/core/rpc/RemotingServer.java +++ b/core/src/main/java/io/seata/core/rpc/RemotingServer.java @@ -37,10 +37,11 @@ public interface RemotingServer { * @param resourceId rm client resourceId * @param clientId rm client id * @param msg transaction message {@code io.seata.core.protocol} + * @param tryOtherApp try other app * @return client result message * @throws TimeoutException TimeoutException */ - Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException; + Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException; /** * server send sync request. diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 880b8f47308..159b9cde6b8 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -63,8 +63,9 @@ public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServ } @Override - public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException { - Channel channel = ChannelManager.getChannel(resourceId, clientId); + public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) + throws TimeoutException { + Channel channel = ChannelManager.getChannel(resourceId, clientId , tryOtherApp); if (channel == null) { throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } diff --git a/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java b/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java index f5c93c3c56f..86c516774bb 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java +++ b/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java @@ -276,7 +276,7 @@ private static Channel getChannelFromSameClientMap(Map clie * @param clientId Client ID - ApplicationId:IP:Port * @return Corresponding channel, NULL if not found. */ - public static Channel getChannel(String resourceId, String clientId) { + public static Channel getChannel(String resourceId, String clientId,boolean tryOtherApp) { Channel resultChannel = null; String[] clientIdInfo = readClientId(clientId); @@ -381,7 +381,7 @@ public static Channel getChannel(String resourceId, String clientId) { } } - if (resultChannel == null) { + if (resultChannel == null && tryOtherApp) { resultChannel = tryOtherApp(applicationIdMap, targetApplicationId); if (resultChannel == null) { diff --git a/server/src/main/java/io/seata/server/coordinator/AbstractCore.java b/server/src/main/java/io/seata/server/coordinator/AbstractCore.java index 84ab8036b2e..d3a47d89d2f 100644 --- a/server/src/main/java/io/seata/server/coordinator/AbstractCore.java +++ b/server/src/main/java/io/seata/server/coordinator/AbstractCore.java @@ -172,8 +172,9 @@ public BranchStatus branchCommit(GlobalSession globalSession, BranchSession bran protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException { + BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request); + branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); return response.getBranchStatus(); } @@ -196,11 +197,16 @@ public BranchStatus branchRollback(GlobalSession globalSession, BranchSession br protected BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException { + BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request); + branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); return response.getBranchStatus(); } + protected boolean tryOtherApp(BranchSession branchSession){ + return branchSession.getBranchType()==BranchType.AT; + } + @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { From cbca3ce26ba8c9354f381932a08c074bb53ae210 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 14 Dec 2022 16:03:39 +0800 Subject: [PATCH 2/6] fix loadbalance --- .../seata/core/rpc/netty/AbstractNettyRemotingServer.java | 2 +- .../main/java/io/seata/core/rpc/netty/ChannelManager.java | 2 +- .../java/io/seata/server/coordinator/AbstractCore.java | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 159b9cde6b8..4e67446b547 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -65,7 +65,7 @@ public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServ @Override public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException { - Channel channel = ChannelManager.getChannel(resourceId, clientId , tryOtherApp); + Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp); if (channel == null) { throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } diff --git a/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java b/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java index 86c516774bb..caaabba879b 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java +++ b/core/src/main/java/io/seata/core/rpc/netty/ChannelManager.java @@ -276,7 +276,7 @@ private static Channel getChannelFromSameClientMap(Map clie * @param clientId Client ID - ApplicationId:IP:Port * @return Corresponding channel, NULL if not found. */ - public static Channel getChannel(String resourceId, String clientId,boolean tryOtherApp) { + public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) { Channel resultChannel = null; String[] clientIdInfo = readClientId(clientId); diff --git a/server/src/main/java/io/seata/server/coordinator/AbstractCore.java b/server/src/main/java/io/seata/server/coordinator/AbstractCore.java index d3a47d89d2f..dfc5a855c75 100644 --- a/server/src/main/java/io/seata/server/coordinator/AbstractCore.java +++ b/server/src/main/java/io/seata/server/coordinator/AbstractCore.java @@ -174,7 +174,7 @@ protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSessi BranchSession branchSession) throws IOException, TimeoutException { BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); + branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); return response.getBranchStatus(); } @@ -199,12 +199,12 @@ protected BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalS BranchSession branchSession) throws IOException, TimeoutException { BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); + branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); return response.getBranchStatus(); } - protected boolean tryOtherApp(BranchSession branchSession){ - return branchSession.getBranchType()==BranchType.AT; + protected boolean tryOtherApp(BranchSession branchSession) { + return branchSession.getBranchType() == BranchType.AT; } @Override From a702d69e07752a0db541ac60dc4e0e95b91fd6ff Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 14 Dec 2022 16:11:12 +0800 Subject: [PATCH 3/6] fix test --- .../io/seata/server/coordinator/DefaultCoordinatorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/io/seata/server/coordinator/DefaultCoordinatorTest.java b/server/src/test/java/io/seata/server/coordinator/DefaultCoordinatorTest.java index 39ae7873c02..c83eefdc30c 100644 --- a/server/src/test/java/io/seata/server/coordinator/DefaultCoordinatorTest.java +++ b/server/src/test/java/io/seata/server/coordinator/DefaultCoordinatorTest.java @@ -249,7 +249,8 @@ static Stream xidAndBranchIdProviderForRollback() throws Exception { public static class MockServerMessageSender implements RemotingServer { @Override - public Object sendSyncRequest(String resourceId, String clientId, Object message) throws TimeoutException { + public Object sendSyncRequest(String resourceId, String clientId, Object message, boolean tryOtherApp) + throws TimeoutException { if (message instanceof BranchCommitRequest) { final BranchCommitResponse branchCommitResponse = new BranchCommitResponse(); branchCommitResponse.setBranchStatus(BranchStatus.PhaseTwo_Committed); From f9429632b1c63b5b6c1e9da095d501537f29ef1e Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 14 Dec 2022 16:14:46 +0800 Subject: [PATCH 4/6] branchSession.isAT() --- .../java/io/seata/server/coordinator/AbstractCore.java | 8 ++------ .../main/java/io/seata/server/session/BranchSession.java | 4 ++++ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/seata/server/coordinator/AbstractCore.java b/server/src/main/java/io/seata/server/coordinator/AbstractCore.java index dfc5a855c75..9b8f79ab39c 100644 --- a/server/src/main/java/io/seata/server/coordinator/AbstractCore.java +++ b/server/src/main/java/io/seata/server/coordinator/AbstractCore.java @@ -174,7 +174,7 @@ protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSessi BranchSession branchSession) throws IOException, TimeoutException { BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); + branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT()); return response.getBranchStatus(); } @@ -199,14 +199,10 @@ protected BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalS BranchSession branchSession) throws IOException, TimeoutException { BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request, tryOtherApp(branchSession)); + branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT()); return response.getBranchStatus(); } - protected boolean tryOtherApp(BranchSession branchSession) { - return branchSession.getBranchType() == BranchType.AT; - } - @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { diff --git a/server/src/main/java/io/seata/server/session/BranchSession.java b/server/src/main/java/io/seata/server/session/BranchSession.java index 402701f4493..1d6b606ddf6 100644 --- a/server/src/main/java/io/seata/server/session/BranchSession.java +++ b/server/src/main/java/io/seata/server/session/BranchSession.java @@ -297,6 +297,10 @@ public boolean unlock() throws TransactionException { return true; } + public boolean isAT(){ + return this.getBranchType() == BranchType.AT; + } + public LockStatus getLockStatus() { return lockStatus; } From a1766bb342bdbc03fa78c245f043ab07e7c9c747 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Wed, 14 Dec 2022 16:25:05 +0800 Subject: [PATCH 5/6] branchSession.isAT() --- server/src/main/java/io/seata/server/session/BranchSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/seata/server/session/BranchSession.java b/server/src/main/java/io/seata/server/session/BranchSession.java index 1d6b606ddf6..af0bcf63295 100644 --- a/server/src/main/java/io/seata/server/session/BranchSession.java +++ b/server/src/main/java/io/seata/server/session/BranchSession.java @@ -297,7 +297,7 @@ public boolean unlock() throws TransactionException { return true; } - public boolean isAT(){ + public boolean isAT() { return this.getBranchType() == BranchType.AT; } From 165e3aa56b87dfe87b0aac50b50e883377bc1378 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 17 Jan 2023 16:26:32 +0800 Subject: [PATCH 6/6] changes --- changes/en-us/develop.md | 2 ++ changes/zh-cn/develop.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/changes/en-us/develop.md b/changes/en-us/develop.md index 90c5f323cac..e82d886cd5d 100644 --- a/changes/en-us/develop.md +++ b/changes/en-us/develop.md @@ -18,6 +18,7 @@ Add changes here for all PR submitted to the develop branch. - [[#5208](https://github.com/seata/seata/pull/5208)] optimize throwable getCause once more - [[#5212](https://github.com/seata/seata/pull/5212)] optimize log message level - [[#5237](https://github.com/seata/seata/pull/5237)] optimize exception log message print(EnhancedServiceLoader.loadFile#cahtch) +- [[#5153](https://github.com/seata/seata/pull/5153)] Only AT mode try to get channel with other app ### test: @@ -32,5 +33,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [PeppaO](https://github.com/PeppaO) - [yuruixin](https://github.com/yuruixin) - [xingfudeshi](https://github.com/xingfudeshi) +- [Bughue](https://github.com/Bughue) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/develop.md b/changes/zh-cn/develop.md index 9248b04fc5e..511597fe61a 100644 --- a/changes/zh-cn/develop.md +++ b/changes/zh-cn/develop.md @@ -17,6 +17,7 @@ - [[#5208](https://github.com/seata/seata/pull/5208)] 优化多次重复获取Throwable#getCause问题 - [[#5212](https://github.com/seata/seata/pull/5212)] 优化不合理的日志信息级别 - [[#5237](https://github.com/seata/seata/pull/5237)] 优化异常日志打印(EnhancedServiceLoader.loadFile#cahtch) +- [[#5153](https://github.com/seata/seata/pull/5153)] 只允许AT去尝试跨RM获取channel ### test: @@ -31,5 +32,6 @@ - [PeppaO](https://github.com/PeppaO) - [yuruixin](https://github.com/yuruixin) - [xingfudeshi](https://github.com/xingfudeshi) +- [Bughue](https://github.com/Bughue) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。